Перейти к содержанию

Идемпотентный consumer

Deep-dive по идемпотентности: как сделать так, чтобы повторная доставка сообщения не приводила к повторной работе. Reference по событиям и middleware — ../conventions/events.md. Эта страница — про внутренности дедупликации, Redis-хранилище, выбор окна, поведение при сбоях, idempotency на уровне бизнес-логики.

Содержание

Проблема

Kafka даёт at-least-once семантику. Это значит, что одно и то же сообщение может быть доставлено consumer'у 2+ раз. Источники повторов:

  • Retry на ошибке. Handler вернул error → middleware.Retry переотправил сообщение → при успехе только второго захода, первый побочный эффект уже случился.
  • Rebalance в consumer-group. Kafka перераспределяет партиции между репликами → partial offset commit → следующая реплика перечитывает «хвост».
  • Forwarder переопубликовал. Он записал в Kafka, но не успел пометить строку в outbox как acked (см. outbox.md) → при рестарте поднял ту же строку снова.
  • Рестарт consumer'а между обработкой и commit'ом offset'а. Handler отработал, offset не закоммитился → следующая инкарнация перечитает.

Итог: handler обязан корректно обрабатывать повторную доставку. Нельзя рассчитывать, что «у нас не будет retry» или «Kafka настроена аккуратно». Это свойство транспорта, не настройки.

Стандарт: Watermill middleware.Deduplicator

В нашем consumer middleware stack (см. ../conventions/events.md) последним по порядку идёт Deduplicator. Он хранит в Redis множество уже обработанных Message.UUID и при повторной доставке проглатывает сообщение до вызова handler'а — возвращает успех, offset коммитится, downstream работа не происходит.

Ключ дедупликации — Message.UUID (это ULID из envelope, см. ../conventions/events.md §Envelope). Не Event-Type, не Correlation-Id, не aggregate_id. Именно UUID — уникальный для этого конкретного сообщения, генерируется на publisher-сайде один раз.

Хранилище — Redis. Shared между репликами одного сервиса: если consumer-реплика A обработала сообщение и записала ключ, реплика B при повторной доставке увидит его и пропустит. Без Redis (in-memory) это не работает: реплики не видят state друг друга.

TTL по умолчанию — 24 часа. Обоснование — §Окно ниже.

Архитектура

Путь одного сообщения через middleware-стек router'а: Deduplicator — последний узел перед handler'ом. Он делает атомарный SETNX в Redis по Message.UUID: если ключ новый — handler вызывается; если ключ уже существует — сообщение проглатывается как дубль и ACK'ается.

flowchart TD
    K[Kafka message]
    R[Router middleware stack]
    D{Deduplicator}
    Redis[(Redis SETNX<br/>dedup:uuid)]
    H[Handler]
    A[ACK]
    NA[NACK → Retry/DLQ]
    K --> R
    R --> D
    D -->|Check UUID| Redis
    Redis -->|новый| D
    D -->|первый раз| H
    Redis -->|duplicate| D
    D -->|уже обработан| A
    H -->|success| A
    H -->|error| NA

Детали по реализации Repository — §RedisDedupRepository; по порядку middleware — §Setup и ../conventions/events.md.

Setup

