Skip to Content
PatternsIdempotent consumer

Идемпотентный consumer

Deep-dive по идемпотентности: как сделать так, чтобы повторная доставка сообщения не приводила к повторной работе. Reference по событиям и middleware — ../conventions/events. Эта страница — про внутренности дедупликации, Redis-хранилище, выбор окна, поведение при сбоях, idempotency на уровне бизнес-логики.

Содержание

Проблема

Kafka даёт at-least-once семантику. Это значит, что одно и то же сообщение может быть доставлено consumer’у 2+ раз. Источники повторов:

  • Retry на ошибке. Handler вернул error → middleware.Retry переотправил сообщение → при успехе только второго захода, первый побочный эффект уже случился.
  • Rebalance в consumer-group. Kafka перераспределяет партиции между репликами → partial offset commit → следующая реплика перечитывает «хвост».
  • Forwarder переопубликовал. Он записал в Kafka, но не успел пометить строку в outbox как acked (см. outbox) → при рестарте поднял ту же строку снова.
  • Рестарт consumer’а между обработкой и commit’ом offset’а. Handler отработал, offset не закоммитился → следующая инкарнация перечитает.

Итог: handler обязан корректно обрабатывать повторную доставку. Нельзя рассчитывать, что «у нас не будет retry» или «Kafka настроена аккуратно». Это свойство транспорта, не настройки.

Стандарт: Watermill middleware.Deduplicator

В нашем consumer middleware stack (см. ../conventions/events) последним по порядку идёт Deduplicator. Он хранит в Redis множество уже обработанных Message.UUID и при повторной доставке проглатывает сообщение до вызова handler’а — возвращает успех, offset коммитится, downstream работа не происходит.

Ключ дедупликации — Message.UUID (это ULID из envelope, см. ../conventions/events.md §Envelope). Не Event-Type, не Correlation-Id, не aggregate_id. Именно UUID — уникальный для этого конкретного сообщения, генерируется на publisher-сайде один раз.

Хранилище — Redis. Shared между репликами одного сервиса: если consumer-реплика A обработала сообщение и записала ключ, реплика B при повторной доставке увидит его и пропустит. Без Redis (in-memory) это не работает: реплики не видят state друг друга.

TTL по умолчанию — 24 часа. Обоснование — §Окно ниже.

Архитектура

Путь одного сообщения через middleware-стек router’а: Deduplicator — последний узел перед handler’ом. Он делает атомарный SETNX в Redis по Message.UUID: если ключ новый — handler вызывается; если ключ уже существует — сообщение проглатывается как дубль и ACK’ается.

Детали по реализации Repository — §RedisDedupRepository; по порядку middleware — §Setup и ../conventions/events.

Setup

