Skip to Content
PatternsTransactional outbox

Transactional Outbox

Deep-dive по паттерну. Если ты ищешь «как опубликовать событие по шагам» — открой ../how-to/add-kafka-event. Если ищешь «какие поля в envelope» — открой ../conventions/events. Эта страница отвечает на другой вопрос: как outbox работает под капотом, как его масштабировать, мониторить и как вытащить сервис из состояния, когда что-то сломалось.

TL;DR

Сверка для быстрого review или когда нужно вспомнить основы:

  • Зачем: устраняет dual-write между Postgres и Kafka (см. §Что это). Без outbox — либо «коммит без события», либо «событие без коммита».
  • Гарантия: at-least-once в Kafka. Consumer обязан быть идемпотентным (см. idempotent-consumer). Exactly-once в Postgres→Kafka не бывает; не планируется.
  • Запись: InTx { repo.CreateTx(tx) + outboxPub.PublishTx(tx) }. Одна транзакция, две строки. Внутри InTxтолько Postgres, никаких HTTP / Kafka / Redis вызовов.
  • Публикация: forwarder = watermill-sql subscriber + kafka publisher + components/forwarder. Свой outbox-worker не пишем (см. §Anti-patterns).
  • Schema таблицы: offset_consumed (BIGSERIAL), offset_acked (BIGINT, NULL до публикации), uuid, payload, metadata, transaction_id (xid8). Partial-index WHERE offset_acked IS NULL. IS NULL, не = NULL (см. §Schema).
  • Ordering: per-aggregate порядок в Kafka даёт partition_key = aggregate_id, не outbox сам по себе.
  • Cleanup: acked-строки удаляются через 7 дней (CronJob, батчами по 10k). Unacked никогда не удаляются.
  • SLO: forwarder publish p99 < 5s, backlog < 100 устойчиво, heartbeat < 30s. Alerts — см. ../conventions/slo-and-budget#outbox-forwarder.
  • Runbook на lag: ../how-to/debug-outbox-lag.

Содержание

Что это и зачем

Outbox решает проблему dual-write: бизнес-операция пишет в Postgres и публикует событие в Kafka, и эти два действия должны быть согласованы. Без outbox возможны две поломки:

  1. Закоммитили транзакцию в БД → перед Publish процесс упал / сеть оборвалась → событие потеряно, downstream-сервисы не увидят факт, который уже отражён в БД источника.
  2. Опубликовали сначала в Kafka → транзакция откатилась → событие фантомное, downstream отреагирует на изменение, которого нет.

Outbox устраняет оба случая. В одной транзакции с бизнес-записью пишется строка в таблицу outbox в той же БД. Отдельный процесс (forwarder) забирает строки и публикует в Kafka с at-least-once гарантиями. Downstream обязан быть идемпотентным (см. idempotent-consumer) — это плата за простоту и атомарность записи в источнике.

Гарантии:

  • Атомарность записи бизнес-состояния и факта события — обеспечивает БД через общую транзакцию.
  • At-least-once доставки — обеспечивает forwarder: пока строка не помечена как published, её будут переопубликовывать.
  • Порядок per-aggregate в Kafka — обеспечивает partition key (см. §Ordering), а не outbox сам по себе.

Exactly-once здесь нет и не будет. Kafka даёт at-least-once, outbox повторяет эту семантику на своём уровне.

Когда использовать

Правило по умолчанию: всегда через outbox, если событие отражает изменение доменного состояния сервиса. Любой факт, который потом захочет прочитать другой сервис или аналитика, должен пройти через outbox: user.registered, user.banned, review.updated, photo.uploaded, moderation.approved.

Второе правило: если ты написал в БД и дальше дёрнул Kafka напрямую — ты создал dual-write. Даже если «всё работает на стенде». Починить эту логическую ошибку позже будет дороже, чем написать через outbox с самого начала.

Когда НЕ использовать

  • Ephemeral сигналы без привязки к DB-state. Пример: WebSocket-события «пользователь печатает», «активен на этой странице», live-позиция курсора. У них нет соответствующего изменения в БД, и потеря одной такой нотификации не ломает систему.
  • Fan-out внутри одного сервиса. Если publisher и subscriber живут в одном процессе и запись в БД им не нужна как общий факт — используй gochannel in-process pub/sub, не гоняй событие через Kafka.
  • Технические метрики. Prometheus counter, не Kafka event.