import (
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

func newDeduplicator(redisClient *redis.Client, logger watermill.LoggerAdapter) message.HandlerMiddleware {
    dedup := middleware.Deduplicator{
        KeyFactory: func(msg *message.Message) (string, bool, error) {
            // false в boolean — "не уникальный ключ, не надо логировать дубль"
            // true — делать предупреждение про дубли.
            return msg.UUID, true, nil
        },
        Repository: newRedisDedupRepo(redisClient),
        Timeout:    24 * time.Hour,
    }
    return dedup.Middleware
}

router.AddMiddleware(
    middleware.CorrelationID,
    middleware.Recoverer,
    poisonQueue(kafkaPub, dlqTopic),
    retryMiddleware.Middleware,
    newDeduplicator(redisClient, watermillLogger), // последний
)

Порядок — критичен. Deduplicator после Retry, а не до. Иначе retry внутри одного обработчика будет падать на «уже обработано» при первой же неудаче.

RedisDedupRepository

Минимальная реализация:

package eventdedup

import (
    "context"
    "errors"
    "time"

    "github.com/redis/go-redis/v9"
)

type RedisRepo struct {
    client *redis.Client
    prefix string
}

func NewRedisRepo(client *redis.Client, prefix string) *RedisRepo {
    return &RedisRepo{client: client, prefix: prefix}
}

// IsDuplicate атомарно пытается установить ключ с TTL.
// Возвращает true, если ключ уже существовал (=дубль),
// false — если ключ установлен этим вызовом (= первое появление).
func (r *RedisRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
    full := r.prefix + ":" + key
    set, err := r.client.SetNX(ctx, full, "1", ttl).Result()
    if err != nil {
        return false, err
    }
    return !set, nil
}

Ключевой примитив — SETNX (Set If Not Exists). Это атомарная операция: если ключ не существовал — устанавливается и возвращает «установили»; если существовал — не трогаем, возвращаем «уже был». Никакой race condition между «прочитать», «решить», «записать» — всё атомарно.

prefix<service>:dedup:<handler>. Пример: notification:dedup:on-review-created. Префикс нужен, чтобы ключи разных handler'ов не перетирались (если один handler пропустил сообщение как дубль, другой handler при ретре мог бы подумать, что и он уже его обрабатывал).

Эта реализация живёт в pkg/eventdedup/ сервис-репо. Пока не вынесено в shared-lib — синхронизация копированием (см. правила в ../conventions/db-pgx.md §pkg).

Окно (TTL)

TTL = сколько хранить ключ в Redis. После истечения ключ исчезает, повтор сообщения с тем же UUID будет пропущен через handler как новый.

Выбор окна:

Тип сообщений TTL Обоснование
Обычные domain events 24h Стандарт. Покрывает: retry на ошибке (5 попыток × 30s = 2.5 мин), rebalance (секунды), forwarder-gap (минуты), длительный инцидент Kafka (часы).
Critical events (user.banned, consent-revoke) 7d Если мы случайно обработаем user.banned дважды — второй раз это безопасный no-op, но нужно гарантировать, что даже после недельной отладочной паузы consumer'а повтор не пойдёт в downstream.
High-volume analytics 1h или меньше Если допустим небольшой процент дублей в аналитике и объём сообщений критичен для Redis-памяти. Только для событий, где дубль = небольшая статистическая ошибка, не бизнес-эффект.

Никогда не ставь TTL меньше 10 минут. Retry-окно внутри одного handler'а при 5 попытках с MaxInterval: 30s + jitter может растянуться до 3–5 минут, плюс consumer-rebalance. TTL в 1 минуту поломает retry-путь: второй заход отработает как «новый», не как retry.

Fail-open vs fail-closed

Что делать, если Redis недоступен?

Fail-open (default)

Redis вернул ошибку → Deduplicator пропускает сообщение через handler без проверки. Преимущество — сервис продолжает работать. Риск — под retry может случиться дубль, если handler не идемпотентен на БД-уровне.

Это выбор по умолчанию для большинства сервисов. Availability важнее, чем 100% гарантия отсутствия дублей на секундах Redis-outage.

Fail-closed

Redis вернул ошибку → IsDuplicate возвращает error → Deduplicator пропускает ошибку дальше → Retry middleware переотправит → если Redis не восстановился, сообщение через 5 попыток уйдёт в DLQ.

Выбирается для сервисов, где дубль = серьёзный бизнес-эффект: списание денег, отправка платного push'а внешнему провайдеру за $0.01 каждый раз, операции, которые нельзя откатить. У нас таких сервисов пока нет; появится — его handler явно отмечается как fail-closed, и это конфигурируется per-handler.

Выбор фиксируется в конфиге сервиса:

repo := eventdedup.NewRedisRepo(redisClient, "notification:dedup:on-review-created")
dedup := middleware.Deduplicator{
    KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
    Repository: eventdedup.FailOpen(repo), // или FailClosed(repo)
    Timeout:    24 * time.Hour,
}

FailOpen/FailClosed — тонкие wrapper'ы, превращающие ошибку Redis либо в «not-a-duplicate, proceed», либо в пробрасывание ошибки наверх.

Альтернативы и почему не используем

Таблица dedup_records per сервис

Пример антипаттерна: в notification/internal/service/dedup.go сейчас сделано «Redis SETNX, при ошибке fallback в Postgres-таблицу dedup_records». Мотивация понятна: Redis умрёт → не потеряем идемпотентность. Но:

  • Таблица dedup_records — ещё один источник владения, своя миграция, свой cleanup, свой retention.
  • Проверка через Postgres на каждый event — latency в разы выше, чем Redis SETNX (БД-connection, транзакция, FS-write). На фоне нагрузки на ту же БД от handler'а это двойная нагрузка.
  • Ключ дедупликации в этой реализации собирается из бизнес-полей (user_id + event_type + ref_id), не из Message.UUID. Это ненадёжно: см. §Anti-patterns про dedup по business-key.
  • Каждый сервис реализует свой таблично-fallback механизм, несовместимо между сервисами, не переиспользуется.

Правильный путь — fail-open Redis через стандартный Deduplicator. Redis-outage на минуты — приемлемый риск; на часы — серьёзный инцидент, и тогда спасать ты будешь не дедупликацией, а стабильностью Redis.

Idempotency-key от клиента на API-level

Клиент шлёт Idempotency-Key в HTTP-header, сервер проверяет — если уже видел, возвращает сохранённый ответ. Это другой паттерн: он защищает от client-side retries на HTTP-уровне. К consumer-side дедупликации прямого отношения не имеет. Нужно отдельно, для публичных write endpoint'ов (не здесь).

Natural idempotency через UPSERT

Если бизнес-операция handler'а — INSERT ... ON CONFLICT DO NOTHING или UPDATE ... WHERE status != 'done', повторный вызов безопасен сам по себе. В таких случаях строго говоря Deduplicator не нужен.

Но: downstream effects могут быть не в БД. Push в FCM не идемпотентен: каждый вызов = новое уведомление на телефоне пользователя. Если handler и пишет в БД, и шлёт push, UPSERT защитит только БД, push уйдёт дважды.

Вывод: Deduplicator — defense in depth. Используем всегда, даже при идемпотентной бизнес-логике. Лишний SETNX в Redis — не стоящая экономии оптимизация.

Idempotency на уровне бизнес-логики

Deduplicator — не замена корректного дизайна handler'а. Даже с Deduplicator'ом handler должен быть идемпотентен by design, по двум причинам:

  1. Redis-outage под fail-open → Deduplicator пропускает сообщение, handler должен не сломаться.
  2. TTL истёк → после окна дубль может прийти снова, handler должен корректно это пережить.

Приёмы:

INSERT ... ON CONFLICT DO NOTHING

const q = `
    INSERT INTO notification.sent_notifications (event_id, user_id, sent_at)
    VALUES ($1, $2, NOW())
    ON CONFLICT (event_id) DO NOTHING`
tag, err := tx.Exec(ctx, q, eventID, userID)

Если строка уже была (дубль), DO NOTHING — повторный insert бесшумно не происходит. tag.RowsAffected() покажет 0 — можно решить, отправлять ли push. Обычно: если RowsAffected=0, значит, событие уже обрабатывалось — push пропускаем.

Проверка status перед update

const q = `
    UPDATE review.reviews
       SET status = 'published', published_at = NOW()
     WHERE id = $1 AND status = 'pending'`

Повторный UPDATE с тем же id не перезапишет уже опубликованный review (условие status = 'pending' не выполнится). Без этой проверки повторный update мог бы подтереть published_at более свежим значением.

External side effects — check-then-act

Для внешних вызовов (FCM push, SMS-gateway) — отмечаем факт отправки до вызова и проверяем перед повтором:

err := s.db.InTx(ctx, func(tx pgx.Tx) error {
    res, err := tx.Exec(ctx,
        `INSERT INTO notification.push_sent (event_id, user_id, sent_at)
         VALUES ($1, $2, NOW())
         ON CONFLICT (event_id) DO NOTHING`,
        eventID, userID)
    if err != nil {
        return err
    }
    if res.RowsAffected() == 0 {
        // уже отправляли — не делаем повторный FCM-вызов
        return nil
    }
    // отметили факт, теперь безопасно слать push
    return s.fcm.Send(ctx, userID, payload)
})

Тонкость: если fcm.Send упадёт после того, как insert'нулась запись push_sent, transaction откатится и мы не отправим push. Это acceptable trade-off: лучше не отправить при ошибке FCM, чем отправить несколько раз. Retry на следующем заходе увидит пустую push_sent, попробует снова.

Правило: handler — идемпотентен

На ревью новый handler должен ответить на вопрос: «что будет, если меня вызвать с тем же сообщением дважды подряд?» Ответ: «ничего нового не произойдёт». Если ответ — «дважды отправим письмо / дважды спишем / дважды обновим счётчик» — handler надо переделать.

Correlation-Id vs Event-Id

Разница часто путается. Обе — идентификаторы в envelope, но разные по смыслу.

  • Event-Id (Message.UUID в Watermill) — уникальный для каждого сообщения. Генерируется один раз на публикацию, как ULID. Один user.registered для пользователя 42 = один Event-Id. Дубль этого же сообщения на consumer-сайде придёт с тем же Event-Id — поэтому Deduplicator дедуплицирует именно по нему.
  • Correlation-Id (Metadata["Correlation-Id"]) — общий для всей логической цепочки. Один HTTP-запрос → 3 события → 2 downstream события в других сервисах → все 5 несут один Correlation-Id. Не уникален per message.

Не дедуплицируй по Correlation-Id. Это приведёт к тому, что второе и третье события одной цепочки будут выглядеть как дубли первого — и будут молча проглочены.

Детали в ../glossary.md.

Monitoring

Метрики

messages_deduplicated_total{service, topic, handler}
    — counter, сколько сообщений Deduplicator отверг как дубли.
messages_processed_total{service, topic, handler, result}
    — counter, общее количество; result ∈ {success, error, deduplicated}.
deduplication_check_duration_seconds{service}
    — histogram, latency Redis SETNX.
deduplication_errors_total{service, reason}
    — counter, reason ∈ {redis_timeout, redis_conn, marshal}.

Middleware, выставляющий эти метрики, — в pkg/eventmw/ сервис-репо. Подключается к Deduplicator.Repository wrapper'ом.

Alerting

  • Рост дублей. Alert: rate(messages_deduplicated_total[5m]) / rate(messages_processed_total[5m]) > 0.1 в течение 10 минут → ticket. Нормальный фон — меньше 1% (редкий retry). 10%+ = upstream publish дублирует систематически (баг в forwarder'е, баг в publisher'е).
  • Redis-outage. Alert: rate(deduplication_errors_total[5m]) > 0 → ticket. Под fail-open не ломает сервис, но надо разобраться.
  • Латенси Redis. Alert: histogram_quantile(0.99, deduplication_check_duration_seconds) > 0.05 (50ms на p99) → ticket. Redis должен отвечать быстро; медленный Redis тормозит все handler'ы.

Testing

Unit

Подставь fake DedupRepo, проверь, что второй вызов handler'а с тем же UUID возвращает true (дубль).

type fakeRepo struct {
    seen map[string]bool
}

func (r *fakeRepo) IsDuplicate(ctx context.Context, key string, ttl time.Duration) (bool, error) {
    if r.seen[key] {
        return true, nil
    }
    r.seen[key] = true
    return false, nil
}

func TestDeduplicator_SecondCallIsDuplicate(t *testing.T) {
    repo := &fakeRepo{seen: map[string]bool{}}
    dup, _ := repo.IsDuplicate(ctx, "evt-1", time.Hour)
    if dup {
        t.Fatal("first call must not be duplicate")
    }
    dup, _ = repo.IsDuplicate(ctx, "evt-1", time.Hour)
    if !dup {
        t.Fatal("second call must be duplicate")
    }
}

Integration через gochannel

Проверяем интеграцию handler'а + Deduplicator'а + router'а в in-memory режиме:

func TestRouter_DuplicatesIgnored(t *testing.T) {
    pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
    defer pubsub.Close()

    fakeSvc := &fakeNotificationService{}
    repo := &fakeRepo{seen: map[string]bool{}}
    dedup := middleware.Deduplicator{
        KeyFactory: func(m *message.Message) (string, bool, error) { return m.UUID, true, nil },
        Repository: repo,
        Timeout:    time.Hour,
    }

    router, _ := message.NewRouter(message.RouterConfig{}, nil)
    router.AddMiddleware(dedup.Middleware)
    router.AddNoPublisherHandler("t", "topic", pubsub, newHandler(fakeSvc).Handle)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() { _ = router.Run(ctx) }()
    <-router.Running()

    for i := 0; i < 3; i++ {
        msg := message.NewMessage("evt-1", []byte(`{"x":1}`))
        msg.Metadata.Set("Event-Type", "foo")
        _ = pubsub.Publish("topic", msg)
    }

    eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 })
    // Три публикации с одним UUID → handler вызван один раз.
}