import ( "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" ) func newDeduplicator(redisClient *redis.Client, logger watermill.LoggerAdapter) message.HandlerMiddleware { dedup := middleware.Deduplicator{ KeyFactory: func(msg *message.Message) (string, bool, error) { // false в boolean — "не уникальный ключ, не надо логировать дубль" // true — делать предупреждение про дубли. return msg.UUID, true, nil }, Repository: newRedisDedupRepo(redisClient), Timeout: 24 * time.Hour, } return dedup.Middleware } router.AddMiddleware( middleware.CorrelationID, middleware.Recoverer, poisonQueue(kafkaPub, dlqTopic), retryMiddleware.Middleware, newDeduplicator(redisClient, watermillLogger), // последний )

Порядок — критичен. Deduplicator после Retry, а не до. Иначе retry внутри одного обработчика будет падать на «уже обработано» при первой же неудаче.

RedisDedupRepository

Минимальная реализация:

package eventdedup import ( "context" "errors" "time" "github.com/redis/go-redis/v9" ) type RedisRepo struct { client *redis.Client prefix string } func NewRedisRepo(client *redis.Client, prefix string) *RedisRepo { return &RedisRepo{client: client, prefix: prefix} } // IsDuplicate атомарно пытается установить ключ с TTL. // Возвращает true, если ключ уже существовал (=дубль), // false — если ключ установлен этим вызовом (= первое появление). func (r *RedisRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) { full := r.prefix + ":" + key set, err := r.client.SetNX(ctx, full, "1", ttl).Result() if err != nil { return false, err } return !set, nil }

Ключевой примитив — SETNX (Set If Not Exists). Это атомарная операция: если ключ не существовал — устанавливается и возвращает «установили»; если существовал — не трогаем, возвращаем «уже был». Никакой race condition между «прочитать», «решить», «записать» — всё атомарно.

prefix<service>:dedup:<handler>. Пример: notification:dedup:on-review-created. Префикс нужен, чтобы ключи разных handler’ов не перетирались (если один handler пропустил сообщение как дубль, другой handler при ретре мог бы подумать, что и он уже его обрабатывал).

Эта реализация живёт в pkg/eventdedup/ сервис-репо. Пока не вынесено в shared-lib — синхронизация копированием (см. правила в ../conventions/project-layout.md §pkg).

Окно (TTL)

TTL = сколько хранить ключ в Redis. После истечения ключ исчезает, повтор сообщения с тем же UUID будет пропущен через handler как новый.

Выбор окна:

Тип сообщенийTTLОбоснование
Обычные domain events24hСтандарт. Покрывает: retry на ошибке (5 попыток × 30s = 2.5 мин), rebalance (секунды), forwarder-gap (минуты), длительный инцидент Kafka (часы).
Critical events (user.banned, consent-revoke)7dЕсли мы случайно обработаем user.banned дважды — второй раз это безопасный no-op, но нужно гарантировать, что даже после недельной отладочной паузы consumer’а повтор не пойдёт в downstream.
High-volume analytics1h или меньшеЕсли допустим небольшой процент дублей в аналитике и объём сообщений критичен для Redis-памяти. Только для событий, где дубль = небольшая статистическая ошибка, не бизнес-эффект.

Никогда не ставь TTL меньше 10 минут. Retry-окно внутри одного handler’а при 5 попытках с MaxInterval: 30s + jitter может растянуться до 3–5 минут, плюс consumer-rebalance. TTL в 1 минуту поломает retry-путь: второй заход отработает как «новый», не как retry.

TTL дедуп-ключа per use-case

Таблица ниже — справочник для типовых сценариев. Сортировка по возрастанию TTL:

Use caseTTLОбоснование
In-process batch retry (e.g., HTTP inline retry на 3 попытки)5 минутДостаточно для 3 попыток backoff до 60с; риск потери окна низкий.
Kafka consumer обычной pipeline1 часПокрывает rebalance (≤ 5 мин) + repeat delivery после restart broker.
Cross-service webhook (внешний producer неизвестен)24 часаDefault; покрывает длинные сетевые инциденты и ручной replay.
Financial-like операция (once-ever гарантия важна)7 сутокПокрывает длинный инцидент + ручное разбирательство, потом UPSERT в БД гарантирует дальнейшую защиту.
Event replay из архива30 сутокСоответствует окну retention Kafka-топика.

Правило:

  • Никогда не ниже 5 минут — меньшее окно не покрывает retry с backoff’ом даже в простейшей HTTP-цепочке.
  • Никогда не выше retention Kafka-топика — дедуп-ключ бесполезен для сообщений, которые уже не могут прийти из топика.

Более длинное окно всегда допустимо; более короткое — только с обоснованием в коде (комментарий рядом с конфигом Deduplicator’а) и подтверждённой idempotency на БД-уровне (UPSERT по natural key).

Fail-open vs fail-closed

Что делать, если Redis недоступен?

Дефолт — fail-CLOSED

Redis вернул ошибку → IsDuplicate возвращает error → Deduplicator пропускает ошибку наверх → Retry middleware переотправит → если Redis не восстановился после 5 попыток, сообщение уйдёт в DLQ (см. §Fail-open при Redis down про CB поверх этого поведения).

Это безопасный дефолт, потому что:

  • Redis-outage обычно короткий (секунды-минуты), DLQ за это время не переполнится.
  • Большинство handler’ов не имеют bullet-proof DB-level идемпотентности для всех побочных эффектов. Есть ON CONFLICT DO NOTHING в main-path, но side effects (FCM push, SMS, внешние платные вызовы) не защищены.
  • Простой handler без DB-level идемпотентности + fail-open = прямой дубликат данных при любой Redis-паузе.

Fail-OPEN — только с DB-level идемпотентностью

Fail-open допустим только если handler защищён от дубля на уровне Postgres:

  • INSERT ... ON CONFLICT DO NOTHING по natural key (event_id, (aggregate_id, event_type, seq) и т.п.), и
  • side effects гейтятся результатом этого insert’а (RowsAffected=0 → push не отправляем — см. §External side effects).

Без этого fail-open = тихие дубликаты в бизнес-данных и во внешних системах. Включать fail-open на handler’е без DB-level защиты — запрещено; на ревью такой PR заворачивается.

Выбор фиксируется в конфиге сервиса:

repo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created") dedup := middleware.Deduplicator{ KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil }, Repository: eventdedup.FailClosed(repo), // дефолт; FailOpen — только с DB-level защитой Timeout: 24 * time.Hour, }

FailOpen/FailClosed — тонкие wrapper’ы, превращающие ошибку Redis либо в «not-a-duplicate, proceed», либо в пробрасывание ошибки наверх. Для сервисов с эскалацией при long outage используется CB-обёртка из §Реализация через circuit breaker, которая автоматически переходит fail-closed → fail-open при серии timeout’ов.

Fail-open при Redis down

Warning. Fail-open без DB-level идемпотентности = дубликаты бизнес-данных. Не включать без ревью lead-инженера backend и явного подтверждения, что handler защищён UPSERT по natural key.

В предыдущем разделе описан семантический выбор fail-open vs fail-closed. Этот раздел — про runtime-поведение, когда Redis реально недоступен, и про правильную защиту через circuit breaker.

Дефолт — fail-CLOSED

Если Redis возвращает ошибку (timeout, connection refused) — handler возвращает error, сообщение не коммитится в Kafka, Retry-middleware переотправит его, в худшем случае — в DLQ. Это безопасно при коротких сбоях (Redis перезагружается, network glitch).

Fail-closed — правильный дефолт, потому что:

  • Большинство консьюмеров не имеют DB-level идемпотентности «bullet- proof» уровня — есть ON CONFLICT DO NOTHING в main-path, но нет защиты для side effects (FCM push, внешние вызовы).
  • Redis outage обычно короткий (секунды-минуты), DLQ за эти минуты не переполнится.

Когда переходить на fail-OPEN

Fail-open оправдан, когда:

  • Long Redis outage (> 5 минут) — lag consumer’ов начинает расти, DLQ копится, downstream пропускает свежие события.
  • Business decision «лучше продублировать, чем потерять свежесть» — например, для notifications-fanout, где лучше дважды показать уведомление, чем не показать вовсе.
  • Handler имеет DB-level идемпотентность через natural key (INSERT ... ON CONFLICT DO NOTHING по (aggregate_id, event_type, seq) или UPSERT с business-ключом). Тогда дубль на consumer-side не приводит к дубликату данных.

Без DB-level идемпотентности fail-open запрещён. Fail-closed и разбор инцидента вручную.

Реализация через circuit breaker

Автоматический переход fail-closed → fail-open — circuit breaker вокруг Redis SETNX:

  • 5 подряд timeout’ов → CB open. Cooldown 30s.
  • В HalfOpen — один probe-запрос; успех → closed, провал → open ещё на 30s.
  • При open CB:
    • Consumer продолжает обработку без дедупликации.
    • Каждое обработанное сообщение логируется с dedup=bypassed на WARN-уровне.
    • Счётчик consumer_dedup_bypassed_total{topic, consumer_group} инкрементится; alert выставлен на > 0 (любое срабатывание — повод посмотреть).
    • Бизнес-handler обязан иметь DB-level идемпотентность (UPSERT / ON CONFLICT DO NOTHING по natural key).

Decorator на DedupRepository:

package eventdedup import ( "context" "errors" "log/slog" "time" "github.com/prometheus/client_golang/prometheus" "github.com/sony/gobreaker/v2" ) var ( dedupBypassed = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "consumer_dedup_bypassed_total", Help: "Сообщений, обработанных без дедупа из-за Redis CB open.", }, []string{"topic", "consumer_group"}, ) ) type FailOpenMiddleware struct { inner DedupRepository breaker *gobreaker.CircuitBreaker[bool] logger *slog.Logger topic string group string } func NewFailOpenMiddleware(inner DedupRepository, topic, group string, logger *slog.Logger) *FailOpenMiddleware { cb := gobreaker.NewCircuitBreaker[bool](gobreaker.Settings{ Name: "redis-dedup-" + group, MaxRequests: 1, Interval: 60 * time.Second, Timeout: 30 * time.Second, ReadyToTrip: func(counts gobreaker.Counts) bool { return counts.ConsecutiveFailures >= 5 }, OnStateChange: func(name string, from, to gobreaker.State) { logger.Warn("dedup circuit breaker state change", "name", name, "from", from.String(), "to", to.String()) }, }) return &FailOpenMiddleware{inner: inner, breaker: cb, logger: logger, topic: topic, group: group} } func (m *FailOpenMiddleware) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) { dup, err := m.breaker.Execute(func() (bool, error) { return m.inner.IsDuplicate(ctx, key, ttl) }) if err != nil { if errors.Is(err, gobreaker.ErrOpenState) { // CB open: fail-open, пропускаем без дедупа dedupBypassed.WithLabelValues(m.topic, m.group).Inc() m.logger.WarnContext(ctx, "dedup bypassed: circuit breaker open", "dedup", "bypassed", "key", key, "topic", m.topic) return false, nil // "не дубль" — пропускаем в handler } // Ошибка Redis, но CB ещё не открыт — пропускаем наверх (fail-closed) return false, err } return dup, nil }

Подключение в setup:

redisRepo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created") failOpen := eventdedup.NewFailOpenMiddleware(redisRepo, "kazmaps.review.review", "notification-on-review-created", logger) dedup := middleware.Deduplicator{ KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil }, Repository: failOpen, // носит в себе CB Timeout: 24 * time.Hour, }

Alert:

  • consumer_dedup_bypassed_total > 0 — page. Любой bypass = возможные дубли в downstream; нужно фиксить Redis и проверить, что DB-level идемпотентность реально сработала (нет ли дубликатов в БД, FCM- логах).

Альтернативы и почему не используем

Таблица dedup_records per сервис

Пример антипаттерна: в notification/internal/service/dedup.go сейчас сделано «Redis SETNX, при ошибке fallback в Postgres-таблицу dedup_records». Мотивация понятна: Redis умрёт → не потеряем идемпотентность. Но:

  • Таблица dedup_records — ещё один источник владения, своя миграция, свой cleanup, свой retention.
  • Проверка через Postgres на каждый event — latency в разы выше, чем Redis SETNX (БД-connection, транзакция, FS-write). На фоне нагрузки на ту же БД от handler’а это двойная нагрузка.
  • Ключ дедупликации в этой реализации собирается из бизнес-полей (user_id + event_type + ref_id), не из Message.UUID. Это ненадёжно: см. §Anti-patterns про dedup по business-key.
  • Каждый сервис реализует свой таблично-fallback механизм, несовместимо между сервисами, не переиспользуется.

Правильный путь — fail-open Redis через стандартный Deduplicator. Redis-outage на минуты — приемлемый риск; на часы — серьёзный инцидент, и тогда спасать ты будешь не дедупликацией, а стабильностью Redis.

Idempotency-key от клиента на API-level

Клиент шлёт Idempotency-Key в HTTP-header, сервер проверяет — если уже видел, возвращает сохранённый ответ. Это другой паттерн: он защищает от client-side retries на HTTP-уровне. К consumer-side дедупликации прямого отношения не имеет. Нужно отдельно, для публичных write endpoint’ов (не здесь).

Natural idempotency через UPSERT

Если бизнес-операция handler’а — INSERT ... ON CONFLICT DO NOTHING или UPDATE ... WHERE status != 'done', повторный вызов безопасен сам по себе. В таких случаях строго говоря Deduplicator не нужен.

Но: downstream effects могут быть не в БД. Push в FCM не идемпотентен: каждый вызов = новое уведомление на телефоне пользователя. Если handler и пишет в БД, и шлёт push, UPSERT защитит только БД, push уйдёт дважды.

Вывод: Deduplicator — defense in depth. Используем всегда, даже при идемпотентной бизнес-логике. Лишний SETNX в Redis — не стоящая экономии оптимизация.

Idempotency на уровне бизнес-логики

Deduplicator — не замена корректного дизайна handler’а. Даже с Deduplicator’ом handler должен быть идемпотентен by design, по двум причинам:

  1. Redis-outage под fail-open → Deduplicator пропускает сообщение, handler должен не сломаться.
  2. TTL истёк → после окна дубль может прийти снова, handler должен корректно это пережить.

Приёмы:

INSERT ... ON CONFLICT DO NOTHING

const q = ` INSERT INTO sent_notifications (event_id, user_id, sent_at) VALUES ($1, $2, NOW()) ON CONFLICT (event_id) DO NOTHING` tag, err := tx.Exec(ctx, q, eventID, userID)

Если строка уже была (дубль), DO NOTHING — повторный insert бесшумно не происходит. tag.RowsAffected() покажет 0 — можно решить, отправлять ли push. Обычно: если RowsAffected=0, значит, событие уже обрабатывалось — push пропускаем.

Проверка status перед update

const q = ` UPDATE reviews SET status = 'published', published_at = NOW() WHERE id = $1 AND status = 'pending'`

Повторный UPDATE с тем же id не перезапишет уже опубликованный review (условие status = 'pending' не выполнится). Без этой проверки повторный update мог бы подтереть published_at более свежим значением.

External side effects — check-then-act

Для внешних вызовов (FCM push, SMS-gateway) — отмечаем факт отправки до вызова и проверяем перед повтором:

err := s.db.InTx(ctx, func(tx pgx.Tx) error { res, err := tx.Exec(ctx, `INSERT INTO push_sent (event_id, user_id, sent_at) VALUES ($1, $2, NOW()) ON CONFLICT (event_id) DO NOTHING`, eventID, userID) if err != nil { return err } if res.RowsAffected() == 0 { // уже отправляли — не делаем повторный FCM-вызов return nil } // отметили факт, теперь безопасно слать push return s.fcm.Send(ctx, userID, payload) })

Тонкость: если fcm.Send упадёт после того, как insert’нулась запись push_sent, transaction откатится и мы не отправим push. Это acceptable trade-off: лучше не отправить при ошибке FCM, чем отправить несколько раз. Retry на следующем заходе увидит пустую push_sent, попробует снова.

Правило: handler — идемпотентен

На ревью новый handler должен ответить на вопрос: «что будет, если меня вызвать с тем же сообщением дважды подряд?» Ответ: «ничего нового не произойдёт». Если ответ — «дважды отправим письмо / дважды спишем / дважды обновим счётчик» — handler надо переделать.

Correlation-Id vs Event-Id

Разница часто путается. Обе — идентификаторы в envelope, но разные по смыслу.

  • Event-Id (Message.UUID в Watermill) — уникальный для каждого сообщения. Генерируется один раз на публикацию, как ULID. Один user.registered для пользователя 42 = один Event-Id. Дубль этого же сообщения на consumer-сайде придёт с тем же Event-Id — поэтому Deduplicator дедуплицирует именно по нему.
  • Correlation-Id (Metadata["Correlation-Id"]) — общий для всей логической цепочки. Один HTTP-запрос → 3 события → 2 downstream события в других сервисах → все 5 несут один Correlation-Id. Не уникален per message.

Не дедуплицируй по Correlation-Id. Это приведёт к тому, что второе и третье события одной цепочки будут выглядеть как дубли первого — и будут молча проглочены.

Детали в ../glossary.

Monitoring

Метрики

messages_deduplicated_total{service, topic, handler} — counter, сколько сообщений Deduplicator отверг как дубли. messages_processed_total{service, topic, handler, result} — counter, общее количество; result ∈ {success, error, deduplicated}. deduplication_check_duration_seconds{service} — histogram, latency Redis SETNX. deduplication_errors_total{service, reason} — counter, reason ∈ {redis_timeout, redis_conn, marshal}.

Middleware, выставляющий эти метрики, — в pkg/eventmw/ сервис-репо. Подключается к Deduplicator.Repository wrapper’ом.

Alerting

  • Рост дублей. Alert: rate(messages_deduplicated_total[5m]) / rate(messages_processed_total[5m]) > 0.1 в течение 10 минут → ticket. Нормальный фон — меньше 1% (редкий retry). 10%+ = upstream publish дублирует систематически (баг в forwarder’е, баг в publisher’е).
  • Redis-outage. Alert: rate(deduplication_errors_total[5m]) > 0 → ticket. Под fail-open не ломает сервис, но надо разобраться.
  • Латенси Redis. Alert: histogram_quantile(0.99, deduplication_check_duration_seconds) > 0.05 (50ms на p99) → ticket. Redis должен отвечать быстро; медленный Redis тормозит все handler’ы.

Testing

Unit

Подставь fake DedupRepo, проверь, что второй вызов handler’а с тем же UUID возвращает true (дубль).

type fakeRepo struct { seen map[string]bool } func (r *fakeRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) { if r.seen[key] { return true, nil } r.seen[key] = true return false, nil } func TestDeduplicator_SecondCallIsDuplicate(t *testing.T) { repo := &fakeRepo{seen: map[string]bool{}} dup, _ := repo.IsDuplicate(ctx, "evt-1", time.Hour) if dup { t.Fatal("first call must not be duplicate") } dup, _ = repo.IsDuplicate(ctx, "evt-1", time.Hour) if !dup { t.Fatal("second call must be duplicate") } }

Integration через gochannel

Проверяем интеграцию handler’а + Deduplicator’а + router’а в in-memory режиме:

func TestRouter_DuplicatesIgnored(t *testing.T) { pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil) defer pubsub.Close() fakeSvc := &fakeNotificationService{} repo := &fakeRepo{seen: map[string]bool{}} dedup := middleware.Deduplicator{ KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil }, Repository: repo, Timeout: time.Hour, } router, _ := message.NewRouter(message.RouterConfig{}, nil) router.AddMiddleware(dedup.Middleware) router.AddNoPublisherHandler("t", "topic", pubsub, newHandler(fakeSvc).Handle) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { _ = router.Run(ctx) }() <-router.Running() for i := 0; i < 3; i++ { msg := message.NewMessage("evt-1", []byte(`{"x":1}`)) msg.Metadata.Set("Event-Type", "foo") _ = pubsub.Publish("topic", msg) } eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 }) // Три публикации с одним UUID → handler вызван один раз. }

Integration с реальным Redis

Testcontainers Redis + RedisRepo. Проверь: два SETNX с одним ключом — второй возвращает «уже был». TTL истекает — после истечения можно снова. См. ../conventions/testing про testcontainers.

Anti-patterns

Нет dedup и handler не идемпотентен

Классическая ошибка новичка. Retry на первой же временной ошибке → handler отработал дважды → в БД два ряда, в FCM два push’а. Обнаружится только под продовой нагрузкой. Всегда Deduplicator + идемпотентный handler.

Dedup по business-key вместо Message.UUID

// Плохо key := fmt.Sprintf("user:%d:event:%s:ref:%d", userID, eventType, refID)

Два разных логических события с одним и тем же business-key совпадут. Например, если случилось два review.updated для одного ревью подряд (legitimate — пользователь правил дважды), второе будет молча проглочено. Баг, который очень тяжело поймать.

Единственное исключение: если бизнес требует, чтобы между событиями для одного key было окно дедупликации (например, user. visited.place с окном 1 час — чтобы не спамить). Тогда это не dedup, а rate-limiter по business-key, отдельная функция, не Deduplicator.

Коротое TTL (5 минут)

Не покрывает retry-окно. middleware.Retry с 5 попытками и MaxInterval: 30s + jitter растягивается до 3–5 минут. 5-минутное TTL — на грани, любое залипание (Kafka rebalance 1 мин, Redis latency spike 30s) пробивает окно. Минимум 1 час, по умолчанию — 24 часа.

Local in-process cache вместо Redis

var seenMu sync.Mutex var seen = map[string]bool{} // ПЛОХО

При multi-pod деплое каждая реплика имеет свой seen. Событие пришло на реплику A, обработалось, offset не закоммитился, Kafka сделала rebalance, то же сообщение попало на реплику B — у неё seen пустой, обработает снова. Нужен shared store, Redis или БД.

Pod-local с fallback в БД

Смесь предыдущих двух: local cache + при miss проверка в Postgres. Сложная логика, маленькая польза. Redis один раз настроенный и общий — проще и надёжнее.

Dedup before Retry

// Плохо: dedup перед retry router.AddMiddleware( dedup.Middleware, // первый retry.Middleware, // второй )

Retry внутри одного handler-вызова генерирует «вторую попытку» того же сообщения (тот же UUID) → dedup считает её дублем → retry не срабатывает. Dedup всегда после Retry в цепочке middleware.

Deduplicator подключили, handler не идемпотентен

Работает, пока Redis доступен. В день, когда Redis ляжет на 10 минут, fail-open пропустит несколько сообщений через неидемпотентный handler → дубликаты. Deduplicator — первая линия, идемпотентный handler — вторая.

FAQ

«Можно ли добиться exactly-once?» В распределённой системе с сетевыми сбоями — нет, реально невозможно. Best achievable — at-least- once + idempotency на consumer-сайде. Это и даёт «effectively once» для наблюдателя.

«Почему не использовать kafka.transactional.id для exactly-once?» Kafka transactional producer даёт exactly-once только в цепочке Kafka→Kafka (например, stream processor). Наш кейс — outbox (Postgres → Kafka → Postgres) — за границы transactional-семантики Kafka, не покрывается. Нужна своя idempotency.

«Zero duplicates возможен?» Нет. Процент дублей можно снизить до долей процента, но не до нуля. Замеряй messages_deduplicated_total / messages_processed_total и ставь alert на аномалии.

«Можно ли использовать consumer offset как idempotency ключ?» Нет. Offset — позиция в партиции, не уникальный id сообщения. Rebalance изменяет соответствие offset → сообщение.

«А если Redis полностью недоступен неделю?» При fail-open сервис работает, обработки дублей могут быть. Основной план — fix Redis. Нет смысла строить dedup-fallback в Postgres, который работает неделю — это отдельная задача с отдельной сложностью, и её redis-outage на неделю ты всё равно заметишь по остальным симптомам.

«Что делать, если handler частично успешен — БД обновлена, push не отправлен?» Это отдельная задача про distributed side effects. Решается через outbox+retry для каждого побочного эффекта отдельно (push-queue как свой outbox). Здесь это выходит за рамки темы; но сама консистентность handler’а внутри одной БД-транзакции — обеспечивается InTx.

Связанные разделы

  • outbox — at-least-once на publisher-сайде; дубли, которые дойдут до consumer’а, именно здесь рождаются.
  • cqrs — CQRS handler’ы тоже нуждаются в Deduplicator’е.
  • ../conventions/events — полный middleware stack, порядок, envelope.
  • ../how-to/add-kafka-event — чеклист включает idempotency-стратегию.
  • ../troubleshooting/kafka-consumer-stuck — что делать, если consumer завис и лаг растёт.
  • ../glossary — Deduplicator, Event-Id, Correlation-Id, Idempotency.
Last updated on