Архитектура

Ключевые свойства:

  • Forwarder — отдельный worker-loop; либо внутри того же процесса сервиса (goroutine), либо отдельный Deployment. Мы по умолчанию запускаем goroutine в cmd/server/main.go — это проще и достаточно.
  • Transport из outbox в Kafka — через watermill-sql (subscriber) + watermill-kafka (publisher), склеенные watermill/components/forwarder.
  • Сам forwarder код не пишем — используем готовый из Watermill. Попытка написать свой (как в репозитории сервиса user, internal/event/worker.go) сразу становится антипаттерном: см. §Anti-patterns.

Schema outbox-таблицы

Схема совпадает с DefaultPostgreSQLSchema из watermill-sql/v3. DDL кладётся отдельной миграцией в каждый сервис-репо. Таблица создаётся в БД сервиса в схеме public (Postgres default), без префикса с именем сервиса:

-- review/migrations/XXX_outbox.up.sql BEGIN; SELECT pg_advisory_xact_lock(hashtext('review_migrations')); CREATE TABLE outbox ( offset_acked BIGINT, offset_consumed BIGSERIAL NOT NULL, uuid VARCHAR(36) NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), payload JSON, metadata JSON, transaction_id xid8 NOT NULL DEFAULT pg_current_xact_id(), CONSTRAINT outbox_pk PRIMARY KEY (transaction_id, offset_consumed) ); CREATE INDEX idx_outbox_acked ON outbox (offset_acked) WHERE offset_acked IS NULL; COMMIT;

Пояснение полей:

  • offset_consumed — локальный монотонный номер в пределах таблицы (BIGSERIAL). Используется для сортировки при чтении.
  • offset_acked — выставляется в значение offset_consumed после успешной публикации. NULL — ещё не опубликовано.
  • uuid — ULID (Message.UUID), тот же, что доедет до Kafka и будет ключом в Deduplicator’е на consumer’е. Строится через eventmeta.New (см. ../conventions/events).
  • payload — JSON-тело события (ReviewCreatedV1, сериализованный в JSON).
  • metadata — JSON с envelope-заголовками (Event-Type, Schema-Version, Correlation-Id, Source-Service, Published-At, traceparent).
  • transaction_idxid8, идентификатор Postgres-транзакции. Именно он в связке с offset_consumed даёт корректный порядок чтения forwarder’ом: строки из одной транзакции читаются пачкой, строки из незакоммиченных транзакций пропускаются до момента их commit’а.
  • PRIMARY KEY (transaction_id, offset_consumed) — уникальность строки и естественный порядок для cursor-based чтения.

Partial-index по offset_acked IS NULL нужен, чтобы forwarder мог сканировать только неопубликованные строки, а не всю таблицу.

IS NULL, не = NULL. В SQL offset_acked = NULL — это всегда UNKNOWN (не true), поэтому такой предикат никогда не сматчит ни одной строки — ни в WHERE, ни в partial-index. Код, случайно написавший WHERE offset_acked = NULL, молча не увидит unacked-строки (forwarder остановится, backlog расти), а Postgres не выдаст ошибку.

Правило: во всех запросах к outbox (partial-index, forwarder- selects, cleanup, debug-query из runbook’а) используй только IS NULL / IS NOT NULL. linter-rule в code-review: любое = NULL / != NULL в SQL — ошибка, independently от таблицы.

Имя таблицы — всегда outbox, живёт в БД сервиса (в схеме public, без префикса с именем сервиса). Каждый сервис — отдельная БД, поэтому имена таблиц не конфликтуют. Одна таблица outbox на сервис. Одна таблица outbox на несколько сервисов — антипаттерн (см. §Anti-patterns).

Publisher side

Запись в outbox происходит внутри той же транзакции, что и бизнес-запись. Код в service-слое:

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) { var r *domain.Review err := s.db.InTx(ctx, func(tx pgx.Tx) error { saved, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text) if err != nil { return err } r = saved payload := event.ReviewCreatedV1{ ReviewID: saved.ID, PlaceID: saved.PlaceID, UserID: saved.UserID, Rating: saved.Rating, } msg, err := eventmeta.New(ctx, eventmeta.Envelope{ EventType: "review.created", SchemaVersion: "1", Source: "review", Payload: payload, }) if err != nil { return err } return s.outboxPublisher.PublishTx(ctx, tx, "outbox", msg) }) if err != nil { return nil, fmt.Errorf("create review: %w", err) } return r, nil }