Integration с реальным Redis

Testcontainers Redis + RedisRepo. Проверь: два SETNX с одним ключом — второй возвращает «уже был». TTL истекает — после истечения можно снова. См. ../conventions/testing.md про testcontainers.

Anti-patterns

Нет dedup и handler не идемпотентен

Классическая ошибка новичка. Retry на первой же временной ошибке → handler отработал дважды → в БД два ряда, в FCM два push'а. Обнаружится только под продовой нагрузкой. Всегда Deduplicator + идемпотентный handler.

Dedup по business-key вместо Message.UUID

// Плохо
key := fmt.Sprintf("user:%d:event:%s:ref:%d", userID, eventType, refID)

Два разных логических события с одним и тем же business-key совпадут. Например, если случилось два review.updated для одного ревью подряд (legitimate — пользователь правил дважды), второе будет молча проглочено. Баг, который очень тяжело поймать.

Единственное исключение: если бизнес требует, чтобы между событиями для одного key было окно дедупликации (например, user. visited.place с окном 1 час — чтобы не спамить). Тогда это не dedup, а rate-limiter по business-key, отдельная функция, не Deduplicator.

Коротое TTL (5 минут)

Не покрывает retry-окно. middleware.Retry с 5 попытками и MaxInterval: 30s + jitter растягивается до 3–5 минут. 5-минутное TTL — на грани, любое залипание (Kafka rebalance 1 мин, Redis latency spike 30s) пробивает окно. Минимум 1 час, по умолчанию — 24 часа.

