Перейти к содержанию

Kafka consumer: rebalance loop

Runbook: consumer-group постоянно перераспределяет партиции между репликами, сообщения идут рывками или не идут вообще, в логах Group X rebalancing. Симптом смежный со «consumer застрял», но причина другая — broker считает consumer мёртвым и передаёт его партиции другой реплике, та тоже не справляется, цикл повторяется.

Общий runbook по стуку consumer'а — в kafka-consumer-stuck.md. Эта страница — специально про rebalance: как отличить, где смотреть, какие параметры крутить. Reference по Watermill + Kafka — в ../conventions/events.md.

Содержание

Симптомы

  • kafka_consumer_lag колеблется: то растёт, то частично опадает, никогда не стабилизируется близко к 0.
  • В логах сервиса повторяются сообщения:
group coordinator dead, marking as unknown
attempting to join group ...
group rebalancing: new members joined
partitions revoked: [...]
partitions assigned: [...]

Цикл повторяется каждые 30–60 секунд. - kafka-consumer-groups --describe --group <name> показывает разные CONSUMER-ID / HOST на последовательных запусках — члены группы меняются. - Метрика messages_processed_total идёт рывками: 200 сообщений проехали → тишина на 30 секунд → ещё 200.

Быстрая проверка: это rebalance?

Отличия от «просто застрявшего» consumer'а:

Stuck Rebalance loop
Лаг Растёт монотонно Колеблется
Сообщения «едут» Нет Рывками
MEMBERS в describe Стабильные Меняются
Логи Тишина или постоянный error «joining group», «revoke», «assign»
state consumer-group'ы Stable PreparingRebalance / CompletingRebalance

Проверка state:

kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
    --describe --group <name> --state

Жизненный цикл consumer в group

Чтобы понять, почему происходит rebalance, нужна модель:

stateDiagram-v2
    [*] --> Joining: consumer starts
    Joining --> Stable: coordinator assigned partitions
    Stable --> Rebalancing: member timeout / join / leave
    Rebalancing --> Stable: assignment done
    Stable --> [*]: consumer stops

Rebalance триггерится:

  1. Новый consumer вступил в group — deploy новой реплики, autoscaler добавил pod.
  2. Consumer вышел из group — pod упал, SIGTERM, rolling restart.
  3. Consumer не шлёт heartbeat > session.timeout.ms — broker считает его мёртвым.
  4. Consumer не делает poll() > max.poll.interval.ms — broker считает его зависшим.

Причины 3 и 4 — самые частые для rebalance-loop в проде.

1. Медленный handler

Самая частая причина. Handler обрабатывает одно сообщение дольше max.poll.interval.ms (default 300s в Sarama / Watermill). Во время обработки consumer не может сделать poll() за следующей партией — broker считает его зависшим, начинает rebalance. После rebalance'а consumer «возвращается» с новой сессией, но в handler'е та же тяжёлая логика — цикл повторяется.

Проверка.

  • Tempo-trace по одному сообщению: сколько длится handler.Handle span?
  • Метрика messages_handler_duration_seconds_bucket — есть ли хвост

    100s?

  • В логах: «processing message id=X» без парного «finished message id=X» на протяжении минут.

Фикс.

Handler должен отвечать быстро — не вся работа, а только «принять команду». Тяжёлую часть — в отдельный воркер:

// ПЛОХО: handler делает всё внутри себя
func (h *Handler) Handle(msg *message.Message) error {
    ctx := msg.Context()
    if err := h.svc.ProcessAllImages(ctx, payload); err != nil { // 10 минут
        return err
    }
    return nil
}

// ХОРОШО: handler сохраняет команду, воркер обрабатывает асинхронно
func (h *Handler) Handle(msg *message.Message) error {
    ctx := msg.Context()
    return h.svc.EnqueueJob(ctx, payload) // < 100ms: INSERT в БД
}