Разобьём по кусочкам:

  • eventmeta.New строит *message.Message с заполненным Message.UUID (ULID), envelope-metadata (Event-Type, Schema-Version, Correlation-Id, Source-Service, Published-At, traceparent из OTel). Код helper’а — в ../conventions/events.
  • s.outboxPublisher — это watermill-sql Publisher, настроенный писать в таблицу outbox. Метод PublishTx — тонкий wrapper, который пишет строку, используя переданный pgx.Tx.
  • Первый аргумент "outbox" в PublishTxне Kafka-topic. Это логическое «имя канала» внутри outbox-таблицы. Мы всегда передаём "outbox" — по одной на сервис. Реальный Kafka-topic определится позже, в forwarder’е, по envelope-metadata.
  • InTx коммитит или откатывает обе записи атомарно. Если Publish упал — откатится и вставка в reviews. Если вставка в reviews упала — вставка в outbox тоже не произошла.

Правило: внутри InTx никогда не делай прямых Kafka-вызовов, HTTP-вызовов, внешних побочных эффектов. Только БД-операции через переданный tx. Детали — ../conventions/db-pgx.

Forwarder

Forwarder — компонент watermill/components/forwarder. Он читает строки из outbox-таблицы и публикует в Kafka.

Setup

import ( watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/components/forwarder" "github.com/ThreeDotsLabs/watermill/message" ) // 1. Subscriber поверх Postgres — читает outbox. sqlSub, err := watermillSQL.NewSubscriber(stdDB, watermillSQL.SubscriberConfig{ SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{ GenerateMessagesTableName: func(topic string) string { return "outbox" }, }, OffsetsAdapter: watermillSQL.DefaultPostgreSQLOffsetsAdapter{}, InitializeSchema: false, // схема создаётся миграцией PollInterval: 500 * time.Millisecond, ConsumerGroup: "review-outbox-forwarder", }, watermillLogger) // 2. Kafka publisher — пишет в реальный Kafka. kafkaPub, err := kafka.NewPublisher( kafka.PublisherConfig{Brokers: cfg.Kafka.Brokers, Marshaler: kafka.DefaultMarshaler{}}, watermillLogger, ) // 3. Router для применения middleware к forwarder'у. router, err := message.NewRouter(message.RouterConfig{}, watermillLogger) // 4. Сам forwarder. fwd, err := forwarder.NewForwarder(sqlSub, kafkaPub, watermillLogger, forwarder.Config{ ForwarderTopic: "outbox", // то же имя, что используется в PublishTx Router: router, Middlewares: []message.HandlerMiddleware{ middleware.Recoverer, otelMiddleware.Trace(tracer), // пробрасывает traceparent в Kafka outboxMetricsMiddleware, // publish_duration + errors }, }) go func() { _ = fwd.Run(ctx) }() go func() { _ = router.Run(ctx) }()

Параметры, на которые обращай внимание:

  • PollInterval — как часто forwarder делает запрос к outbox. Default 500ms. Снижение (100ms) даёт меньшую end-to-end задержку, но увеличивает нагрузку на БД. Увеличение (2s) экономит ресурсы, но задерживает публикацию на столько же.
  • ConsumerGroup — имя consumer-group’ы watermill-sql для offsets-таблицы. У разных forwarder’ов должна быть разная группа, чтобы они не перебивали друг другу offset’ы.
  • ForwarderTopic — должен совпадать с topic’ом, который мы передаём в PublishTx на publisher-стороне.

Как forwarder выбирает Kafka-topic

Forwarder читает строку из outbox и публикует её в Kafka. Имя Kafka- топика берётся из envelope-metadata. Стандартный маппинг:

Kafka topic = "kazmaps." + Source-Service + "." + Event-Type

Пример: Source-Service=review, Event-Type=review.createdkazmaps.review.review.

Маппинг реализуется через middleware forwarder’а: handler читает msg.Metadata, выставляет целевой topic через механизм forwarder.Config.DecorateMessage (или через явный publisher-wrapper, который принимает topic как функцию от *message.Message). Формат имён — ../conventions/events.

Конкурентность и advisory locks

watermill-sql при чтении outbox использует SELECT ... FOR UPDATE SKIP LOCKED. Это безопасно при нескольких forwarder-инстансах: две реплики могут параллельно читать разные блоки строк, не блокируя друг друга. Внутри watermill-sql поверх этого есть advisory-lock, защищающий offsets-таблицу от гонок при обновлении позиции.

Это не значит, что надо специально поднимать несколько forwarder’ов. По умолчанию — один forwarder на сервис (одна реплика cmd/server/main.go, одна goroutine). Если throughput outbox превысит возможности одной реплики — тогда отдельный forwarder-deployment, но это редкий кейс.

Ordering guarantees

Это частый источник недопонимания.

  • Между разными транзакциями в пределах одного outbox: строки упорядочены по (transaction_id, offset_consumed), forwarder публикует в этом порядке. Если транзакция T2 закоммитилась после T1, её события уйдут в Kafka после событий T1.
  • Внутри одной транзакции: порядок по offset_consumed — в каком порядке вызывали PublishTx, в таком порядке окажется в Kafka.
  • Per-aggregate ordering в Kafkaне гарантируется автоматически outbox’ом. Если тебе важно, чтобы review.updated для одного ревью пришёл после review.created для того же ревью, ты должен обеспечить две вещи:
    1. Оба события идут через outbox одного и того же сервиса (✓ по определению — сервис-владелец aggregate’а).
    2. В Kafka partition key = aggregate_id. Kafka гарантирует порядок только внутри одной партиции. Если review.created и review.updated попадут в разные партиции, consumer увидит их в произвольном порядке.

Partition key выставляется forwarder-middleware при публикации в Kafka:

outboxMiddleware := func(h message.HandlerFunc) message.HandlerFunc { return func(msg *message.Message) ([]*message.Message, error) { if aggID := msg.Metadata.Get("Aggregate-Id"); aggID != "" { msg.Metadata.Set("partition_key", aggID) } return h(msg) } }

Kafka publisher читает partition_key и хэширует его в номер партиции. Один aggregate → одна партиция → корректный порядок.

Не пытайся делать «свою логику упорядочивания» поверх outbox (таблица «ожидающих публикации», проверка last_seq и т.п.). Выбери правильный partition key, прими at-least-once и сделай consumer’ов идемпотентными.

Monitoring

Метрики

outbox_rows_total{service, status="unacked"} — gauge, количество неопубликованных строк. outbox_forwarder_lag_seconds{service} — histogram, now() - MIN(created_at) по unacked, замеряется раз в тик. outbox_forwarder_publish_duration_seconds{service, topic, result} — histogram, длительность kafka.Publish. outbox_forwarder_errors_total{service, reason} — counter, reason ∈ {kafka_unavailable, marshal, db_read, db_ack}. events_published_total{service, topic, event_type} — counter, инкрементируется после успешной публикации.

Выставляет эти метрики forwarder-middleware; код middleware живёт в pkg/eventmw/ сервис-репо. Подключение — в setup-блоке (см. ../conventions/observability).

Alerting

Шаблоны alert-правил (per-сервис, лежат в infra-репо, здесь — как приёмочные критерии):

  • Forwarder падает или не может публиковать. Alert: rate(outbox_forwarder_errors_total[5m]) > 0 в течение 10 минут → ticket.
  • Лаг растёт. Alert: outbox_forwarder_lag_seconds > 60 в течение 5 минут → ticket. Диагностика — ../how-to/debug-outbox-lag.
  • Backlog накапливается. Alert: outbox_rows_total{status="unacked"} > 1000 → ticket.
  • Совсем нет публикаций при наличии трафика. Alert: rate(http_requests_total{endpoint="/v1/reviews", method="POST", status="201"}[5m]) > 0 и rate(events_published_total{event_type="review.created"}[5m]) == 0 → страница.

Трейсинг

OTel-middleware в forwarder’е читает traceparent из envelope-metadata и продолжает тот же trace, что создал HTTP-handler на входе. В Tempo на одном waterfall видны: HTTP span → DB insert span → kafka.Publish span. Это и есть основной способ разобрать «почему между HTTP-ответом и появлением события в Kafka прошло 3 секунды».

Cleanup и retention

Таблица outbox не растёт бесконечно. Acked-строки удаляются регулярно, unacked — никогда. Цель раздела — определить retention window, способ удаления, защиту от bloat и мониторинг.

Retention window

Acked сообщения удаляются после 7 дней. Это компромисс:

  • Достаточно для replay при инциденте. Типичный срок расследования «почему downstream-сервис не увидел событие» — 1–3 рабочих дня; запас на выходные и праздники даёт 7 дней.
  • Не слишком долго для table bloat. При 10 тыс. событий в день таблица содержит до 70 тыс. строк на сервис — комфортно для индекса и VACUUM’а.
  • Кратно retention Kafka-топика (по умолчанию тоже 7 дней), что даёт понятное правило: после истечения Kafka-retention replay невозможен в любом случае.

Меньше 3 дней — нельзя: инцидент в пятницу разбирается в понедельник, payload нужен. Больше 30 дней — избыточно, начинает страдать VACUUM и размер индекса.

Как удалять

CronJob в Kubernetes, расписание 0 3 * * * (ночью, после off-peak). Job вызывает internal HTTP-endpoint сервиса /internal/outbox/cleanup или напрямую выполняет SQL (зависит от сервиса — endpoint проще для observability, SQL проще для инфры):

DELETE FROM outbox WHERE offset_acked IS NOT NULL AND created_at < NOW() - INTERVAL '7 days';

Удаление — батчами по 10 000 строк, чтобы не держать длинную блокировку и не раздувать WAL одной большой транзакцией:

WITH victims AS ( SELECT transaction_id, offset_consumed FROM outbox WHERE offset_acked IS NOT NULL AND created_at < NOW() - INTERVAL '7 days' ORDER BY transaction_id, offset_consumed LIMIT 10000 ) DELETE FROM outbox o USING victims v WHERE o.transaction_id = v.transaction_id AND o.offset_consumed = v.offset_consumed;

Job крутит этот DELETE в цикле, пока количество удалённых строк не станет меньше 10 000 (= мы добрали «хвост»). Между итерациями — пауза 500 ms, чтобы не доминировать на I/O.

Правило: никогда не удаляй unacked-строки. Если строка не опубликована — её должен обработать forwarder. Удалить = потерять событие безмолвно.

Пример CronJob YAML

apiVersion: batch/v1 kind: CronJob metadata: name: review-outbox-cleanup namespace: review spec: schedule: "0 3 * * *" concurrencyPolicy: Forbid successfulJobsHistoryLimit: 3 failedJobsHistoryLimit: 3 jobTemplate: spec: backoffLimit: 2 activeDeadlineSeconds: 1800 template: spec: restartPolicy: OnFailure containers: - name: cleanup image: kazmaps/review-service:stable args: ["outbox-cleanup"] env: - name: OUTBOX_RETENTION_DAYS value: "7" - name: OUTBOX_BATCH_SIZE value: "10000" - name: OUTBOX_BATCH_SLEEP_MS value: "500" - name: DATABASE_URL valueFrom: secretKeyRef: name: review-db key: url resources: requests: cpu: 100m memory: 128Mi limits: cpu: 500m memory: 256Mi

Ключевые поля:

  • schedule: "0 3 * * *" — ночной запуск.
  • concurrencyPolicy: Forbid — если предыдущий cleanup ещё не доделал, новый не стартует. Длинный cleanup — сигнал о проблеме.
  • backoffLimit: 2 — две попытки на retry внутри одного Job’а, потом failure и alert.
  • activeDeadlineSeconds: 1800 — 30 минут hard cap. Не успел — что-то не так (size blowup, lock contention).
  • restartPolicy: OnFailure — перезапускать контейнер при падении внутри Job’а.

Сервис (образ kazmaps/review-service) запускается с подкомандой outbox-cleanup — это CLI-mode main’а, который читает env-переменные ретеншена, выполняет DELETE-цикл и завершается. Детали CLI — ../conventions/configuration.

VACUUM

После массового DELETE Postgres не отдаёт место на диск автоматически — строки становятся dead tuples, таблица продолжает расти. Отдельный CronJob раз в неделю (например, 0 4 * * 0 — ночь воскресенья):

VACUUM (VERBOSE, ANALYZE) outbox;

Параметры:

  • Обычный VACUUM (не FULL) — не блокирует запись, но не отдаёт место на диск, только помечает dead tuples как переиспользуемые. Для наших объёмов — достаточно.
  • VACUUM FULL не использовать: он берёт exclusive lock и переписывает всю таблицу. На живой outbox с forwarder’ом это приведёт к лагу публикации на время VACUUM’а.
  • ANALYZE обновляет статистику планировщика; полезно, если индекс idx_outbox_acked начинает использоваться subоptимально.

Если размер таблицы продолжает расти (см. метрики bloat ниже) несмотря на VACUUM — запускаем pg_repack в отдельное maintenance-окно. Это уже ручная процедура, не cron.

Метрики bloat

outbox_rows_total{service, status="acked"} — gauge, количество acked-строк. Собирается периодическим SELECT count(*) WHERE offset_acked IS NOT NULL. outbox_rows_total{service, status="pending"} — gauge, количество unacked-строк. SELECT count(*) WHERE offset_acked IS NULL. outbox_table_bytes{service} — gauge, pg_relation_size('outbox'). Из postgres_exporter.

Metric-collector в сервисе раз в 60 секунд выполняет два count(*) и expose’ит их в /metrics. pg_relation_size — через postgres_exporter (живёт отдельно, часть инфры).

Alerts:

  • outbox_rows_total{status="acked"} > 1_000_000 в любой момент → ticket. Cleanup сломан (CronJob упал, retention переконфигурирован на больше чем надо, DELETE блокируется).
  • outbox_rows_total{status="pending"} > 10_000 for 5m → page. Forwarder залип: публикация не успевает за входящим трафиком либо Kafka недоступен. Диагностика — ../how-to/debug-outbox-lag.
  • outbox_table_bytes растёт несмотря на DELETE — запускаем ручной pg_repack в maintenance-окно.

Правила сверху:

  • Никогда не удаляй unacked-строки. См. выше.
  • Не делай retention меньше 3 дней. Теряешь возможность replay при weekend-инциденте.
  • Не клади cleanup в сам сервис (как goroutine внутри main’а). Это инфраструктурная задача с отдельным lifecycle: отдельный image-tag не нужен, зато отдельная observability и отдельный alert на failure’ы Job’а. CronJob с образом сервиса и аргументом outbox-cleanup — правильный компромисс: один релиз, разный режим запуска.

Failure modes

СценарийПоведениеЧто делать
Forwarder-goroutine упала (panic)middleware.Recoverer поймал, goroutine перезапустилась (если wrapped в supervisor). Без supervisor — упала навсегда до рестарта pod’а.Всегда оборачивай fwd.Run(ctx) в retry-loop или полагайся на middleware.Recoverer + log. Alert на forwarder_errors.
Kafka недоступенPublish возвращает ошибку, строка в outbox остаётся unacked. Forwarder retry’ит на следующем тике. Лаг растёт.Alert на lag → диагностика Kafka → восстановление. Строки дрейн’ятся после восстановления.
DB pool исчерпанForwarder ждёт свободное соединение → лаг растёт → alert.Выдели forwarder’у отдельный pool (3–5 соединений), изолированный от HTTP-handler’ов. Конфиг — в internal/config/.
Две реплики forwarder’аSELECT FOR UPDATE SKIP LOCKED + advisory lock защищают. Но дубли доставки в Kafka возможны на границах.По умолчанию — один forwarder на сервис. Если нужен HA — полагайся на k8s перезапуск pod’а, а не на multi-writer.
Падение между DB commit и публикациейСтрока в outbox уже коммитнута, в Kafka ещё не ушло → forwarder возьмёт её на следующем тике.Ничего делать не надо, это штатный путь at-least-once.
Kafka принял, но forwarder упал до UPDATE offset_ackedПри рестарте forwarder прочитает ту же строку снова → дубль в Kafka.Ничего делать не надо. Consumer дедуплицирует по Message.UUID через Deduplicator middleware (см. idempotent-consumer).
Злой sql-запрос в outbox-таблице (DELETE вручную)Потеряны события.Запрещено делать руками. Только cleanup через CronJob. Доступ в прод DB — через pass-through, не direct.

Testing

Unit через gochannel

Для теста бизнес-handler’а — in-memory pub-sub. Publisher подставляется как gochannel.GoChannel, реальный outbox не нужен.

func TestReviewService_Create_PublishesEvent(t *testing.T) { pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil) defer pubsub.Close() sub, _ := pubsub.Subscribe(context.Background(), "outbox") svc := service.NewReviewService(fakeRepo, fakeDB, pubsub) _, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5}) if err != nil { t.Fatalf("create: %v", err) } select { case msg := <-sub: if got := msg.Metadata.Get("Event-Type"); got != "review.created" { t.Fatalf("event-type: %q", got) } case <-time.After(2 * time.Second): t.Fatal("event not published") } }

