Перейти к содержанию

Consumer застрял

Consumer не потребляет сообщения, лаг растёт, обработчик молчит. Эта страница — runbook для такого инцидента. Описание штатной работы consumer'а, middleware stack, идемпотентности — в ../conventions/events.md и ../patterns/idempotent-consumer.md.

Содержание

Как увидеть, что consumer застрял

Метрики

kafka_consumer_lag{topic, consumer_group}
    — гейдж, current_offset_high_watermark − committed_offset.
    Растёт линейно → consumer не успевает или не работает совсем.
messages_processed_total{service, topic, handler, result="success"}
    — rate должен быть ≥ rate публикации. Упал в 0 при непустом топике
    → consumer не получает сообщения.
messages_processed_total{service, topic, handler, result="error"}
    — резкий рост при постоянном лаге → handler падает в каждом вызове.

Логи

В нормальном режиме handler пишет хотя бы INFO-запись при успехе и ERROR при сбое. Тишина grep по topic=<name> в логах сервиса за последние N минут — признак того, что сообщения не доходят до handler'а.

Внешние инструменты

# суммарный лаг consumer-group'ы
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
    --describe --group <consumer-group>

# содержимое топика с хвоста
kafka-console-consumer.sh --bootstrap-server "$KAFKA_BROKERS" \
    --topic <topic> --max-messages 5 --from-beginning

# конфигурация топика (partition count, replication)
kafka-topics.sh --bootstrap-server "$KAFKA_BROKERS" --describe --topic <topic>

В Kafka UI те же данные доступны через web. Смотри CURRENT-OFFSET и LOG-END-OFFSET per-partition: растёт ли LOG-END-OFFSET (публикация идёт), обновляется ли CURRENT-OFFSET (consumer двигается).

Типовые причины

1. Handler падает в panic → бесконечный retry

Симптом. В логах повторяющийся стектрейс panic'а, result="error" растёт, лаг не спадает.

Причина. middleware.Recoverer ловит panic и превращает в error, middleware.Retry переотправляет, handler снова паникует. Без PoisonQueue в стеке это бесконечно — consumer «стоит» на одном и том же сообщении.

Проверка. Порядок middleware в wiring сервиса:

router.AddMiddleware(
    middleware.CorrelationID,
    middleware.Recoverer,
    poisonQueue(kafkaPub, dlqTopic), // ← обязательно ДО Retry
    middleware.Retry{...}.Middleware,
    deduplicator(redisClient),
)

См. ../conventions/events.md §Consumer.

Фикс. 1. Подключи PoisonQueue перед Retry. После исчерпания попыток сообщение уйдёт в <topic>.dlq. 2. Пофикси root cause panic'а — обычно это nil-dereference от невалидного payload'а или отсутствующего поля metadata. 3. Деплой → consumer двинется дальше.

2. Handler блокируется на внешнем I/O без таймаута

Симптом. Логи показывают «начал обработку», но не показывают «закончил обработку». Goroutine-дамп (через /debug/pprof/goroutine) покажет handler в http.Client.Do или pgx.Pool.Acquire без движения.

Причина. Downstream HTTP / БД / Redis упёрся в TCP-ожидание без ctx с таймаутом.

Фикс.

func (h *Handler) Handle(msg *message.Message) error {
    ctx, cancel := context.WithTimeout(msg.Context(), 10*time.Second)
    defer cancel()
    return h.svc.Do(ctx, ...)
}

Все внешние клиенты в сервисе — через ctx. Правила — в ../conventions/shutdown.md и ../conventions/observability.md.

3. Rebalance loop (session timeout)

Симптом. В логах сервиса / брокера — повторяющиеся «Group X rebalancing». kafka-consumer-groups --describe показывает CONSUMER-ID / HOST меняющиеся каждые 30–60 секунд.

Причина. Handler обрабатывает одно сообщение дольше, чем session.timeout.ms (default 45s у современных клиентов) или дольше max.poll.interval.ms (default 5m). Kafka считает consumer «мёртвым», передаёт его партиции другому члену группы, тот тоже не успевает — цикл повторяется, сообщения не продвигаются.

Фикс. - Тяжёлую синхронную работу разнеси на фоновый worker: handler только сохраняет команду в БД / Redis-очередь, возвращает nil; worker обрабатывает её отдельно. - Либо подними max.poll.interval.ms / session.timeout.ms в конфигурации Kafka-клиента (крайняя мера — маскирует медленный handler, не решает причину). - Проверь, нет ли time.Sleep в handler'е (запах: ../conventions/testing.md — там же применимо к прод-коду).

4. Одна партиция на всех

Симптом. Несмотря на 3 реплики сервиса, работает только одна — остальные стоят. kafka-consumer-groups --describe показывает, что все партиции у одного CONSUMER-ID.

Причина. Топик создан с num.partitions=1. Kafka не может распределить больше consumer'ов, чем партиций.

Фикс. - Повысь num.partitions у топика. Важно: увеличение партиций меняет hashing partition key — сообщения с тем же aggregate-id после операции могут уходить в другую партицию, что ломает per-aggregate ordering (см. ../patterns/outbox.md §Ordering). Делай это осознанно. - Количество партиций задаёт верхнюю границу параллелизма consumer'ов: если нужно 4 реплики — минимум 4 партиции.

5. Двойное использование consumer-group

Симптом. Два разных сервиса (или один сервис и debug-скрипт) случайно используют одну и ту же consumer_group. Оба «съедают» сообщения друг у друга — каждый видит только половину.

Проверка. В kafka-consumer-groups --describe --group <name> видно MEMBERS из разных хостов, которые не должны быть в одной группе.

