Skip to Content
TroubleshootingKafka rebalance-loop

Kafka consumer: rebalance loop

Runbook: consumer-group постоянно перераспределяет партиции между репликами, сообщения идут рывками или не идут вообще, в логах Group X rebalancing. Симптом смежный со «consumer застрял», но причина другая — broker считает consumer мёртвым и передаёт его партиции другой реплике, та тоже не справляется, цикл повторяется.

Общий runbook по стуку consumer’а — в kafka-consumer-stuck. Эта страница — специально про rebalance: как отличить, где смотреть, какие параметры крутить. Reference по Watermill + Kafka — в ../conventions/events.

Содержание

Симптомы

  • kafka_consumer_lag колеблется: то растёт, то частично опадает, никогда не стабилизируется близко к 0.

  • В логах сервиса повторяются сообщения:

    group coordinator dead, marking as unknown attempting to join group ... group rebalancing: new members joined partitions revoked: [...] partitions assigned: [...]

    Цикл повторяется каждые 30–60 секунд.

  • kafka-consumer-groups --describe --group <name> показывает разные CONSUMER-ID / HOST на последовательных запусках — члены группы меняются.

  • Метрика messages_processed_total идёт рывками: 200 сообщений проехали → тишина на 30 секунд → ещё 200.

Быстрая проверка: это rebalance?

Отличия от «просто застрявшего» consumer’а:

StuckRebalance loop
ЛагРастёт монотонноКолеблется
Сообщения «едут»НетРывками
MEMBERS в describeСтабильныеМеняются
ЛогиТишина или постоянный error«joining group», «revoke», «assign»
state consumer-group’ыStablePreparingRebalance / CompletingRebalance

Проверка state:

kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \ --describe --group <name> --state

Жизненный цикл consumer в group

Чтобы понять, почему происходит rebalance, нужна модель:

Rebalance триггерится:

  1. Новый consumer вступил в group — deploy новой реплики, autoscaler добавил pod.
  2. Consumer вышел из group — pod упал, SIGTERM, rolling restart.
  3. Consumer не шлёт heartbeat > session.timeout.ms — broker считает его мёртвым.
  4. Consumer не делает poll() > max.poll.interval.ms — broker считает его зависшим.

Причины 3 и 4 — самые частые для rebalance-loop в проде.

1. Медленный handler

Самая частая причина. Handler обрабатывает одно сообщение дольше max.poll.interval.ms (default 300s в Sarama / Watermill). Во время обработки consumer не может сделать poll() за следующей партией — broker считает его зависшим, начинает rebalance. После rebalance’а consumer «возвращается» с новой сессией, но в handler’е та же тяжёлая логика — цикл повторяется.

Проверка.

  • Tempo-trace по одному сообщению: сколько длится handler.Handle span?
  • Метрика messages_handler_duration_seconds_bucket — есть ли хвост

    100s?

  • В логах: «processing message id=X» без парного «finished message id=X» на протяжении минут.

Фикс.

Handler должен отвечать быстро — не вся работа, а только «принять команду». Тяжёлую часть — в отдельный воркер:

// ПЛОХО: handler делает всё внутри себя func (h *Handler) Handle(msg *message.Message) error { ctx := msg.Context() if err := h.svc.ProcessAllImages(ctx, payload); err != nil { // 10 минут return err } return nil } // ХОРОШО: handler сохраняет команду, воркер обрабатывает асинхронно func (h *Handler) Handle(msg *message.Message) error { ctx := msg.Context() return h.svc.EnqueueJob(ctx, payload) // < 100ms: INSERT в БД }

«Тяжёлый» воркер отдельно читает задания из БД (или из отдельной Kafka-topic’и) и обрабатывает их. Consumer на основном топике — лёгкий, быстрый, всегда успевает poll().

Подъём max.poll.interval.ms — маскирующее решение. Если handler реально должен обрабатывать 5 минут одно сообщение, подними лимит; но чаще оказывается, что handler должен делать < 1s, и он делает минуты из-за неоптимизации.

2. GC pauses