«Тяжёлый» воркер отдельно читает задания из БД (или из отдельной Kafka-topic'и) и обрабатывает их. Consumer на основном топике — лёгкий, быстрый, всегда успевает poll().

Подъём max.poll.interval.ms — маскирующее решение. Если handler реально должен обрабатывать 5 минут одно сообщение, подними лимит; но чаще оказывается, что handler должен делать < 1s, и он делает минуты из-за неоптимизации.

2. GC pauses

Go-GC не блокирует поток goroutines > миллисекунд в современных версиях, но при 4GB heap и плотной нагрузке паузы могут достигать секунд. Сервис с такими паузами пропускает heartbeat'ы.

Проверка.

  • Метрика go_gc_duration_seconds — 99th процентиль > 100ms → подозрительно.
  • go_memstats_heap_inuse_bytes — если > 2GB и растёт, см. memory-leak.md.
  • Лог: в момент rebalance Check GC forced в stderr.

Фикс.

  • Оптимизировать память: пул буферов, ограничить in-memory cache, уменьшить allocation rate.
  • GOGC — выше (150 вместо default 100) уменьшает частоту GC, но увеличивает heap.
  • GOMEMLIMIT (Go 1.19+) — цели-лимит, GC активнее при приближении.

3. Неверный heartbeat

Watermill kafka-subscriber использует Sarama под капотом. Параметры:

Параметр Default Что значит
session.timeout.ms 45s Broker считает consumer мёртвым, если heartbeat не приходил столько
heartbeat.interval.ms 3s Consumer шлёт heartbeat каждые столько
max.poll.interval.ms 300s (5 минут) Между двумя Poll() не должно пройти больше

Правило: heartbeat.interval.ms < session.timeout.ms / 3. Отклонение → broker делает rebalance при одном пропущенном heartbeat.

Heartbeat в современных клиентах — отдельная goroutine, она продолжает работать, пока handler обрабатывает сообщение. Так что долгий handler не блокирует heartbeat. Но он блокирует pollmax.poll.interval.ms нарушается.

4. Новая реплика поднимается / падает

Симптом. Rebalance при каждом деплое или autoscale-событии.

Это штатно: когда pod входит/выходит из group, rebalance неизбежен. Но:

  • Deploy strategy не должен дёргать rebalance каждые 30 секунд. Rolling deploy — да, 1-2 rebalance'а, стабилизация, всё ок.
  • CrashLoopBackOff pod'а = бесконечные rebalance'ы. Починка — разобрать, почему pod падает (логи, exit code).
  • HPA с агрессивными порогами скейлит вверх-вниз каждую минуту → то же самое. Настрой behavior.scaleDown.stabilizationWindowSeconds хотя бы 300.

Фикс.

  • CrashLoop → читаем kubectl logs <pod> --previous, фиксим.
  • HPA-флаппинг → увеличить stabilization window, пересмотреть thresholds.
  • Cooperative rebalance (см. §6) — снижает «боль» нормальных rebalance'ов, стоит включить.

5. Network flap между consumer и broker

Симптом. В логах consumer «connection reset», «i/o timeout» по kafka-порту, rebalance.

Причина. Сетевая проблема между pod'ом сервиса и брокером: NetworkPolicy change, broker под нагрузкой, node-проблема.

Проверка.

# из pod'а сервиса
kubectl exec -it <pod> -- sh -c "nc -zv <broker-host> 9092"

# метрики broker'а: CPU, disk IO, network
# Kafka UI / Grafana Dashboard на Kafka broker'ах

Фикс. Обычно не в ведении backend-owner'а: эскалация к инфра-команде. Пока ждёшь — увеличить session.timeout.ms до 60–90s, чтобы мелкие flap'ы не триггерили rebalance.

6. Статическое членство (static membership)

group.instance.id — фича Kafka 2.3+, позволяет pod'у при рестарте вернуться в group под тем же identity без rebalance. Это особенно ценно при rolling-deploy.

Настройка в Watermill (Sarama config):

saramaCfg := sarama.NewConfig()
saramaCfg.Version = sarama.V2_6_0_0  // минимум 2.3
saramaCfg.Consumer.Group.InstanceId = os.Getenv("HOSTNAME")
// + установить session.timeout.ms > grace period деплоя
saramaCfg.Consumer.Group.Session.Timeout = 60 * time.Second

Эффект: при рестарте pod'а за < 60s broker «подождёт» возвращения, без rebalance. При действительном исчезновении pod'а (> 60s) — штатный rebalance.

Не применяй, если pod'ы эфемерны и часто пересоздаются с новыми именами — тогда static membership теряет смысл.

Cooperative rebalance protocol (KIP-429, Sarama 1.28+) — параллельно включи:

saramaCfg.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyCooperativeSticky()

Отличие от default (eager): при rebalance старые members не теряют свои партиции, только передают часть новому. Меньше «проседания» процессинга при деплое.

Параметры клиента

Сводная таблица, когда что крутить:

Симптом Параметр Как изменить
Rebalance при deploy group.instance.id (static) + session.timeout.ms > grace Включить
Handler иногда > 5 минут max.poll.interval.ms Увеличить до handler_p99 × 3
Network flap session.timeout.ms 60–90s
Rebalance при scale-out rebalance.strategy = cooperative-sticky Включить
Слишком много heartbeat-трафика heartbeat.interval.ms Увеличить, но не больше session.timeout / 3

Максимальные значения, за которые лучше не переходить без согласования с Kafka-owner'ом:

  • session.timeout.ms ≤ 120s — больше = долго ждать смерти реального умершего consumer'а → лаг.
  • max.poll.interval.ms ≤ 600s — больше указывает на то, что handler должен быть разделён (§1).

Все эти параметры устанавливаются в internal/config/ сервис-репо и проезжают через env. Изменение — через PR.

Как правильно скейлить

num.partitions топика задаёт потолок параллелизма consumer'ов. Если num.partitions = 4, запустить 8 реплик сервиса — 4 будут idle.

  • Scale-out реплик ≤ num.partitions. Всё лишнее — деньги на ветер.
  • Увеличение num.partitions — имеет последствия для ordering, см. kafka-consumer-stuck.md §4.
  • Key-hash балансировка — если один partition key горячий (10% сообщений приходит с одним user_id), увеличение партиций не поможет: один партишн — одна горячая партия. Решение — partition_key с более широким spread'ом (hash от uuid, не от бизнес-ID).

Что не делать

  • Не ставить `max.poll.interval.ms = 1h «чтобы rebalance не был». Это значит: при реальном зависании handler'а broker будет ждать час, пока начнёт rebalance. Лаг вырастет до часов.
  • Не делать group.instance.id = "consumer-1" (статическое имя). Если 3 реплики → все три имеют одну ID → broker считает, что «один consumer залогинился 3 раза». Имя должно быть уникально per-реплика: os.Getenv("HOSTNAME") или подобное.
  • Не использовать один consumer-group name для нескольких сервисов — см. kafka-consumer-stuck.md.
  • Не игнорировать постоянный rebalance как «так работает». Rebalance каждые 30 секунд = consumer не работает эффективно, каждое 5-е сообщение будет обработано с задержкой.
  • Не крутить параметры без мониторинга. Сначала метрики на consumer lag, latency handler'а, rebalance count; потом крути.

Чеклист

  • Подтвердил: rebalance, а не просто stuck — MEMBERS меняются, в логах «rebalancing».
  • Смотрел p99 handler duration: если > 60s — §1.
  • Проверил go_gc_duration_seconds: нет пауз > 100ms.
  • Проверил pod-stability: нет CrashLoop, HPA не флаппит.
  • Network-проверка: consumer достаёт broker без connection reset.
  • Параметры клиента посмотрены в internal/config/: соответствуют табличке §Параметры клиента.
  • Если deploy триггерит rebalance каждый раз — включён group.instance.id + cooperative-sticky.
  • Если тяжёлый handler реально нужен — вынесен в async-воркер, consumer делает только INSERT / enqueue.
  • num.partitions ≥ replica count.
  • После фикса: kafka_consumer_lag стабилизируется, state группы Stable > 10 минут подряд.

Связанные разделы