Такой тест покрывает: события строятся, envelope корректен, publish вызывается. Он не проверяет, что строка попала в Postgres-таблицу — для этого integration-тест.

Integration через testcontainers

Поднимается реальный Postgres, применяются миграции, в тесте service.Create → тест читает SELECT FROM outbox и убеждается, что строка появилась с правильным payload’ом.

func TestReviewService_Create_WritesOutboxRow(t *testing.T) { pool := setupPostgres(t) // testcontainers Postgres + migrations stdDB := stdlib.OpenDBFromPool(pool) // *sql.DB поверх pgx для watermill-sql outboxPub := newOutboxPublisher(t, stdDB) svc := service.NewReviewService( postgres.NewReviewRepo(pool), pkgdb.New(pool), outboxPub, ) _, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5}) if err != nil { t.Fatalf("create: %v", err) } var count int err = pool.QueryRow(ctx, `SELECT count(*) FROM outbox WHERE metadata->>'Event-Type' = 'review.created'`, ).Scan(&count) if err != nil || count != 1 { t.Fatalf("outbox rows: got %d, err %v", count, err) } }

End-to-end

Testcontainers Postgres + testcontainers Kafka + forwarder внутри того же теста. svc.Create → подождать сообщение из Kafka (через Watermill subscriber на тестовой группе). Такой тест держим в build-tag’е integration, запускается в отдельном Makefile-target’е.

Подробнее про testcontainers — ../conventions/testing.

Anti-patterns

Прямой kafka.Publisher.Publish в handler

// ПЛОХО: classic dual-write func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) { r, err := s.reviews.Create(ctx, cmd) // БД if err != nil { return nil, err } return r, s.kafkaPub.Publish("kazmaps.review.review", buildMsg(r)) // Kafka }

Эта схема выглядит работающей на стенде и в unit-тестах. Она ломается только в проде под сетевыми сбоями: Kafka недоступна 200ms, а БД уже закоммичена → событие потеряно, никто не узнает. Пример такой ошибки — в репозитории сервиса review, файл internal/event/publisher.go: Publisher работает напрямую с kafka.Publisher, вызывается из handler’а после commit’а транзакции. Это надо переписать на outbox (см. §Migration).

Свой outbox-worker с ручным SELECT FOR UPDATE

В репозитории сервиса user, файлы internal/repository/postgres/outbox.go и internal/event/worker.go — ручной outbox: FetchUnpublished + MarkPublished по id-списку.

Почему это антипаттерн:

  • FetchUnpublished использует pool.Query(...) с FOR UPDATE SKIP LOCKED. Но pgx.Query без явной транзакции возвращает соединение в пул после закрытия rows — locks снимаются в момент автокоммита. В результате FOR UPDATE ничего не защищает: вторая реплика увидит те же строки «unlocked» и опубликует их дубли.
  • MarkPublished использует другое соединение из пула, отдельной транзакцией — то есть нет гарантии, что строка, которую мы пометили как published, это именно та строка, которую забирал FetchUnpublished.
  • Этот баг не ловится юнит-тестами (pool=1 connection, локи ведут себя иначе) и не ловится integration-тестом на одной реплике. Ловится только под продовой нагрузкой на нескольких репликах → дубли в Kafka → consumer-сайд падает от duplicates или молча повторяет side-effect’ы.

Вместо этого: всегда watermill-sql subscriber + forwarder. Они используют правильные транзакционные границы и offsets-таблицу для tracking’а.

Нет monitoring

Событие потерялось, retention в Kafka 7 дней, через месяц downstream сервис показывает «неправильные счётчики» и никто не понимает почему. Alert на outbox_forwarder_lag_seconds и outbox_rows_total{status="unacked"} закрывает этот класс проблем.

Агрессивный cleanup

DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '1 hour' — после любого инцидента невозможно replay события. Не меньше 3 дней, рекомендуем 7.

Один общий outbox на все сервисы

«Возьмём один общий Postgres, одну таблицу outbox, все сервисы пишут туда» — ломает database-per-service. Любой сбой в этой общей БД блокирует все publish’ы всех сервисов. Каждый сервис — свой outbox в своей БД.

PublishTx без InTx

// ПЛОХО: нет транзакции, есть dual-write s.reviews.Create(ctx, r) s.outboxPub.Publish("outbox", msg) // не PublishTx, не внутри tx

Мы потеряли атомарность: если между этими двумя строчками упадёт процесс, БД-строка есть, outbox-строки нет. Всегда InTx + PublishTx.

Tx снаружи InTx, ручной BEGIN/COMMIT

tx, _ := pool.Begin(ctx) defer tx.Rollback(ctx) // ... много кода ... tx.Commit(ctx)

Проблема — легко забыть return после ошибки, легко забыть defer. Используй helper db.InTx — он управляет rollback/commit за тебя. См. ../conventions/db-pgx.

Migration: из no-outbox в outbox

План для сервиса, который сейчас публикует напрямую в Kafka (как review):

  1. Миграция БД. Добавь таблицу outbox по DDL из §Schema. Отдельная миграция, expand-only (ничего не ломается у живого сервиса, таблица пустая).
  2. Добавь outboxPublisher. В cmd/server/main.go создай watermill-sql publisher, пропиши в DI в service-слой.
  3. Dual-publish (временный шаг). В service-слое теперь и старый прямой kafka.Publish, и новый outboxPublisher.PublishTx. Это увеличит нагрузку: consumer получит дубли. Включи Deduplicator-middleware на consumer-сайде, если он ещё не включён.
  4. Включи forwarder. Запусти goroutine. Проверь логи/метрики: events_published_total инкрементится, outbox_rows_total не копится.
  5. Убери прямой publish. Теперь только outbox. В этот момент поломки (если есть) станут видны через outbox_lag_seconds — быстро, локально в сервисе, без последствий для Kafka.
  6. Мониторь неделю. Убедись, что lag стабильный, ошибок нет, retention по cleanup настроен.

Шаги 3–4 критичны: нельзя просто «выключил прямой publish, включил outbox» — если что-то сломается в новой схеме, события теряются безмолвно. Dual-publish на время миграции даёт страховку.

FAQ

«Почему не CDC (Debezium)?» CDC читает WAL Postgres и вытаскивает изменения в Kafka без outbox-таблицы. Это элегантно, но требует: настроенный Kafka Connect, per-table whitelist, версионирование конфигов, отдельный lifecycle. Для наших объёмов прибавочная ценность не окупает сложности. Forwarder — меньше движущихся частей, внутри сервис-репо, без внешних зависимостей, кроме Kafka. Если когда-то выйдем на объёмы, где forwarder перестанет справляться — тогда отдельная задача на CDC. Пока нет.

«Можно ли Kafka использовать как outbox? Писать сразу в Kafka, а оттуда — в БД через consumer?» Нет, это возвращает dual-write: между publish в Kafka и commit в БД может упасть producer → событие опубликовано, БД не обновилась, consumer попытается применить изменение, которого нет в источнике. Outbox именно и решает этот случай.

«Outbox vs Event Sourcing?» Разное. Outbox — механизм доставки событий из источника в шину. Event Sourcing — хранение состояния как журнала событий (событие — первичный факт, состояние — производное). Комбинируются редко: если у тебя ES, тебе outbox не нужен, потому что журнал и есть outbox. У нас — обычные реляционные таблицы как источник правды, outbox как канал наружу.

«Publish делает RPC? Это не медленно?» PublishTx — это INSERT в Postgres, не сетевой вызов в Kafka. Медленно только если outbox- таблица заблокирована, что не штатно. На publisher-сайде задержка измеряется сотнями микросекунд.

«А если мне нужна синхронная операция — ответить клиенту после того, как событие дошло до consumer’а?» Это не outbox-use-case. Outbox — про асинхронное распространение факта. Для синхронного взаимодействия — HTTP internal endpoint (см. api-composition).

«Почему не писать сразу envelope в payload-колонку, а metadata отдельно?» Потому что watermill-sql именно так читает: разделение payload/metadata — требование DefaultPostgreSQLSchema. Если отойдёшь — придётся писать свой SchemaAdapter и поддерживать его. Не надо.

Связанные разделы

Last updated on