Идемпотентный consumer¶
Deep-dive по идемпотентности: как сделать так, чтобы повторная доставка
сообщения не приводила к повторной работе. Reference по событиям и
middleware — ../conventions/events.md. Эта
страница — про внутренности дедупликации, Redis-хранилище, выбор окна,
поведение при сбоях, idempotency на уровне бизнес-логики.
Содержание¶
- Проблема
- Стандарт: Watermill
middleware.Deduplicator - Архитектура
- Setup
RedisDedupRepository- Окно (TTL)
- Fail-open vs fail-closed
- Альтернативы и почему не используем
- Idempotency на уровне бизнес-логики
- Correlation-Id vs Event-Id
- Monitoring
- Testing
- Anti-patterns
- FAQ
- Связанные разделы
Проблема¶
Kafka даёт at-least-once семантику. Это значит, что одно и то же сообщение может быть доставлено consumer'у 2+ раз. Источники повторов:
- Retry на ошибке. Handler вернул error →
middleware.Retryпереотправил сообщение → при успехе только второго захода, первый побочный эффект уже случился. - Rebalance в consumer-group. Kafka перераспределяет партиции между репликами → partial offset commit → следующая реплика перечитывает «хвост».
- Forwarder переопубликовал. Он записал в Kafka, но не успел
пометить строку в outbox как acked (см.
outbox.md) → при рестарте поднял ту же строку снова. - Рестарт consumer'а между обработкой и commit'ом offset'а. Handler отработал, offset не закоммитился → следующая инкарнация перечитает.
Итог: handler обязан корректно обрабатывать повторную доставку. Нельзя рассчитывать, что «у нас не будет retry» или «Kafka настроена аккуратно». Это свойство транспорта, не настройки.
Стандарт: Watermill middleware.Deduplicator¶
В нашем consumer middleware stack (см.
../conventions/events.md) последним по
порядку идёт Deduplicator. Он хранит в Redis множество уже
обработанных Message.UUID и при повторной доставке проглатывает
сообщение до вызова handler'а — возвращает успех, offset коммитится,
downstream работа не происходит.
Ключ дедупликации — Message.UUID (это ULID из envelope, см.
../conventions/events.md §Envelope). Не
Event-Type, не Correlation-Id, не aggregate_id. Именно UUID —
уникальный для этого конкретного сообщения, генерируется на
publisher-сайде один раз.
Хранилище — Redis. Shared между репликами одного сервиса: если consumer-реплика A обработала сообщение и записала ключ, реплика B при повторной доставке увидит его и пропустит. Без Redis (in-memory) это не работает: реплики не видят state друг друга.
TTL по умолчанию — 24 часа. Обоснование — §Окно ниже.
Архитектура¶
Путь одного сообщения через middleware-стек router'а: Deduplicator —
последний узел перед handler'ом. Он делает атомарный SETNX в Redis
по Message.UUID: если ключ новый — handler вызывается; если ключ
уже существует — сообщение проглатывается как дубль и ACK'ается.
flowchart TD
K[Kafka message]
R[Router middleware stack]
D{Deduplicator}
Redis[(Redis SETNX<br/>dedup:uuid)]
H[Handler]
A[ACK]
NA[NACK → Retry/DLQ]
K --> R
R --> D
D -->|Check UUID| Redis
Redis -->|новый| D
D -->|первый раз| H
Redis -->|duplicate| D
D -->|уже обработан| A
H -->|success| A
H -->|error| NA
Детали по реализации Repository — §RedisDedupRepository; по порядку
middleware — §Setup и ../conventions/events.md.
Setup¶
import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)
func newDeduplicator(redisClient *redis.Client, logger watermill.LoggerAdapter) message.HandlerMiddleware {
dedup := middleware.Deduplicator{
KeyFactory: func(msg *message.Message) (string, bool, error) {
// false в boolean — "не уникальный ключ, не надо логировать дубль"
// true — делать предупреждение про дубли.
return msg.UUID, true, nil
},
Repository: newRedisDedupRepo(redisClient),
Timeout: 24 * time.Hour,
}
return dedup.Middleware
}
router.AddMiddleware(
middleware.CorrelationID,
middleware.Recoverer,
poisonQueue(kafkaPub, dlqTopic),
retryMiddleware.Middleware,
newDeduplicator(redisClient, watermillLogger), // последний
)
Порядок — критичен. Deduplicator после Retry, а не до. Иначе retry внутри одного обработчика будет падать на «уже обработано» при первой же неудаче.
RedisDedupRepository¶
Минимальная реализация:
package eventdedup
import (
"context"
"errors"
"time"
"github.com/redis/go-redis/v9"
)
type RedisRepo struct {
client *redis.Client
prefix string
}
func NewRedisRepo(client *redis.Client, prefix string) *RedisRepo {
return &RedisRepo{client: client, prefix: prefix}
}
// IsDuplicate атомарно пытается установить ключ с TTL.
// Возвращает true, если ключ уже существовал (=дубль),
// false — если ключ установлен этим вызовом (= первое появление).
func (r *RedisRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
full := r.prefix + ":" + key
set, err := r.client.SetNX(ctx, full, "1", ttl).Result()
if err != nil {
return false, err
}
return !set, nil
}
Ключевой примитив — SETNX (Set If Not Exists). Это атомарная
операция: если ключ не существовал — устанавливается и возвращает
«установили»; если существовал — не трогаем, возвращаем «уже был».
Никакой race condition между «прочитать», «решить», «записать» — всё
атомарно.
prefix — <service>:dedup:<handler>. Пример:
notification:dedup:on-review-created. Префикс нужен, чтобы ключи
разных handler'ов не перетирались (если один handler пропустил
сообщение как дубль, другой handler при ретре мог бы подумать, что и
он уже его обрабатывал).
Эта реализация живёт в pkg/eventdedup/ сервис-репо. Пока не вынесено
в shared-lib — синхронизация копированием (см. правила в
../conventions/db-pgx.md §pkg).
Окно (TTL)¶
TTL = сколько хранить ключ в Redis. После истечения ключ исчезает, повтор сообщения с тем же UUID будет пропущен через handler как новый.
Выбор окна:
| Тип сообщений | TTL | Обоснование |
|---|---|---|
| Обычные domain events | 24h | Стандарт. Покрывает: retry на ошибке (5 попыток × 30s = 2.5 мин), rebalance (секунды), forwarder-gap (минуты), длительный инцидент Kafka (часы). |
Critical events (user.banned, consent-revoke) |
7d | Если мы случайно обработаем user.banned дважды — второй раз это безопасный no-op, но нужно гарантировать, что даже после недельной отладочной паузы consumer'а повтор не пойдёт в downstream. |
| High-volume analytics | 1h или меньше | Если допустим небольшой процент дублей в аналитике и объём сообщений критичен для Redis-памяти. Только для событий, где дубль = небольшая статистическая ошибка, не бизнес-эффект. |
Никогда не ставь TTL меньше 10 минут. Retry-окно внутри одного
handler'а при 5 попытках с MaxInterval: 30s + jitter может
растянуться до 3–5 минут, плюс consumer-rebalance. TTL в 1 минуту
поломает retry-путь: второй заход отработает как «новый», не как
retry.
Fail-open vs fail-closed¶
Что делать, если Redis недоступен?
Fail-open (default)¶
Redis вернул ошибку → Deduplicator пропускает сообщение через handler без проверки. Преимущество — сервис продолжает работать. Риск — под retry может случиться дубль, если handler не идемпотентен на БД-уровне.
Это выбор по умолчанию для большинства сервисов. Availability важнее, чем 100% гарантия отсутствия дублей на секундах Redis-outage.
Fail-closed¶
Redis вернул ошибку → IsDuplicate возвращает error → Deduplicator
пропускает ошибку дальше → Retry middleware переотправит → если Redis
не восстановился, сообщение через 5 попыток уйдёт в DLQ.
Выбирается для сервисов, где дубль = серьёзный бизнес-эффект: списание денег, отправка платного push'а внешнему провайдеру за $0.01 каждый раз, операции, которые нельзя откатить. У нас таких сервисов пока нет; появится — его handler явно отмечается как fail-closed, и это конфигурируется per-handler.
Выбор фиксируется в конфиге сервиса:
repo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created")
dedup := middleware.Deduplicator{
KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
Repository: eventdedup.FailOpen(repo), // или FailClosed(repo)
Timeout: 24 * time.Hour,
}
FailOpen/FailClosed — тонкие wrapper'ы, превращающие ошибку Redis
либо в «not-a-duplicate, proceed», либо в пробрасывание ошибки
наверх.
Альтернативы и почему не используем¶
Таблица dedup_records per сервис¶
Пример антипаттерна: в notification/internal/service/dedup.go сейчас
сделано «Redis SETNX, при ошибке fallback в Postgres-таблицу
dedup_records». Мотивация понятна: Redis умрёт → не потеряем
идемпотентность. Но:
- Таблица
dedup_records— ещё один источник владения, своя миграция, свой cleanup, свой retention. - Проверка через Postgres на каждый event — latency в разы выше, чем Redis SETNX (БД-connection, транзакция, FS-write). На фоне нагрузки на ту же БД от handler'а это двойная нагрузка.
- Ключ дедупликации в этой реализации собирается из бизнес-полей
(
user_id + event_type + ref_id), не изMessage.UUID. Это ненадёжно: см. §Anti-patterns про dedup по business-key. - Каждый сервис реализует свой таблично-fallback механизм, несовместимо между сервисами, не переиспользуется.
Правильный путь — fail-open Redis через стандартный Deduplicator. Redis-outage на минуты — приемлемый риск; на часы — серьёзный инцидент, и тогда спасать ты будешь не дедупликацией, а стабильностью Redis.
Idempotency-key от клиента на API-level¶
Клиент шлёт Idempotency-Key в HTTP-header, сервер проверяет — если
уже видел, возвращает сохранённый ответ. Это другой паттерн: он
защищает от client-side retries на HTTP-уровне. К consumer-side
дедупликации прямого отношения не имеет. Нужно отдельно, для публичных
write endpoint'ов (не здесь).
Natural idempotency через UPSERT¶
Если бизнес-операция handler'а — INSERT ... ON CONFLICT DO NOTHING
или UPDATE ... WHERE status != 'done', повторный вызов безопасен
сам по себе. В таких случаях строго говоря Deduplicator не нужен.
Но: downstream effects могут быть не в БД. Push в FCM не идемпотентен: каждый вызов = новое уведомление на телефоне пользователя. Если handler и пишет в БД, и шлёт push, UPSERT защитит только БД, push уйдёт дважды.
Вывод: Deduplicator — defense in depth. Используем всегда, даже
при идемпотентной бизнес-логике. Лишний SETNX в Redis — не стоящая
экономии оптимизация.
Idempotency на уровне бизнес-логики¶
Deduplicator — не замена корректного дизайна handler'а. Даже с Deduplicator'ом handler должен быть идемпотентен by design, по двум причинам:
- Redis-outage под fail-open → Deduplicator пропускает сообщение, handler должен не сломаться.
- TTL истёк → после окна дубль может прийти снова, handler должен корректно это пережить.
Приёмы:
INSERT ... ON CONFLICT DO NOTHING¶
const q = `
INSERT INTO notification.sent_notifications (event_id, user_id, sent_at)
VALUES ($1, $2, NOW())
ON CONFLICT (event_id) DO NOTHING`
tag, err := tx.Exec(ctx, q, eventID, userID)
Если строка уже была (дубль), DO NOTHING — повторный insert
бесшумно не происходит. tag.RowsAffected() покажет 0 — можно
решить, отправлять ли push. Обычно: если RowsAffected=0, значит,
событие уже обрабатывалось — push пропускаем.
Проверка status перед update¶
const q = `
UPDATE review.reviews
SET status = 'published', published_at = NOW()
WHERE id = $1 AND status = 'pending'`
Повторный UPDATE с тем же id не перезапишет уже опубликованный
review (условие status = 'pending' не выполнится). Без этой проверки
повторный update мог бы подтереть published_at более свежим значением.
External side effects — check-then-act¶
Для внешних вызовов (FCM push, SMS-gateway) — отмечаем факт отправки до вызова и проверяем перед повтором:
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
res, err := tx.Exec(ctx,
`INSERT INTO notification.push_sent (event_id, user_id, sent_at)
VALUES ($1, $2, NOW())
ON CONFLICT (event_id) DO NOTHING`,
eventID, userID)
if err != nil {
return err
}
if res.RowsAffected() == 0 {
// уже отправляли — не делаем повторный FCM-вызов
return nil
}
// отметили факт, теперь безопасно слать push
return s.fcm.Send(ctx, userID, payload)
})
Тонкость: если fcm.Send упадёт после того, как insert'нулась запись
push_sent, transaction откатится и мы не отправим push. Это
acceptable trade-off: лучше не отправить при ошибке FCM, чем отправить
несколько раз. Retry на следующем заходе увидит пустую push_sent,
попробует снова.
Правило: handler — идемпотентен¶
На ревью новый handler должен ответить на вопрос: «что будет, если меня вызвать с тем же сообщением дважды подряд?» Ответ: «ничего нового не произойдёт». Если ответ — «дважды отправим письмо / дважды спишем / дважды обновим счётчик» — handler надо переделать.
Correlation-Id vs Event-Id¶
Разница часто путается. Обе — идентификаторы в envelope, но разные по смыслу.
- Event-Id (
Message.UUIDв Watermill) — уникальный для каждого сообщения. Генерируется один раз на публикацию, как ULID. Одинuser.registeredдля пользователя 42 = один Event-Id. Дубль этого же сообщения на consumer-сайде придёт с тем же Event-Id — поэтому Deduplicator дедуплицирует именно по нему. - Correlation-Id (
Metadata["Correlation-Id"]) — общий для всей логической цепочки. Один HTTP-запрос → 3 события → 2 downstream события в других сервисах → все 5 несут один Correlation-Id. Не уникален per message.
Не дедуплицируй по Correlation-Id. Это приведёт к тому, что второе и третье события одной цепочки будут выглядеть как дубли первого — и будут молча проглочены.
Детали в ../glossary.md.
Monitoring¶
Метрики¶
messages_deduplicated_total{service, topic, handler}
— counter, сколько сообщений Deduplicator отверг как дубли.
messages_processed_total{service, topic, handler, result}
— counter, общее количество; result ∈ {success, error, deduplicated}.
deduplication_check_duration_seconds{service}
— histogram, latency Redis SETNX.
deduplication_errors_total{service, reason}
— counter, reason ∈ {redis_timeout, redis_conn, marshal}.
Middleware, выставляющий эти метрики, — в pkg/eventmw/ сервис-репо.
Подключается к Deduplicator.Repository wrapper'ом.
Alerting¶
- Рост дублей. Alert:
rate(messages_deduplicated_total[5m]) / rate(messages_processed_total[5m]) > 0.1в течение 10 минут → ticket. Нормальный фон — меньше 1% (редкий retry). 10%+ = upstream publish дублирует систематически (баг в forwarder'е, баг в publisher'е). - Redis-outage. Alert:
rate(deduplication_errors_total[5m]) > 0→ ticket. Под fail-open не ломает сервис, но надо разобраться. - Латенси Redis. Alert:
histogram_quantile(0.99, deduplication_check_duration_seconds) > 0.05(50ms на p99) → ticket. Redis должен отвечать быстро; медленный Redis тормозит все handler'ы.
Testing¶
Unit¶
Подставь fake DedupRepo, проверь, что второй вызов handler'а с тем же
UUID возвращает true (дубль).
type fakeRepo struct {
seen map[string]bool
}
func (r *fakeRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
if r.seen[key] {
return true, nil
}
r.seen[key] = true
return false, nil
}
func TestDeduplicator_SecondCallIsDuplicate(t *testing.T) {
repo := &fakeRepo{seen: map[string]bool{}}
dup, _ := repo.IsDuplicate(ctx, "evt-1", time.Hour)
if dup {
t.Fatal("first call must not be duplicate")
}
dup, _ = repo.IsDuplicate(ctx, "evt-1", time.Hour)
if !dup {
t.Fatal("second call must be duplicate")
}
}
Integration через gochannel¶
Проверяем интеграцию handler'а + Deduplicator'а + router'а в in-memory режиме:
func TestRouter_DuplicatesIgnored(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
fakeSvc := &fakeNotificationService{}
repo := &fakeRepo{seen: map[string]bool{}}
dedup := middleware.Deduplicator{
KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
Repository: repo,
Timeout: time.Hour,
}
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddMiddleware(dedup.Middleware)
router.AddNoPublisherHandler("t", "topic", pubsub, newHandler(fakeSvc).Handle)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = router.Run(ctx) }()
<-router.Running()
for i := 0; i < 3; i++ {
msg := message.NewMessage("evt-1", []byte(`{"x":1}`))
msg.Metadata.Set("Event-Type", "foo")
_ = pubsub.Publish("topic", msg)
}
eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 })
// Три публикации с одним UUID → handler вызван один раз.
}
Integration с реальным Redis¶
Testcontainers Redis + RedisRepo. Проверь: два SETNX с одним ключом
— второй возвращает «уже был». TTL истекает — после истечения можно
снова. См.
../conventions/testing.md про
testcontainers.
Anti-patterns¶
Нет dedup и handler не идемпотентен¶
Классическая ошибка новичка. Retry на первой же временной ошибке →
handler отработал дважды → в БД два ряда, в FCM два push'а. Обнаружится
только под продовой нагрузкой. Всегда Deduplicator + идемпотентный
handler.
Dedup по business-key вместо Message.UUID¶
Два разных логических события с одним и тем же business-key совпадут.
Например, если случилось два review.updated для одного ревью подряд
(legitimate — пользователь правил дважды), второе будет молча
проглочено. Баг, который очень тяжело поймать.
Единственное исключение: если бизнес требует, чтобы между
событиями для одного key было окно дедупликации (например, user.
visited.place с окном 1 час — чтобы не спамить). Тогда это не dedup,
а rate-limiter по business-key, отдельная функция, не
Deduplicator.
Коротое TTL (5 минут)¶
Не покрывает retry-окно. middleware.Retry с 5 попытками и
MaxInterval: 30s + jitter растягивается до 3–5 минут. 5-минутное TTL
— на грани, любое залипание (Kafka rebalance 1 мин, Redis latency
spike 30s) пробивает окно. Минимум 1 час, по умолчанию — 24 часа.
Local in-process cache вместо Redis¶
При multi-pod деплое каждая реплика имеет свой seen. Событие пришло
на реплику A, обработалось, offset не закоммитился, Kafka сделала
rebalance, то же сообщение попало на реплику B — у неё seen пустой,
обработает снова. Нужен shared store, Redis или БД.
Pod-local с fallback в БД¶
Смесь предыдущих двух: local cache + при miss проверка в Postgres. Сложная логика, маленькая польза. Redis один раз настроенный и общий — проще и надёжнее.
Dedup before Retry¶
// Плохо: dedup перед retry
router.AddMiddleware(
dedup.Middleware, // первый
retry.Middleware, // второй
)
Retry внутри одного handler-вызова генерирует «вторую попытку» того же сообщения (тот же UUID) → dedup считает её дублем → retry не срабатывает. Dedup всегда после Retry в цепочке middleware.
Deduplicator подключили, handler не идемпотентен¶
Работает, пока Redis доступен. В день, когда Redis ляжет на 10 минут, fail-open пропустит несколько сообщений через неидемпотентный handler → дубликаты. Deduplicator — первая линия, идемпотентный handler — вторая.
FAQ¶
«Можно ли добиться exactly-once?» В распределённой системе с сетевыми сбоями — нет, реально невозможно. Best achievable — at-least- once + idempotency на consumer-сайде. Это и даёт «effectively once» для наблюдателя.
«Почему не использовать kafka.transactional.id для exactly-once?»
Kafka transactional producer даёт exactly-once только в цепочке
Kafka→Kafka (например, stream processor). Наш кейс — outbox (Postgres
→ Kafka → Postgres) — за границы transactional-семантики Kafka, не
покрывается. Нужна своя idempotency.
«Zero duplicates возможен?» Нет. Процент дублей можно снизить до
долей процента, но не до нуля. Замеряй messages_deduplicated_total /
messages_processed_total и ставь alert на аномалии.
«Можно ли использовать consumer offset как idempotency ключ?» Нет. Offset — позиция в партиции, не уникальный id сообщения. Rebalance изменяет соответствие offset → сообщение.
«А если Redis полностью недоступен неделю?» При fail-open сервис работает, обработки дублей могут быть. Основной план — fix Redis. Нет смысла строить dedup-fallback в Postgres, который работает неделю — это отдельная задача с отдельной сложностью, и её redis-outage на неделю ты всё равно заметишь по остальным симптомам.
«Что делать, если handler частично успешен — БД обновлена, push не
отправлен?» Это отдельная задача про distributed side effects.
Решается через outbox+retry для каждого побочного эффекта отдельно
(push-queue как свой outbox). Здесь это выходит за рамки темы; но
сама консистентность handler'а внутри одной БД-транзакции —
обеспечивается InTx.
Связанные разделы¶
outbox.md— at-least-once на publisher-сайде; дубли, которые дойдут до consumer'а, именно здесь рождаются.cqrs.md— CQRS handler'ы тоже нуждаются в Deduplicator'е.../conventions/events.md— полный middleware stack, порядок, envelope.../how-to/add-kafka-event.md— чеклист включает idempotency-стратегию.../troubleshooting/kafka-consumer-stuck.md— что делать, если consumer завис и лаг растёт.../glossary.md— Deduplicator, Event-Id, Correlation-Id, Idempotency.