Фикс. Consumer-group имя — <service>-<handler>. Пример: notification-on-review-created. Никогда не используй обобщённые имена вроде consumer или default. Исправь конфиг, перезапусти сервис.

6. Deduplicator в fail-closed при недоступном Redis

Симптом. Каждое сообщение возвращает error с текстом про Redis. DLQ растёт быстро.

Причина. Сервис настроен как fail-closed по политике дедупликации (см. ../patterns/idempotent-consumer.md §Fail-open vs fail-closed), а Redis недоступен.

Фикс. - Восстанови Redis. - Если сервис не критичен к 100%-дедупликации — переключи на fail-open (дефолт для большинства сервисов). - Сообщения, ушедшие в DLQ за время incident'а, replay'ить по процедуре §Восстановление от DLQ.

7. Broker unreachable

Симптом. Логи показывают kafka: client has run out of available brokers to talk to, dial tcp: i/o timeout.

Проверка.

docker compose exec <service> sh -c "nc -zv <kafka-host> 9092"
kafka-topics.sh --bootstrap-server "$KAFKA_BROKERS" --list

Фикс. Брокер / сеть / DNS. Пока broker недоступен, consumer только логирует ошибки подключения — это штатно, после восстановления он сам подключится. Рестарт pod'а обычно не нужен.

8. DLQ забит, алерт не сработал

Симптом. Визуальный осмотр DLQ через Kafka UI показывает, что там тысячи сообщений, накопившихся за недели, а никто не заметил.

Причина. У <topic>.dlq нет consumer'а и нет алерта.

Фикс. - Заведи alert на rate(messages_poisoned_total{topic="<topic>.dlq"}[5m]) > 0 (как добавить метрику и alert — см. ../how-to/add-metric-and-alert.md). - Разберись, почему сообщения уходят в DLQ (payload, downstream, validation). - Replay'ни пачку, см. §Восстановление от DLQ.

Команды диагностики

# состояние consumer-group
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
    --describe --group <name>

# последние 10 сообщений в топике (проверить, что publisher жив)
kafka-console-consumer.sh --bootstrap-server "$KAFKA_BROKERS" \
    --topic <topic> --max-messages 10 --from-beginning

# структура топика
kafka-topics.sh --bootstrap-server "$KAFKA_BROKERS" --describe --topic <topic>

# логи сервиса, только уровень error/panic
docker compose logs -f <service> 2>&1 | grep -E 'level=error|panic|PANIC'

# goroutine-дамп pprof (если сервис exposes /debug/pprof)
go tool pprof -http=:8080 http://<pod>:<port>/debug/pprof/goroutine

# вручную сбросить offset на N назад (крайняя мера, только в dev/staging)
kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKERS" \
    --group <name> --topic <topic> --reset-offsets --shift-by -N --execute

Сброс offset в prod — последнее средство. Он либо приведёт к повторной обработке (если сдвиг назад), либо к пропуску сообщений (если вперёд). Делай только после согласования с владельцем сервиса и под запись в incident log.

Порядок действий при лаге

  1. Смотрим kafka_consumer_lag: одна партиция или все? Одна — скорее всего handler на этом ключе падает. Все — глобальная проблема (broker, pod, middleware).
  2. Смотрим messages_processed_total{result="error"}: растёт → handler падает → логи → root cause.
  3. Смотрим rebalance события в логах брокера / клиента: частый rebalance → §3.
  4. Смотрим DLQ-рост: быстро растёт → payload или downstream ломается системно.
  5. Смотрим голые сетевые ошибки → §7, брокер недоступен.
  6. Если все метрики «нулевые» и нет ошибок, но лаг растёт — проверь, что consumer-group живая: kafka-consumer-groups --describe --group <name> → члены должны присутствовать. Если group пустая, а реплики сервиса запущены — значит, сервис даже не подписался (баг в wiring).

Восстановление от DLQ

  1. Инспекция: прочитай сколько-то DLQ-сообщений, убедись, в чём причина (payload malformed, downstream 500, validation).
  2. Фикс кода (если причина — баг) → деплой.
  3. Replay: скрипт cmd/dlq-replay/ (или аналогичный) читает сообщения из DLQ-топика и публикует их обратно в основной топик. Schema сохраняется, Message.UUID сохраняется — Deduplicator корректно обработает повтор.
  4. Проверь, что handler на основном топике справляется с новой партией. Если сразу же после replay'а сообщения снова уходят в DLQ — root cause не устранён.

Replay-утилита — отдельная программа в репозитории сервиса, её публикация идёт напрямую в Kafka (не через outbox): это инфра-задача, не бизнес-операция.

Anti-patterns

  • Автоматически дропать poisoned-сообщения. Если handler возвращает nil вместо error при «не понимаю payload» — сообщение тихо теряется, никто не узнает о баге. Всегда возвращай error, пусть идёт в DLQ.
  • Retry без DLQ. Без PoisonQueue в middleware-стэке retry бесконечный → consumer стоит → лаг растёт → не решается сам по себе.
  • Один consumer-group на несколько сервисов. «Случайно» два сервиса делают вид, что они одна группа — каждый получает случайную половину сообщений. Имя группы должно содержать имя сервиса и имя handler'а.
  • Игнорировать rebalance-логи. Rebalance каждые 30 секунд не «штатен», это симптом медленного handler'а, см. §3.
  • Поднимать session.timeout.ms до минут, чтобы скрыть медленный handler. Это маскировка, handler всё равно блокирует партицию. Выносить тяжёлую работу в фоновый worker.

Связанные разделы