Local in-process cache вместо Redis

var seenMu sync.Mutex
var seen = map[string]bool{} // ПЛОХО

При multi-pod деплое каждая реплика имеет свой seen. Событие пришло на реплику A, обработалось, offset не закоммитился, Kafka сделала rebalance, то же сообщение попало на реплику B — у неё seen пустой, обработает снова. Нужен shared store, Redis или БД.

Pod-local с fallback в БД

Смесь предыдущих двух: local cache + при miss проверка в Postgres. Сложная логика, маленькая польза. Redis один раз настроенный и общий — проще и надёжнее.

Dedup before Retry

// Плохо: dedup перед retry
router.AddMiddleware(
    dedup.Middleware,     // первый
    retry.Middleware,     // второй
)

Retry внутри одного handler-вызова генерирует «вторую попытку» того же сообщения (тот же UUID) → dedup считает её дублем → retry не срабатывает. Dedup всегда после Retry в цепочке middleware.

Deduplicator подключили, handler не идемпотентен

Работает, пока Redis доступен. В день, когда Redis ляжет на 10 минут, fail-open пропустит несколько сообщений через неидемпотентный handler → дубликаты. Deduplicator — первая линия, идемпотентный handler — вторая.

FAQ

«Можно ли добиться exactly-once?» В распределённой системе с сетевыми сбоями — нет, реально невозможно. Best achievable — at-least- once + idempotency на consumer-сайде. Это и даёт «effectively once» для наблюдателя.

