Идемпотентный consumer
Deep-dive по идемпотентности: как сделать так, чтобы повторная доставка
сообщения не приводила к повторной работе. Reference по событиям и
middleware — ../conventions/events. Эта
страница — про внутренности дедупликации, Redis-хранилище, выбор окна,
поведение при сбоях, idempotency на уровне бизнес-логики.
Содержание
- Проблема
- Стандарт: Watermill
middleware.Deduplicator - Архитектура
- Setup
RedisDedupRepository- Окно (TTL)
- TTL дедуп-ключа per use-case
- Fail-open vs fail-closed
- Fail-open при Redis down
- Альтернативы и почему не используем
- Idempotency на уровне бизнес-логики
- Correlation-Id vs Event-Id
- Monitoring
- Testing
- Anti-patterns
- FAQ
- Связанные разделы
Проблема
Kafka даёт at-least-once семантику. Это значит, что одно и то же сообщение может быть доставлено consumer’у 2+ раз. Источники повторов:
- Retry на ошибке. Handler вернул error →
middleware.Retryпереотправил сообщение → при успехе только второго захода, первый побочный эффект уже случился. - Rebalance в consumer-group. Kafka перераспределяет партиции между репликами → partial offset commit → следующая реплика перечитывает «хвост».
- Forwarder переопубликовал. Он записал в Kafka, но не успел
пометить строку в outbox как acked (см.
outbox) → при рестарте поднял ту же строку снова. - Рестарт consumer’а между обработкой и commit’ом offset’а. Handler отработал, offset не закоммитился → следующая инкарнация перечитает.
Итог: handler обязан корректно обрабатывать повторную доставку. Нельзя рассчитывать, что «у нас не будет retry» или «Kafka настроена аккуратно». Это свойство транспорта, не настройки.
Стандарт: Watermill middleware.Deduplicator
В нашем consumer middleware stack (см.
../conventions/events) последним по
порядку идёт Deduplicator. Он хранит в Redis множество уже
обработанных Message.UUID и при повторной доставке проглатывает
сообщение до вызова handler’а — возвращает успех, offset коммитится,
downstream работа не происходит.
Ключ дедупликации — Message.UUID (это ULID из envelope, см.
../conventions/events.md §Envelope). Не
Event-Type, не Correlation-Id, не aggregate_id. Именно UUID —
уникальный для этого конкретного сообщения, генерируется на
publisher-сайде один раз.
Хранилище — Redis. Shared между репликами одного сервиса: если consumer-реплика A обработала сообщение и записала ключ, реплика B при повторной доставке увидит его и пропустит. Без Redis (in-memory) это не работает: реплики не видят state друг друга.
TTL по умолчанию — 24 часа. Обоснование — §Окно ниже.
Архитектура
Путь одного сообщения через middleware-стек router’а: Deduplicator —
последний узел перед handler’ом. Он делает атомарный SETNX в Redis
по Message.UUID: если ключ новый — handler вызывается; если ключ
уже существует — сообщение проглатывается как дубль и ACK’ается.
Детали по реализации Repository — §RedisDedupRepository; по порядку
middleware — §Setup и ../conventions/events.
Setup
import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)
func newDeduplicator(redisClient *redis.Client, logger watermill.LoggerAdapter) message.HandlerMiddleware {
dedup := middleware.Deduplicator{
KeyFactory: func(msg *message.Message) (string, bool, error) {
// false в boolean — "не уникальный ключ, не надо логировать дубль"
// true — делать предупреждение про дубли.
return msg.UUID, true, nil
},
Repository: newRedisDedupRepo(redisClient),
Timeout: 24 * time.Hour,
}
return dedup.Middleware
}
router.AddMiddleware(
middleware.CorrelationID,
middleware.Recoverer,
poisonQueue(kafkaPub, dlqTopic),
retryMiddleware.Middleware,
newDeduplicator(redisClient, watermillLogger), // последний
)Порядок — критичен. Deduplicator после Retry, а не до. Иначе retry внутри одного обработчика будет падать на «уже обработано» при первой же неудаче.
RedisDedupRepository
Минимальная реализация:
package eventdedup
import (
"context"
"errors"
"time"
"github.com/redis/go-redis/v9"
)
type RedisRepo struct {
client *redis.Client
prefix string
}
func NewRedisRepo(client *redis.Client, prefix string) *RedisRepo {
return &RedisRepo{client: client, prefix: prefix}
}
// IsDuplicate атомарно пытается установить ключ с TTL.
// Возвращает true, если ключ уже существовал (=дубль),
// false — если ключ установлен этим вызовом (= первое появление).
func (r *RedisRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
full := r.prefix + ":" + key
set, err := r.client.SetNX(ctx, full, "1", ttl).Result()
if err != nil {
return false, err
}
return !set, nil
}Ключевой примитив — SETNX (Set If Not Exists). Это атомарная
операция: если ключ не существовал — устанавливается и возвращает
«установили»; если существовал — не трогаем, возвращаем «уже был».
Никакой race condition между «прочитать», «решить», «записать» — всё
атомарно.
prefix — <service>:dedup:<handler>. Пример:
notification:dedup:on-review-created. Префикс нужен, чтобы ключи
разных handler’ов не перетирались (если один handler пропустил
сообщение как дубль, другой handler при ретре мог бы подумать, что и
он уже его обрабатывал).
Эта реализация живёт в pkg/eventdedup/ сервис-репо. Пока не вынесено
в shared-lib — синхронизация копированием (см. правила в
../conventions/project-layout.md §pkg).
Окно (TTL)
TTL = сколько хранить ключ в Redis. После истечения ключ исчезает, повтор сообщения с тем же UUID будет пропущен через handler как новый.
Выбор окна:
| Тип сообщений | TTL | Обоснование |
|---|---|---|
| Обычные domain events | 24h | Стандарт. Покрывает: retry на ошибке (5 попыток × 30s = 2.5 мин), rebalance (секунды), forwarder-gap (минуты), длительный инцидент Kafka (часы). |
Critical events (user.banned, consent-revoke) | 7d | Если мы случайно обработаем user.banned дважды — второй раз это безопасный no-op, но нужно гарантировать, что даже после недельной отладочной паузы consumer’а повтор не пойдёт в downstream. |
| High-volume analytics | 1h или меньше | Если допустим небольшой процент дублей в аналитике и объём сообщений критичен для Redis-памяти. Только для событий, где дубль = небольшая статистическая ошибка, не бизнес-эффект. |
Никогда не ставь TTL меньше 10 минут. Retry-окно внутри одного
handler’а при 5 попытках с MaxInterval: 30s + jitter может
растянуться до 3–5 минут, плюс consumer-rebalance. TTL в 1 минуту
поломает retry-путь: второй заход отработает как «новый», не как
retry.
TTL дедуп-ключа per use-case
Таблица ниже — справочник для типовых сценариев. Сортировка по возрастанию TTL:
| Use case | TTL | Обоснование |
|---|---|---|
| In-process batch retry (e.g., HTTP inline retry на 3 попытки) | 5 минут | Достаточно для 3 попыток backoff до 60с; риск потери окна низкий. |
| Kafka consumer обычной pipeline | 1 час | Покрывает rebalance (≤ 5 мин) + repeat delivery после restart broker. |
| Cross-service webhook (внешний producer неизвестен) | 24 часа | Default; покрывает длинные сетевые инциденты и ручной replay. |
| Financial-like операция (once-ever гарантия важна) | 7 суток | Покрывает длинный инцидент + ручное разбирательство, потом UPSERT в БД гарантирует дальнейшую защиту. |
| Event replay из архива | 30 суток | Соответствует окну retention Kafka-топика. |
Правило:
- Никогда не ниже 5 минут — меньшее окно не покрывает retry с backoff’ом даже в простейшей HTTP-цепочке.
- Никогда не выше retention Kafka-топика — дедуп-ключ бесполезен для сообщений, которые уже не могут прийти из топика.
Более длинное окно всегда допустимо; более короткое — только с обоснованием в коде (комментарий рядом с конфигом Deduplicator’а) и подтверждённой idempotency на БД-уровне (UPSERT по natural key).
Fail-open vs fail-closed
Что делать, если Redis недоступен?
Дефолт — fail-CLOSED
Redis вернул ошибку → IsDuplicate возвращает error → Deduplicator
пропускает ошибку наверх → Retry middleware переотправит → если Redis
не восстановился после 5 попыток, сообщение уйдёт в DLQ (см.
§Fail-open при Redis down про CB поверх
этого поведения).
Это безопасный дефолт, потому что:
- Redis-outage обычно короткий (секунды-минуты), DLQ за это время не переполнится.
- Большинство handler’ов не имеют bullet-proof DB-level
идемпотентности для всех побочных эффектов. Есть
ON CONFLICT DO NOTHINGв main-path, но side effects (FCM push, SMS, внешние платные вызовы) не защищены. - Простой handler без DB-level идемпотентности + fail-open = прямой дубликат данных при любой Redis-паузе.
Fail-OPEN — только с DB-level идемпотентностью
Fail-open допустим только если handler защищён от дубля на уровне Postgres:
INSERT ... ON CONFLICT DO NOTHINGпо natural key (event_id,(aggregate_id, event_type, seq)и т.п.), и- side effects гейтятся результатом этого insert’а (
RowsAffected=0→ push не отправляем — см. §External side effects).
Без этого fail-open = тихие дубликаты в бизнес-данных и во внешних системах. Включать fail-open на handler’е без DB-level защиты — запрещено; на ревью такой PR заворачивается.
Выбор фиксируется в конфиге сервиса:
repo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created")
dedup := middleware.Deduplicator{
KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
Repository: eventdedup.FailClosed(repo), // дефолт; FailOpen — только с DB-level защитой
Timeout: 24 * time.Hour,
}FailOpen/FailClosed — тонкие wrapper’ы, превращающие ошибку Redis
либо в «not-a-duplicate, proceed», либо в пробрасывание ошибки
наверх. Для сервисов с эскалацией при long outage используется
CB-обёртка из §Реализация через circuit breaker,
которая автоматически переходит fail-closed → fail-open при серии
timeout’ов.
Fail-open при Redis down
Warning. Fail-open без DB-level идемпотентности = дубликаты бизнес-данных. Не включать без ревью lead-инженера backend и явного подтверждения, что handler защищён UPSERT по natural key.
В предыдущем разделе описан семантический выбор fail-open vs fail-closed. Этот раздел — про runtime-поведение, когда Redis реально недоступен, и про правильную защиту через circuit breaker.
Дефолт — fail-CLOSED
Если Redis возвращает ошибку (timeout, connection refused) — handler возвращает error, сообщение не коммитится в Kafka, Retry-middleware переотправит его, в худшем случае — в DLQ. Это безопасно при коротких сбоях (Redis перезагружается, network glitch).
Fail-closed — правильный дефолт, потому что:
- Большинство консьюмеров не имеют DB-level идемпотентности «bullet-
proof» уровня — есть
ON CONFLICT DO NOTHINGв main-path, но нет защиты для side effects (FCM push, внешние вызовы). - Redis outage обычно короткий (секунды-минуты), DLQ за эти минуты не переполнится.
Когда переходить на fail-OPEN
Fail-open оправдан, когда:
- Long Redis outage (> 5 минут) — lag consumer’ов начинает расти, DLQ копится, downstream пропускает свежие события.
- Business decision «лучше продублировать, чем потерять свежесть» — например, для notifications-fanout, где лучше дважды показать уведомление, чем не показать вовсе.
- Handler имеет DB-level идемпотентность через natural key
(
INSERT ... ON CONFLICT DO NOTHINGпо(aggregate_id, event_type, seq)илиUPSERTс business-ключом). Тогда дубль на consumer-side не приводит к дубликату данных.
Без DB-level идемпотентности fail-open запрещён. Fail-closed и разбор инцидента вручную.
Реализация через circuit breaker
Автоматический переход fail-closed → fail-open — circuit breaker вокруг Redis SETNX:
- 5 подряд timeout’ов → CB open. Cooldown 30s.
- В HalfOpen — один probe-запрос; успех → closed, провал → open ещё на 30s.
- При open CB:
- Consumer продолжает обработку без дедупликации.
- Каждое обработанное сообщение логируется с
dedup=bypassedна WARN-уровне. - Счётчик
consumer_dedup_bypassed_total{topic, consumer_group}инкрементится; alert выставлен на> 0(любое срабатывание — повод посмотреть). - Бизнес-handler обязан иметь DB-level идемпотентность (UPSERT /
ON CONFLICT DO NOTHINGпо natural key).
Decorator на DedupRepository:
package eventdedup
import (
"context"
"errors"
"log/slog"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sony/gobreaker/v2"
)
var (
dedupBypassed = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "consumer_dedup_bypassed_total",
Help: "Сообщений, обработанных без дедупа из-за Redis CB open.",
},
[]string{"topic", "consumer_group"},
)
)
type FailOpenMiddleware struct {
inner DedupRepository
breaker *gobreaker.CircuitBreaker[bool]
logger *slog.Logger
topic string
group string
}
func NewFailOpenMiddleware(inner DedupRepository, topic, group string, logger *slog.Logger) *FailOpenMiddleware {
cb := gobreaker.NewCircuitBreaker[bool](gobreaker.Settings{
Name: "redis-dedup-" + group,
MaxRequests: 1,
Interval: 60 * time.Second,
Timeout: 30 * time.Second,
ReadyToTrip: func(counts gobreaker.Counts) bool {
return counts.ConsecutiveFailures >= 5
},
OnStateChange: func(name string, from, to gobreaker.State) {
logger.Warn("dedup circuit breaker state change",
"name", name, "from", from.String(), "to", to.String())
},
})
return &FailOpenMiddleware{inner: inner, breaker: cb, logger: logger, topic: topic, group: group}
}
func (m *FailOpenMiddleware) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
dup, err := m.breaker.Execute(func() (bool, error) {
return m.inner.IsDuplicate(ctx, key, ttl)
})
if err != nil {
if errors.Is(err, gobreaker.ErrOpenState) {
// CB open: fail-open, пропускаем без дедупа
dedupBypassed.WithLabelValues(m.topic, m.group).Inc()
m.logger.WarnContext(ctx, "dedup bypassed: circuit breaker open",
"dedup", "bypassed", "key", key, "topic", m.topic)
return false, nil // "не дубль" — пропускаем в handler
}
// Ошибка Redis, но CB ещё не открыт — пропускаем наверх (fail-closed)
return false, err
}
return dup, nil
}Подключение в setup:
redisRepo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created")
failOpen := eventdedup.NewFailOpenMiddleware(redisRepo, "kazmaps.review.review",
"notification-on-review-created", logger)
dedup := middleware.Deduplicator{
KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
Repository: failOpen, // носит в себе CB
Timeout: 24 * time.Hour,
}Alert:
consumer_dedup_bypassed_total > 0— page. Любой bypass = возможные дубли в downstream; нужно фиксить Redis и проверить, что DB-level идемпотентность реально сработала (нет ли дубликатов в БД, FCM- логах).
Альтернативы и почему не используем
Таблица dedup_records per сервис
Пример антипаттерна: в notification/internal/service/dedup.go сейчас
сделано «Redis SETNX, при ошибке fallback в Postgres-таблицу
dedup_records». Мотивация понятна: Redis умрёт → не потеряем
идемпотентность. Но:
- Таблица
dedup_records— ещё один источник владения, своя миграция, свой cleanup, свой retention. - Проверка через Postgres на каждый event — latency в разы выше, чем Redis SETNX (БД-connection, транзакция, FS-write). На фоне нагрузки на ту же БД от handler’а это двойная нагрузка.
- Ключ дедупликации в этой реализации собирается из бизнес-полей
(
user_id + event_type + ref_id), не изMessage.UUID. Это ненадёжно: см. §Anti-patterns про dedup по business-key. - Каждый сервис реализует свой таблично-fallback механизм, несовместимо между сервисами, не переиспользуется.
Правильный путь — fail-open Redis через стандартный Deduplicator. Redis-outage на минуты — приемлемый риск; на часы — серьёзный инцидент, и тогда спасать ты будешь не дедупликацией, а стабильностью Redis.
Idempotency-key от клиента на API-level
Клиент шлёт Idempotency-Key в HTTP-header, сервер проверяет — если
уже видел, возвращает сохранённый ответ. Это другой паттерн: он
защищает от client-side retries на HTTP-уровне. К consumer-side
дедупликации прямого отношения не имеет. Нужно отдельно, для публичных
write endpoint’ов (не здесь).
Natural idempotency через UPSERT
Если бизнес-операция handler’а — INSERT ... ON CONFLICT DO NOTHING
или UPDATE ... WHERE status != 'done', повторный вызов безопасен
сам по себе. В таких случаях строго говоря Deduplicator не нужен.
Но: downstream effects могут быть не в БД. Push в FCM не идемпотентен: каждый вызов = новое уведомление на телефоне пользователя. Если handler и пишет в БД, и шлёт push, UPSERT защитит только БД, push уйдёт дважды.
Вывод: Deduplicator — defense in depth. Используем всегда, даже
при идемпотентной бизнес-логике. Лишний SETNX в Redis — не стоящая
экономии оптимизация.
Idempotency на уровне бизнес-логики
Deduplicator — не замена корректного дизайна handler’а. Даже с Deduplicator’ом handler должен быть идемпотентен by design, по двум причинам:
- Redis-outage под fail-open → Deduplicator пропускает сообщение, handler должен не сломаться.
- TTL истёк → после окна дубль может прийти снова, handler должен корректно это пережить.
Приёмы:
INSERT ... ON CONFLICT DO NOTHING
const q = `
INSERT INTO sent_notifications (event_id, user_id, sent_at)
VALUES ($1, $2, NOW())
ON CONFLICT (event_id) DO NOTHING`
tag, err := tx.Exec(ctx, q, eventID, userID)Если строка уже была (дубль), DO NOTHING — повторный insert
бесшумно не происходит. tag.RowsAffected() покажет 0 — можно
решить, отправлять ли push. Обычно: если RowsAffected=0, значит,
событие уже обрабатывалось — push пропускаем.
Проверка status перед update
const q = `
UPDATE reviews
SET status = 'published', published_at = NOW()
WHERE id = $1 AND status = 'pending'`Повторный UPDATE с тем же id не перезапишет уже опубликованный
review (условие status = 'pending' не выполнится). Без этой проверки
повторный update мог бы подтереть published_at более свежим значением.
External side effects — check-then-act
Для внешних вызовов (FCM push, SMS-gateway) — отмечаем факт отправки до вызова и проверяем перед повтором:
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
res, err := tx.Exec(ctx,
`INSERT INTO push_sent (event_id, user_id, sent_at)
VALUES ($1, $2, NOW())
ON CONFLICT (event_id) DO NOTHING`,
eventID, userID)
if err != nil {
return err
}
if res.RowsAffected() == 0 {
// уже отправляли — не делаем повторный FCM-вызов
return nil
}
// отметили факт, теперь безопасно слать push
return s.fcm.Send(ctx, userID, payload)
})Тонкость: если fcm.Send упадёт после того, как insert’нулась запись
push_sent, transaction откатится и мы не отправим push. Это
acceptable trade-off: лучше не отправить при ошибке FCM, чем отправить
несколько раз. Retry на следующем заходе увидит пустую push_sent,
попробует снова.
Правило: handler — идемпотентен
На ревью новый handler должен ответить на вопрос: «что будет, если меня вызвать с тем же сообщением дважды подряд?» Ответ: «ничего нового не произойдёт». Если ответ — «дважды отправим письмо / дважды спишем / дважды обновим счётчик» — handler надо переделать.
Correlation-Id vs Event-Id
Разница часто путается. Обе — идентификаторы в envelope, но разные по смыслу.
- Event-Id (
Message.UUIDв Watermill) — уникальный для каждого сообщения. Генерируется один раз на публикацию, как ULID. Одинuser.registeredдля пользователя 42 = один Event-Id. Дубль этого же сообщения на consumer-сайде придёт с тем же Event-Id — поэтому Deduplicator дедуплицирует именно по нему. - Correlation-Id (
Metadata["Correlation-Id"]) — общий для всей логической цепочки. Один HTTP-запрос → 3 события → 2 downstream события в других сервисах → все 5 несут один Correlation-Id. Не уникален per message.
Не дедуплицируй по Correlation-Id. Это приведёт к тому, что второе и третье события одной цепочки будут выглядеть как дубли первого — и будут молча проглочены.
Детали в ../glossary.
Monitoring
Метрики
messages_deduplicated_total{service, topic, handler}
— counter, сколько сообщений Deduplicator отверг как дубли.
messages_processed_total{service, topic, handler, result}
— counter, общее количество; result ∈ {success, error, deduplicated}.
deduplication_check_duration_seconds{service}
— histogram, latency Redis SETNX.
deduplication_errors_total{service, reason}
— counter, reason ∈ {redis_timeout, redis_conn, marshal}.Middleware, выставляющий эти метрики, — в pkg/eventmw/ сервис-репо.
Подключается к Deduplicator.Repository wrapper’ом.
Alerting
- Рост дублей. Alert:
rate(messages_deduplicated_total[5m]) / rate(messages_processed_total[5m]) > 0.1в течение 10 минут → ticket. Нормальный фон — меньше 1% (редкий retry). 10%+ = upstream publish дублирует систематически (баг в forwarder’е, баг в publisher’е). - Redis-outage. Alert:
rate(deduplication_errors_total[5m]) > 0→ ticket. Под fail-open не ломает сервис, но надо разобраться. - Латенси Redis. Alert:
histogram_quantile(0.99, deduplication_check_duration_seconds) > 0.05(50ms на p99) → ticket. Redis должен отвечать быстро; медленный Redis тормозит все handler’ы.
Testing
Unit
Подставь fake DedupRepo, проверь, что второй вызов handler’а с тем же
UUID возвращает true (дубль).
type fakeRepo struct {
seen map[string]bool
}
func (r *fakeRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
if r.seen[key] {
return true, nil
}
r.seen[key] = true
return false, nil
}
func TestDeduplicator_SecondCallIsDuplicate(t *testing.T) {
repo := &fakeRepo{seen: map[string]bool{}}
dup, _ := repo.IsDuplicate(ctx, "evt-1", time.Hour)
if dup {
t.Fatal("first call must not be duplicate")
}
dup, _ = repo.IsDuplicate(ctx, "evt-1", time.Hour)
if !dup {
t.Fatal("second call must be duplicate")
}
}Integration через gochannel
Проверяем интеграцию handler’а + Deduplicator’а + router’а в in-memory режиме:
func TestRouter_DuplicatesIgnored(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
fakeSvc := &fakeNotificationService{}
repo := &fakeRepo{seen: map[string]bool{}}
dedup := middleware.Deduplicator{
KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
Repository: repo,
Timeout: time.Hour,
}
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddMiddleware(dedup.Middleware)
router.AddNoPublisherHandler("t", "topic", pubsub, newHandler(fakeSvc).Handle)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = router.Run(ctx) }()
<-router.Running()
for i := 0; i < 3; i++ {
msg := message.NewMessage("evt-1", []byte(`{"x":1}`))
msg.Metadata.Set("Event-Type", "foo")
_ = pubsub.Publish("topic", msg)
}
eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 })
// Три публикации с одним UUID → handler вызван один раз.
}Integration с реальным Redis
Testcontainers Redis + RedisRepo. Проверь: два SETNX с одним ключом
— второй возвращает «уже был». TTL истекает — после истечения можно
снова. См.
../conventions/testing про
testcontainers.
Anti-patterns
Нет dedup и handler не идемпотентен
Классическая ошибка новичка. Retry на первой же временной ошибке →
handler отработал дважды → в БД два ряда, в FCM два push’а. Обнаружится
только под продовой нагрузкой. Всегда Deduplicator + идемпотентный
handler.
Dedup по business-key вместо Message.UUID
// Плохо
key := fmt.Sprintf("user:%d:event:%s:ref:%d", userID, eventType, refID)Два разных логических события с одним и тем же business-key совпадут.
Например, если случилось два review.updated для одного ревью подряд
(legitimate — пользователь правил дважды), второе будет молча
проглочено. Баг, который очень тяжело поймать.
Единственное исключение: если бизнес требует, чтобы между
событиями для одного key было окно дедупликации (например, user. visited.place с окном 1 час — чтобы не спамить). Тогда это не dedup,
а rate-limiter по business-key, отдельная функция, не
Deduplicator.
Коротое TTL (5 минут)
Не покрывает retry-окно. middleware.Retry с 5 попытками и
MaxInterval: 30s + jitter растягивается до 3–5 минут. 5-минутное TTL
— на грани, любое залипание (Kafka rebalance 1 мин, Redis latency
spike 30s) пробивает окно. Минимум 1 час, по умолчанию — 24 часа.
Local in-process cache вместо Redis
var seenMu sync.Mutex
var seen = map[string]bool{} // ПЛОХОПри multi-pod деплое каждая реплика имеет свой seen. Событие пришло
на реплику A, обработалось, offset не закоммитился, Kafka сделала
rebalance, то же сообщение попало на реплику B — у неё seen пустой,
обработает снова. Нужен shared store, Redis или БД.
Pod-local с fallback в БД
Смесь предыдущих двух: local cache + при miss проверка в Postgres. Сложная логика, маленькая польза. Redis один раз настроенный и общий — проще и надёжнее.
Dedup before Retry
// Плохо: dedup перед retry
router.AddMiddleware(
dedup.Middleware, // первый
retry.Middleware, // второй
)Retry внутри одного handler-вызова генерирует «вторую попытку» того же сообщения (тот же UUID) → dedup считает её дублем → retry не срабатывает. Dedup всегда после Retry в цепочке middleware.
Deduplicator подключили, handler не идемпотентен
Работает, пока Redis доступен. В день, когда Redis ляжет на 10 минут, fail-open пропустит несколько сообщений через неидемпотентный handler → дубликаты. Deduplicator — первая линия, идемпотентный handler — вторая.
FAQ
«Можно ли добиться exactly-once?» В распределённой системе с сетевыми сбоями — нет, реально невозможно. Best achievable — at-least- once + idempotency на consumer-сайде. Это и даёт «effectively once» для наблюдателя.
«Почему не использовать kafka.transactional.id для exactly-once?»
Kafka transactional producer даёт exactly-once только в цепочке
Kafka→Kafka (например, stream processor). Наш кейс — outbox (Postgres
→ Kafka → Postgres) — за границы transactional-семантики Kafka, не
покрывается. Нужна своя idempotency.
«Zero duplicates возможен?» Нет. Процент дублей можно снизить до
долей процента, но не до нуля. Замеряй messages_deduplicated_total / messages_processed_total и ставь alert на аномалии.
«Можно ли использовать consumer offset как idempotency ключ?» Нет. Offset — позиция в партиции, не уникальный id сообщения. Rebalance изменяет соответствие offset → сообщение.
«А если Redis полностью недоступен неделю?» При fail-open сервис работает, обработки дублей могут быть. Основной план — fix Redis. Нет смысла строить dedup-fallback в Postgres, который работает неделю — это отдельная задача с отдельной сложностью, и её redis-outage на неделю ты всё равно заметишь по остальным симптомам.
«Что делать, если handler частично успешен — БД обновлена, push не
отправлен?» Это отдельная задача про distributed side effects.
Решается через outbox+retry для каждого побочного эффекта отдельно
(push-queue как свой outbox). Здесь это выходит за рамки темы; но
сама консистентность handler’а внутри одной БД-транзакции —
обеспечивается InTx.
Связанные разделы
outbox— at-least-once на publisher-сайде; дубли, которые дойдут до consumer’а, именно здесь рождаются.cqrs— CQRS handler’ы тоже нуждаются в Deduplicator’е.../conventions/events— полный middleware stack, порядок, envelope.../how-to/add-kafka-event— чеклист включает idempotency-стратегию.../troubleshooting/kafka-consumer-stuck— что делать, если consumer завис и лаг растёт.../glossary— Deduplicator, Event-Id, Correlation-Id, Idempotency.