Go-GC не блокирует поток goroutines > миллисекунд в современных версиях, но при 4GB heap и плотной нагрузке паузы могут достигать секунд. Сервис с такими паузами пропускает heartbeat’ы.

Проверка.

  • Метрика go_gc_duration_seconds — 99th процентиль > 100ms → подозрительно.
  • go_memstats_heap_inuse_bytes — если > 2GB и растёт, см. memory-leak.
  • Лог: в момент rebalance Check GC forced в stderr.

Фикс.

  • Оптимизировать память: пул буферов, ограничить in-memory cache, уменьшить allocation rate.
  • GOGC — выше (150 вместо default 100) уменьшает частоту GC, но увеличивает heap.
  • GOMEMLIMIT (Go 1.19+) — цели-лимит, GC активнее при приближении.

3. Неверный heartbeat

Watermill kafka-subscriber использует Sarama под капотом. Параметры:

ПараметрDefaultЧто значит
session.timeout.ms45sBroker считает consumer мёртвым, если heartbeat не приходил столько
heartbeat.interval.ms3sConsumer шлёт heartbeat каждые столько
max.poll.interval.ms300s (5 минут)Между двумя Poll() не должно пройти больше

Правило: heartbeat.interval.ms < session.timeout.ms / 3. Отклонение → broker делает rebalance при одном пропущенном heartbeat.

Heartbeat в современных клиентах — отдельная goroutine, она продолжает работать, пока handler обрабатывает сообщение. Так что долгий handler не блокирует heartbeat. Но он блокирует pollmax.poll.interval.ms нарушается.

4. Новая реплика поднимается / падает

Симптом. Rebalance при каждом деплое или autoscale-событии.

Это штатно: когда pod входит/выходит из group, rebalance неизбежен. Но:

  • Deploy strategy не должен дёргать rebalance каждые 30 секунд. Rolling deploy — да, 1-2 rebalance’а, стабилизация, всё ок.
  • CrashLoopBackOff pod’а = бесконечные rebalance’ы. Починка — разобрать, почему pod падает (логи, exit code).
  • HPA с агрессивными порогами скейлит вверх-вниз каждую минуту → то же самое. Настрой behavior.scaleDown.stabilizationWindowSeconds хотя бы 300.

Фикс.

  • CrashLoop → читаем kubectl logs <pod> --previous, фиксим.
  • HPA-флаппинг → увеличить stabilization window, пересмотреть thresholds.
  • Cooperative rebalance (см. §6) — снижает «боль» нормальных rebalance’ов, стоит включить.

5. Network flap между consumer и broker

Симптом. В логах consumer «connection reset», «i/o timeout» по kafka-порту, rebalance.

Причина. Сетевая проблема между pod’ом сервиса и брокером: NetworkPolicy change, broker под нагрузкой, node-проблема.

Проверка.

# из pod'а сервиса kubectl exec -it <pod> -- sh -c "nc -zv <broker-host> 9092" # метрики broker'а: CPU, disk IO, network # Kafka UI / Grafana Dashboard на Kafka broker'ах

Фикс. Обычно не в ведении backend-owner’а: эскалация к инфра-команде. Пока ждёшь — увеличить session.timeout.ms до 60–90s, чтобы мелкие flap’ы не триггерили rebalance.

6. Статическое членство (static membership)

group.instance.id — фича Kafka 2.3+, позволяет pod’у при рестарте вернуться в group под тем же identity без rebalance. Это особенно ценно при rolling-deploy.

Настройка в Watermill (Sarama config):

saramaCfg := sarama.NewConfig() saramaCfg.Version = sarama.V2_6_0_0 // минимум 2.3 saramaCfg.Consumer.Group.InstanceId = os.Getenv("HOSTNAME") // + установить session.timeout.ms > grace period деплоя saramaCfg.Consumer.Group.Session.Timeout = 60 * time.Second

Эффект: при рестарте pod’а за < 60s broker «подождёт» возвращения, без rebalance. При действительном исчезновении pod’а (> 60s) — штатный rebalance.

Не применяй, если pod’ы эфемерны и часто пересоздаются с новыми именами — тогда static membership теряет смысл.

Cooperative rebalance protocol (KIP-429, Sarama 1.28+) — параллельно включи:

saramaCfg.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyCooperativeSticky()

Отличие от default (eager): при rebalance старые members не теряют свои партиции, только передают часть новому. Меньше «проседания» процессинга при деплое.

Параметры клиента

Сводная таблица, когда что крутить:

СимптомПараметрКак изменить
Rebalance при deploygroup.instance.id (static) + session.timeout.ms > graceВключить
Handler иногда > 5 минутmax.poll.interval.msУвеличить до handler_p99 × 3
Network flapsession.timeout.ms60–90s
Rebalance при scale-outrebalance.strategy = cooperative-stickyВключить
Слишком много heartbeat-трафикаheartbeat.interval.msУвеличить, но не больше session.timeout / 3

Максимальные значения, за которые лучше не переходить без согласования с Kafka-owner’ом:

  • session.timeout.ms ≤ 120s — больше = долго ждать смерти реального умершего consumer’а → лаг.
  • max.poll.interval.ms ≤ 600s — больше указывает на то, что handler должен быть разделён (§1).

Все эти параметры устанавливаются в internal/config/ сервис-репо и проезжают через env. Изменение — через PR.

Как правильно скейлить

num.partitions топика задаёт потолок параллелизма consumer’ов. Если num.partitions = 4, запустить 8 реплик сервиса — 4 будут idle.

  • Scale-out реплик ≤ num.partitions. Всё лишнее — деньги на ветер.
  • Увеличение num.partitions — имеет последствия для ordering, см. kafka-consumer-stuck.md §4.
  • Key-hash балансировка — если один partition key горячий (10% сообщений приходит с одним user_id), увеличение партиций не поможет: один партишн — одна горячая партия. Решение — partition_key с более широким spread’ом (hash от uuid, не от бизнес-ID).

Что не делать

  • Не ставить `max.poll.interval.ms = 1h «чтобы rebalance не был». Это значит: при реальном зависании handler’а broker будет ждать час, пока начнёт rebalance. Лаг вырастет до часов.
  • Не делать group.instance.id = "consumer-1" (статическое имя). Если 3 реплики → все три имеют одну ID → broker считает, что «один consumer залогинился 3 раза». Имя должно быть уникально per-реплика: os.Getenv("HOSTNAME") или подобное.
  • Не использовать один consumer-group name для нескольких сервисов — см. kafka-consumer-stuck.
  • Не игнорировать постоянный rebalance как «так работает». Rebalance каждые 30 секунд = consumer не работает эффективно, каждое 5-е сообщение будет обработано с задержкой.
  • Не крутить параметры без мониторинга. Сначала метрики на consumer lag, latency handler’а, rebalance count; потом крути.

Чеклист

  • Подтвердил: rebalance, а не просто stuck — MEMBERS меняются, в логах «rebalancing».
  • Смотрел p99 handler duration: если > 60s — §1.
  • Проверил go_gc_duration_seconds: нет пауз > 100ms.
  • Проверил pod-stability: нет CrashLoop, HPA не флаппит.
  • Network-проверка: consumer достаёт broker без connection reset.
  • Параметры клиента посмотрены в internal/config/: соответствуют табличке §Параметры клиента.
  • Если deploy триггерит rebalance каждый раз — включён group.instance.id + cooperative-sticky.
  • Если тяжёлый handler реально нужен — вынесен в async-воркер, consumer делает только INSERT / enqueue.
  • num.partitions ≥ replica count.
  • После фикса: kafka_consumer_lag стабилизируется, state группы Stable > 10 минут подряд.

Связанные разделы

  • kafka-consumer-stuck — общий runbook по застрявшему consumer’у, если не rebalance — иди туда.
  • ../conventions/events — middleware stack, envelope, retry.
  • ../patterns/idempotent-consumer — дедупликация; при частых rebalance’ах дубли неизбежны, handler обязан быть idempotent.
  • ../patterns/outbox — partition count и ordering guarantees.
  • memory-leak — если GC-паузы вызывают rebalance из-за тяжёлого heap.
  • ../how-to/debug-outbox-lag — если source-сервис (publisher) лагает; consumer сам может быть в норме.
Last updated on