Kafka consumer: rebalance loop¶
Runbook: consumer-group постоянно перераспределяет партиции между
репликами, сообщения идут рывками или не идут вообще, в логах
Group X rebalancing. Симптом смежный со «consumer застрял», но
причина другая — broker считает consumer мёртвым и передаёт его
партиции другой реплике, та тоже не справляется, цикл повторяется.
Общий runbook по стуку consumer'а — в
kafka-consumer-stuck.md. Эта страница —
специально про rebalance: как отличить, где смотреть, какие
параметры крутить. Reference по Watermill + Kafka — в
../conventions/events.md.
Содержание¶
- Симптомы
- Быстрая проверка: это rebalance?
- Жизненный цикл consumer в group
- 1. Медленный handler
- 2. GC pauses
- 3. Неверный heartbeat
- 4. Новая реплика поднимается / падает
- 5. Network flap между consumer и broker
- 6. Статическое членство (static membership)
- Параметры клиента
- Как правильно скейлить
- Что не делать
- Чеклист
- Связанные разделы
Симптомы¶
kafka_consumer_lagколеблется: то растёт, то частично опадает, никогда не стабилизируется близко к 0.- В логах сервиса повторяются сообщения:
group coordinator dead, marking as unknown
attempting to join group ...
group rebalancing: new members joined
partitions revoked: [...]
partitions assigned: [...]
Цикл повторяется каждые 30–60 секунд.
- kafka-consumer-groups --describe --group <name> показывает
разные CONSUMER-ID / HOST на последовательных запусках — члены
группы меняются.
- Метрика messages_processed_total идёт рывками: 200 сообщений
проехали → тишина на 30 секунд → ещё 200.
Быстрая проверка: это rebalance?¶
Отличия от «просто застрявшего» consumer'а:
| Stuck | Rebalance loop | |
|---|---|---|
| Лаг | Растёт монотонно | Колеблется |
| Сообщения «едут» | Нет | Рывками |
MEMBERS в describe |
Стабильные | Меняются |
| Логи | Тишина или постоянный error | «joining group», «revoke», «assign» |
state consumer-group'ы |
Stable |
PreparingRebalance / CompletingRebalance |
Проверка state:
Жизненный цикл consumer в group¶
Чтобы понять, почему происходит rebalance, нужна модель:
stateDiagram-v2
[*] --> Joining: consumer starts
Joining --> Stable: coordinator assigned partitions
Stable --> Rebalancing: member timeout / join / leave
Rebalancing --> Stable: assignment done
Stable --> [*]: consumer stops
Rebalance триггерится:
- Новый consumer вступил в group — deploy новой реплики, autoscaler добавил pod.
- Consumer вышел из group — pod упал, SIGTERM, rolling restart.
- Consumer не шлёт heartbeat >
session.timeout.ms— broker считает его мёртвым. - Consumer не делает
poll()>max.poll.interval.ms— broker считает его зависшим.
Причины 3 и 4 — самые частые для rebalance-loop в проде.
1. Медленный handler¶
Самая частая причина. Handler обрабатывает одно сообщение дольше
max.poll.interval.ms (default 300s в Sarama / Watermill). Во время
обработки consumer не может сделать poll() за следующей партией —
broker считает его зависшим, начинает rebalance. После rebalance'а
consumer «возвращается» с новой сессией, но в handler'е та же тяжёлая
логика — цикл повторяется.
Проверка.
- Tempo-trace по одному сообщению: сколько длится
handler.Handlespan? - Метрика
messages_handler_duration_seconds_bucket— есть ли хвост100s?
- В логах: «processing message id=X» без парного «finished message id=X» на протяжении минут.
Фикс.
Handler должен отвечать быстро — не вся работа, а только «принять команду». Тяжёлую часть — в отдельный воркер:
// ПЛОХО: handler делает всё внутри себя
func (h *Handler) Handle(msg *message.Message) error {
ctx := msg.Context()
if err := h.svc.ProcessAllImages(ctx, payload); err != nil { // 10 минут
return err
}
return nil
}
// ХОРОШО: handler сохраняет команду, воркер обрабатывает асинхронно
func (h *Handler) Handle(msg *message.Message) error {
ctx := msg.Context()
return h.svc.EnqueueJob(ctx, payload) // < 100ms: INSERT в БД
}
«Тяжёлый» воркер отдельно читает задания из БД (или из отдельной
Kafka-topic'и) и обрабатывает их. Consumer на основном топике —
лёгкий, быстрый, всегда успевает poll().
Подъём max.poll.interval.ms — маскирующее решение. Если handler
реально должен обрабатывать 5 минут одно сообщение, подними лимит;
но чаще оказывается, что handler должен делать < 1s, и он делает
минуты из-за неоптимизации.
2. GC pauses¶
Go-GC не блокирует поток goroutines > миллисекунд в современных версиях, но при 4GB heap и плотной нагрузке паузы могут достигать секунд. Сервис с такими паузами пропускает heartbeat'ы.
Проверка.
- Метрика
go_gc_duration_seconds— 99th процентиль > 100ms → подозрительно. go_memstats_heap_inuse_bytes— если > 2GB и растёт, см.memory-leak.md.- Лог: в момент rebalance Check
GC forcedв stderr.
Фикс.
- Оптимизировать память: пул буферов, ограничить in-memory cache, уменьшить allocation rate.
GOGC— выше (150 вместо default 100) уменьшает частоту GC, но увеличивает heap.GOMEMLIMIT(Go 1.19+) — цели-лимит, GC активнее при приближении.
3. Неверный heartbeat¶
Watermill kafka-subscriber использует Sarama под капотом. Параметры:
| Параметр | Default | Что значит |
|---|---|---|
session.timeout.ms |
45s | Broker считает consumer мёртвым, если heartbeat не приходил столько |
heartbeat.interval.ms |
3s | Consumer шлёт heartbeat каждые столько |
max.poll.interval.ms |
300s (5 минут) | Между двумя Poll() не должно пройти больше |
Правило: heartbeat.interval.ms < session.timeout.ms / 3. Отклонение
→ broker делает rebalance при одном пропущенном heartbeat.
Heartbeat в современных клиентах — отдельная goroutine, она
продолжает работать, пока handler обрабатывает сообщение. Так что
долгий handler не блокирует heartbeat. Но он блокирует poll →
max.poll.interval.ms нарушается.
4. Новая реплика поднимается / падает¶
Симптом. Rebalance при каждом деплое или autoscale-событии.
Это штатно: когда pod входит/выходит из group, rebalance неизбежен. Но:
- Deploy strategy не должен дёргать rebalance каждые 30 секунд. Rolling deploy — да, 1-2 rebalance'а, стабилизация, всё ок.
CrashLoopBackOffpod'а = бесконечные rebalance'ы. Починка — разобрать, почему pod падает (логи, exit code).- HPA с агрессивными порогами скейлит вверх-вниз каждую минуту → то же
самое. Настрой
behavior.scaleDown.stabilizationWindowSecondsхотя бы 300.
Фикс.
CrashLoop→ читаемkubectl logs <pod> --previous, фиксим.- HPA-флаппинг → увеличить stabilization window, пересмотреть thresholds.
- Cooperative rebalance (см. §6) — снижает «боль» нормальных rebalance'ов, стоит включить.
5. Network flap между consumer и broker¶
Симптом. В логах consumer «connection reset», «i/o timeout» по kafka-порту, rebalance.
Причина. Сетевая проблема между pod'ом сервиса и брокером: NetworkPolicy change, broker под нагрузкой, node-проблема.
Проверка.
# из pod'а сервиса
kubectl exec -it <pod> -- sh -c "nc -zv <broker-host> 9092"
# метрики broker'а: CPU, disk IO, network
# Kafka UI / Grafana Dashboard на Kafka broker'ах
Фикс. Обычно не в ведении backend-owner'а: эскалация к
инфра-команде. Пока ждёшь — увеличить session.timeout.ms до 60–90s,
чтобы мелкие flap'ы не триггерили rebalance.
6. Статическое членство (static membership)¶
group.instance.id — фича Kafka 2.3+, позволяет pod'у при рестарте
вернуться в group под тем же identity без rebalance. Это особенно
ценно при rolling-deploy.
Настройка в Watermill (Sarama config):
saramaCfg := sarama.NewConfig()
saramaCfg.Version = sarama.V2_6_0_0 // минимум 2.3
saramaCfg.Consumer.Group.InstanceId = os.Getenv("HOSTNAME")
// + установить session.timeout.ms > grace period деплоя
saramaCfg.Consumer.Group.Session.Timeout = 60 * time.Second
Эффект: при рестарте pod'а за < 60s broker «подождёт» возвращения, без rebalance. При действительном исчезновении pod'а (> 60s) — штатный rebalance.
Не применяй, если pod'ы эфемерны и часто пересоздаются с новыми именами — тогда static membership теряет смысл.
Cooperative rebalance protocol (KIP-429, Sarama 1.28+) — параллельно включи:
Отличие от default (eager): при rebalance старые members не теряют свои партиции, только передают часть новому. Меньше «проседания» процессинга при деплое.
Параметры клиента¶
Сводная таблица, когда что крутить:
| Симптом | Параметр | Как изменить |
|---|---|---|
| Rebalance при deploy | group.instance.id (static) + session.timeout.ms > grace |
Включить |
| Handler иногда > 5 минут | max.poll.interval.ms |
Увеличить до handler_p99 × 3 |
| Network flap | session.timeout.ms |
60–90s |
| Rebalance при scale-out | rebalance.strategy = cooperative-sticky |
Включить |
| Слишком много heartbeat-трафика | heartbeat.interval.ms |
Увеличить, но не больше session.timeout / 3 |
Максимальные значения, за которые лучше не переходить без согласования с Kafka-owner'ом:
session.timeout.ms≤ 120s — больше = долго ждать смерти реального умершего consumer'а → лаг.max.poll.interval.ms≤ 600s — больше указывает на то, что handler должен быть разделён (§1).
Все эти параметры устанавливаются в internal/config/ сервис-репо и
проезжают через env. Изменение — через PR.
Как правильно скейлить¶
num.partitions топика задаёт потолок параллелизма consumer'ов. Если
num.partitions = 4, запустить 8 реплик сервиса — 4 будут idle.
- Scale-out реплик ≤ num.partitions. Всё лишнее — деньги на ветер.
- Увеличение num.partitions — имеет
последствия для ordering,
см.
kafka-consumer-stuck.md§4. - Key-hash балансировка — если один partition key горячий (10%
сообщений приходит с одним
user_id), увеличение партиций не поможет: один партишн — одна горячая партия. Решение —partition_keyс более широким spread'ом (hash от uuid, не от бизнес-ID).
Что не делать¶
- Не ставить `max.poll.interval.ms = 1h «чтобы rebalance не был». Это значит: при реальном зависании handler'а broker будет ждать час, пока начнёт rebalance. Лаг вырастет до часов.
- Не делать
group.instance.id = "consumer-1"(статическое имя). Если 3 реплики → все три имеют одну ID → broker считает, что «один consumer залогинился 3 раза». Имя должно быть уникально per-реплика:os.Getenv("HOSTNAME")или подобное. - Не использовать один consumer-group name для нескольких
сервисов — см.
kafka-consumer-stuck.md. - Не игнорировать постоянный rebalance как «так работает». Rebalance каждые 30 секунд = consumer не работает эффективно, каждое 5-е сообщение будет обработано с задержкой.
- Не крутить параметры без мониторинга. Сначала метрики на consumer lag, latency handler'а, rebalance count; потом крути.
Чеклист¶
- Подтвердил: rebalance, а не просто stuck —
MEMBERSменяются, в логах «rebalancing». - Смотрел p99 handler duration: если > 60s — §1.
- Проверил
go_gc_duration_seconds: нет пауз > 100ms. - Проверил pod-stability: нет CrashLoop, HPA не флаппит.
- Network-проверка: consumer достаёт broker без
connection reset. - Параметры клиента посмотрены в
internal/config/: соответствуют табличке §Параметры клиента. - Если deploy триггерит rebalance каждый раз — включён
group.instance.id+cooperative-sticky. - Если тяжёлый handler реально нужен — вынесен в async-воркер, consumer делает только INSERT / enqueue.
-
num.partitions≥ replica count. - После фикса:
kafka_consumer_lagстабилизируется, state группыStable> 10 минут подряд.
Связанные разделы¶
kafka-consumer-stuck.md— общий runbook по застрявшему consumer'у, если не rebalance — иди туда.../conventions/events.md— middleware stack, envelope, retry.../patterns/idempotent-consumer.md— дедупликация; при частых rebalance'ах дубли неизбежны, handler обязан быть idempotent.../patterns/outbox.md— partition count и ordering guarantees.memory-leak.md— если GC-паузы вызывают rebalance из-за тяжёлого heap.../how-to/debug-outbox-lag.md— если source-сервис (publisher) лагает; consumer сам может быть в норме.