«Почему не использовать kafka.transactional.id для exactly-once?» Kafka transactional producer даёт exactly-once только в цепочке Kafka→Kafka (например, stream processor). Наш кейс — outbox (Postgres → Kafka → Postgres) — за границы transactional-семантики Kafka, не покрывается. Нужна своя idempotency.

«Zero duplicates возможен?» Нет. Процент дублей можно снизить до долей процента, но не до нуля. Замеряй messages_deduplicated_total / messages_processed_total и ставь alert на аномалии.

«Можно ли использовать consumer offset как idempotency ключ?» Нет. Offset — позиция в партиции, не уникальный id сообщения. Rebalance изменяет соответствие offset → сообщение.

«А если Redis полностью недоступен неделю?» При fail-open сервис работает, обработки дублей могут быть. Основной план — fix Redis. Нет смысла строить dedup-fallback в Postgres, который работает неделю — это отдельная задача с отдельной сложностью, и её redis-outage на неделю ты всё равно заметишь по остальным симптомам.

«Что делать, если handler частично успешен — БД обновлена, push не отправлен?» Это отдельная задача про distributed side effects. Решается через outbox+retry для каждого побочного эффекта отдельно (push-queue как свой outbox). Здесь это выходит за рамки темы; но сама консистентность handler'а внутри одной БД-транзакции — обеспечивается InTx.

Связанные разделы