Transactional Outbox
Deep-dive по паттерну. Если ты ищешь «как опубликовать событие по шагам» —
открой ../how-to/add-kafka-event. Если
ищешь «какие поля в envelope» — открой
../conventions/events. Эта страница
отвечает на другой вопрос: как outbox работает под капотом, как его
масштабировать, мониторить и как вытащить сервис из состояния, когда
что-то сломалось.
TL;DR
Сверка для быстрого review или когда нужно вспомнить основы:
- Зачем: устраняет dual-write между Postgres и Kafka (см. §Что это). Без outbox — либо «коммит без события», либо «событие без коммита».
- Гарантия: at-least-once в Kafka. Consumer обязан быть идемпотентным (см. idempotent-consumer). Exactly-once в Postgres→Kafka не бывает; не планируется.
- Запись:
InTx { repo.CreateTx(tx) + outboxPub.PublishTx(tx) }. Одна транзакция, две строки. ВнутриInTx— только Postgres, никаких HTTP / Kafka / Redis вызовов. - Публикация: forwarder =
watermill-sql subscriber+kafka publisher+components/forwarder. Свой outbox-worker не пишем (см. §Anti-patterns). - Schema таблицы:
offset_consumed(BIGSERIAL),offset_acked(BIGINT, NULL до публикации),uuid,payload,metadata,transaction_id(xid8). Partial-indexWHERE offset_acked IS NULL.IS NULL, не= NULL(см. §Schema). - Ordering: per-aggregate порядок в Kafka даёт
partition_key = aggregate_id, не outbox сам по себе. - Cleanup: acked-строки удаляются через 7 дней (CronJob, батчами по 10k). Unacked никогда не удаляются.
- SLO: forwarder publish p99 < 5s, backlog < 100 устойчиво,
heartbeat < 30s. Alerts — см.
../conventions/slo-and-budget#outbox-forwarder. - Runbook на lag:
../how-to/debug-outbox-lag.
Содержание
- Что это и зачем
- Когда использовать
- Когда НЕ использовать
- Архитектура
- Schema outbox-таблицы
- Publisher side
- Forwarder
- Ordering guarantees
- Monitoring
- Cleanup и retention
- Failure modes
- Testing
- Anti-patterns
- Migration: из no-outbox в outbox
- FAQ
- Связанные разделы
Что это и зачем
Outbox решает проблему dual-write: бизнес-операция пишет в Postgres и публикует событие в Kafka, и эти два действия должны быть согласованы. Без outbox возможны две поломки:
- Закоммитили транзакцию в БД → перед
Publishпроцесс упал / сеть оборвалась → событие потеряно, downstream-сервисы не увидят факт, который уже отражён в БД источника. - Опубликовали сначала в Kafka → транзакция откатилась → событие фантомное, downstream отреагирует на изменение, которого нет.
Outbox устраняет оба случая. В одной транзакции с бизнес-записью пишется
строка в таблицу outbox в той же БД. Отдельный процесс (forwarder)
забирает строки и публикует в Kafka с at-least-once гарантиями.
Downstream обязан быть идемпотентным (см.
idempotent-consumer) — это плата за
простоту и атомарность записи в источнике.
Гарантии:
- Атомарность записи бизнес-состояния и факта события — обеспечивает БД через общую транзакцию.
- At-least-once доставки — обеспечивает forwarder: пока строка не помечена как published, её будут переопубликовывать.
- Порядок per-aggregate в Kafka — обеспечивает partition key (см. §Ordering), а не outbox сам по себе.
Exactly-once здесь нет и не будет. Kafka даёт at-least-once, outbox повторяет эту семантику на своём уровне.
Когда использовать
Правило по умолчанию: всегда через outbox, если событие отражает
изменение доменного состояния сервиса. Любой факт, который потом
захочет прочитать другой сервис или аналитика, должен пройти через
outbox: user.registered, user.banned, review.updated,
photo.uploaded, moderation.approved.
Второе правило: если ты написал в БД и дальше дёрнул Kafka напрямую — ты создал dual-write. Даже если «всё работает на стенде». Починить эту логическую ошибку позже будет дороже, чем написать через outbox с самого начала.
Когда НЕ использовать
- Ephemeral сигналы без привязки к DB-state. Пример: WebSocket-события «пользователь печатает», «активен на этой странице», live-позиция курсора. У них нет соответствующего изменения в БД, и потеря одной такой нотификации не ломает систему.
- Fan-out внутри одного сервиса. Если publisher и subscriber живут в
одном процессе и запись в БД им не нужна как общий факт — используй
gochannelin-process pub/sub, не гоняй событие через Kafka. - Технические метрики. Prometheus counter, не Kafka event.
Архитектура
Ключевые свойства:
- Forwarder — отдельный worker-loop; либо внутри того же процесса сервиса
(goroutine), либо отдельный Deployment. Мы по умолчанию запускаем
goroutine в
cmd/server/main.go— это проще и достаточно. - Transport из outbox в Kafka — через
watermill-sql(subscriber) +watermill-kafka(publisher), склеенныеwatermill/components/forwarder. - Сам forwarder код не пишем — используем готовый из Watermill. Попытка
написать свой (как в репозитории сервиса
user,internal/event/worker.go) сразу становится антипаттерном: см. §Anti-patterns.
Schema outbox-таблицы
Схема совпадает с DefaultPostgreSQLSchema из watermill-sql/v3. DDL
кладётся отдельной миграцией в каждый сервис-репо. Таблица создаётся в
БД сервиса в схеме public (Postgres default), без префикса с именем
сервиса:
-- review/migrations/XXX_outbox.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('review_migrations'));
CREATE TABLE outbox (
offset_acked BIGINT,
offset_consumed BIGSERIAL NOT NULL,
uuid VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
payload JSON,
metadata JSON,
transaction_id xid8 NOT NULL DEFAULT pg_current_xact_id(),
CONSTRAINT outbox_pk PRIMARY KEY (transaction_id, offset_consumed)
);
CREATE INDEX idx_outbox_acked
ON outbox (offset_acked)
WHERE offset_acked IS NULL;
COMMIT;Пояснение полей:
offset_consumed— локальный монотонный номер в пределах таблицы (BIGSERIAL). Используется для сортировки при чтении.offset_acked— выставляется в значениеoffset_consumedпосле успешной публикации.NULL— ещё не опубликовано.uuid— ULID (Message.UUID), тот же, что доедет до Kafka и будет ключом в Deduplicator’е на consumer’е. Строится черезeventmeta.New(см.../conventions/events).payload— JSON-тело события (ReviewCreatedV1, сериализованный в JSON).metadata— JSON с envelope-заголовками (Event-Type,Schema-Version,Correlation-Id,Source-Service,Published-At,traceparent).transaction_id—xid8, идентификатор Postgres-транзакции. Именно он в связке сoffset_consumedдаёт корректный порядок чтения forwarder’ом: строки из одной транзакции читаются пачкой, строки из незакоммиченных транзакций пропускаются до момента их commit’а.PRIMARY KEY (transaction_id, offset_consumed)— уникальность строки и естественный порядок для cursor-based чтения.
Partial-index по offset_acked IS NULL нужен, чтобы forwarder мог
сканировать только неопубликованные строки, а не всю таблицу.
⚠
IS NULL, не= NULL. В SQLoffset_acked = NULL— это всегдаUNKNOWN(не true), поэтому такой предикат никогда не сматчит ни одной строки — ни вWHERE, ни в partial-index. Код, случайно написавшийWHERE offset_acked = NULL, молча не увидит unacked-строки (forwarder остановится, backlog расти), а Postgres не выдаст ошибку.Правило: во всех запросах к
outbox(partial-index, forwarder- selects, cleanup, debug-query из runbook’а) используй толькоIS NULL/IS NOT NULL.linter-ruleв code-review: любое= NULL/!= NULLв SQL — ошибка, independently от таблицы.
Имя таблицы — всегда outbox, живёт в БД сервиса (в схеме public, без
префикса с именем сервиса). Каждый сервис — отдельная БД, поэтому имена
таблиц не конфликтуют. Одна таблица outbox на сервис. Одна таблица
outbox на несколько сервисов — антипаттерн (см. §Anti-patterns).
Publisher side
Запись в outbox происходит внутри той же транзакции, что и бизнес-запись. Код в service-слое:
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
var r *domain.Review
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
saved, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
if err != nil {
return err
}
r = saved
payload := event.ReviewCreatedV1{
ReviewID: saved.ID,
PlaceID: saved.PlaceID,
UserID: saved.UserID,
Rating: saved.Rating,
}
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})
if err != nil {
return err
}
return s.outboxPublisher.PublishTx(ctx, tx, "outbox", msg)
})
if err != nil {
return nil, fmt.Errorf("create review: %w", err)
}
return r, nil
}Разобьём по кусочкам:
eventmeta.Newстроит*message.Messageс заполненнымMessage.UUID(ULID), envelope-metadata (Event-Type,Schema-Version,Correlation-Id,Source-Service,Published-At,traceparentиз OTel). Код helper’а — в../conventions/events.s.outboxPublisher— этоwatermill-sqlPublisher, настроенный писать в таблицуoutbox. МетодPublishTx— тонкий wrapper, который пишет строку, используя переданныйpgx.Tx.- Первый аргумент
"outbox"вPublishTx— не Kafka-topic. Это логическое «имя канала» внутри outbox-таблицы. Мы всегда передаём"outbox"— по одной на сервис. Реальный Kafka-topic определится позже, в forwarder’е, по envelope-metadata. InTxкоммитит или откатывает обе записи атомарно. ЕслиPublishупал — откатится и вставка вreviews. Если вставка вreviewsупала — вставка в outbox тоже не произошла.
Правило: внутри InTx никогда не делай прямых Kafka-вызовов,
HTTP-вызовов, внешних побочных эффектов. Только БД-операции через
переданный tx. Детали — ../conventions/db-pgx.
Forwarder
Forwarder — компонент watermill/components/forwarder. Он читает
строки из outbox-таблицы и публикует в Kafka.
Setup
import (
watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/forwarder"
"github.com/ThreeDotsLabs/watermill/message"
)
// 1. Subscriber поверх Postgres — читает outbox.
sqlSub, err := watermillSQL.NewSubscriber(stdDB, watermillSQL.SubscriberConfig{
SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{
GenerateMessagesTableName: func(topic string) string { return "outbox" },
},
OffsetsAdapter: watermillSQL.DefaultPostgreSQLOffsetsAdapter{},
InitializeSchema: false, // схема создаётся миграцией
PollInterval: 500 * time.Millisecond,
ConsumerGroup: "review-outbox-forwarder",
}, watermillLogger)
// 2. Kafka publisher — пишет в реальный Kafka.
kafkaPub, err := kafka.NewPublisher(
kafka.PublisherConfig{Brokers: cfg.Kafka.Brokers, Marshaler: kafka.DefaultMarshaler{}},
watermillLogger,
)
// 3. Router для применения middleware к forwarder'у.
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
// 4. Сам forwarder.
fwd, err := forwarder.NewForwarder(sqlSub, kafkaPub, watermillLogger, forwarder.Config{
ForwarderTopic: "outbox", // то же имя, что используется в PublishTx
Router: router,
Middlewares: []message.HandlerMiddleware{
middleware.Recoverer,
otelMiddleware.Trace(tracer), // пробрасывает traceparent в Kafka
outboxMetricsMiddleware, // publish_duration + errors
},
})
go func() { _ = fwd.Run(ctx) }()
go func() { _ = router.Run(ctx) }()Параметры, на которые обращай внимание:
PollInterval— как часто forwarder делает запрос к outbox. Default 500ms. Снижение (100ms) даёт меньшую end-to-end задержку, но увеличивает нагрузку на БД. Увеличение (2s) экономит ресурсы, но задерживает публикацию на столько же.ConsumerGroup— имя consumer-group’ыwatermill-sqlдля offsets-таблицы. У разных forwarder’ов должна быть разная группа, чтобы они не перебивали друг другу offset’ы.ForwarderTopic— должен совпадать с topic’ом, который мы передаём вPublishTxна publisher-стороне.
Как forwarder выбирает Kafka-topic
Forwarder читает строку из outbox и публикует её в Kafka. Имя Kafka- топика берётся из envelope-metadata. Стандартный маппинг:
Kafka topic = "kazmaps." + Source-Service + "." + Event-TypeПример: Source-Service=review, Event-Type=review.created →
kazmaps.review.review.
Маппинг реализуется через middleware forwarder’а: handler читает
msg.Metadata, выставляет целевой topic через механизм
forwarder.Config.DecorateMessage (или через явный publisher-wrapper,
который принимает topic как функцию от *message.Message). Формат
имён — ../conventions/events.
Конкурентность и advisory locks
watermill-sql при чтении outbox использует SELECT ... FOR UPDATE SKIP LOCKED. Это безопасно при нескольких forwarder-инстансах: две
реплики могут параллельно читать разные блоки строк, не блокируя
друг друга. Внутри watermill-sql поверх этого есть advisory-lock,
защищающий offsets-таблицу от гонок при обновлении позиции.
Это не значит, что надо специально поднимать несколько forwarder’ов.
По умолчанию — один forwarder на сервис (одна реплика
cmd/server/main.go, одна goroutine). Если throughput outbox превысит
возможности одной реплики — тогда отдельный forwarder-deployment,
но это редкий кейс.
Ordering guarantees
Это частый источник недопонимания.
- Между разными транзакциями в пределах одного outbox: строки
упорядочены по
(transaction_id, offset_consumed), forwarder публикует в этом порядке. Если транзакция T2 закоммитилась после T1, её события уйдут в Kafka после событий T1. - Внутри одной транзакции: порядок по
offset_consumed— в каком порядке вызывалиPublishTx, в таком порядке окажется в Kafka. - Per-aggregate ordering в Kafka — не гарантируется автоматически
outbox’ом. Если тебе важно, чтобы
review.updatedдля одного ревью пришёл послеreview.createdдля того же ревью, ты должен обеспечить две вещи:- Оба события идут через outbox одного и того же сервиса (✓ по определению — сервис-владелец aggregate’а).
- В Kafka partition key =
aggregate_id. Kafka гарантирует порядок только внутри одной партиции. Еслиreview.createdиreview.updatedпопадут в разные партиции, consumer увидит их в произвольном порядке.
Partition key выставляется forwarder-middleware при публикации в Kafka:
outboxMiddleware := func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
if aggID := msg.Metadata.Get("Aggregate-Id"); aggID != "" {
msg.Metadata.Set("partition_key", aggID)
}
return h(msg)
}
}Kafka publisher читает partition_key и хэширует его в номер
партиции. Один aggregate → одна партиция → корректный порядок.
Не пытайся делать «свою логику упорядочивания» поверх outbox (таблица «ожидающих публикации», проверка last_seq и т.п.). Выбери правильный partition key, прими at-least-once и сделай consumer’ов идемпотентными.
Monitoring
Метрики
outbox_rows_total{service, status="unacked"}
— gauge, количество неопубликованных строк.
outbox_forwarder_lag_seconds{service}
— histogram, now() - MIN(created_at) по unacked, замеряется раз в тик.
outbox_forwarder_publish_duration_seconds{service, topic, result}
— histogram, длительность kafka.Publish.
outbox_forwarder_errors_total{service, reason}
— counter, reason ∈ {kafka_unavailable, marshal, db_read, db_ack}.
events_published_total{service, topic, event_type}
— counter, инкрементируется после успешной публикации.Выставляет эти метрики forwarder-middleware; код middleware живёт в
pkg/eventmw/ сервис-репо. Подключение — в setup-блоке (см.
../conventions/observability).
Alerting
Шаблоны alert-правил (per-сервис, лежат в infra-репо, здесь — как приёмочные критерии):
- Forwarder падает или не может публиковать. Alert:
rate(outbox_forwarder_errors_total[5m]) > 0в течение 10 минут → ticket. - Лаг растёт. Alert:
outbox_forwarder_lag_seconds > 60в течение 5 минут → ticket. Диагностика —../how-to/debug-outbox-lag. - Backlog накапливается. Alert:
outbox_rows_total{status="unacked"} > 1000→ ticket. - Совсем нет публикаций при наличии трафика. Alert:
rate(http_requests_total{endpoint="/v1/reviews", method="POST", status="201"}[5m]) > 0иrate(events_published_total{event_type="review.created"}[5m]) == 0→ страница.
Трейсинг
OTel-middleware в forwarder’е читает traceparent из envelope-metadata
и продолжает тот же trace, что создал HTTP-handler на входе.
В Tempo на одном waterfall видны: HTTP span → DB insert span →
kafka.Publish span. Это и есть основной способ разобрать «почему между
HTTP-ответом и появлением события в Kafka прошло 3 секунды».
Cleanup и retention
Таблица outbox не растёт бесконечно. Acked-строки удаляются регулярно, unacked — никогда. Цель раздела — определить retention window, способ удаления, защиту от bloat и мониторинг.
Retention window
Acked сообщения удаляются после 7 дней. Это компромисс:
- Достаточно для replay при инциденте. Типичный срок расследования «почему downstream-сервис не увидел событие» — 1–3 рабочих дня; запас на выходные и праздники даёт 7 дней.
- Не слишком долго для table bloat. При 10 тыс. событий в день таблица содержит до 70 тыс. строк на сервис — комфортно для индекса и VACUUM’а.
- Кратно retention Kafka-топика (по умолчанию тоже 7 дней), что даёт понятное правило: после истечения Kafka-retention replay невозможен в любом случае.
Меньше 3 дней — нельзя: инцидент в пятницу разбирается в понедельник, payload нужен. Больше 30 дней — избыточно, начинает страдать VACUUM и размер индекса.
Как удалять
CronJob в Kubernetes, расписание 0 3 * * * (ночью, после off-peak).
Job вызывает internal HTTP-endpoint сервиса /internal/outbox/cleanup
или напрямую выполняет SQL (зависит от сервиса — endpoint проще для
observability, SQL проще для инфры):
DELETE FROM outbox
WHERE offset_acked IS NOT NULL
AND created_at < NOW() - INTERVAL '7 days';Удаление — батчами по 10 000 строк, чтобы не держать длинную блокировку и не раздувать WAL одной большой транзакцией:
WITH victims AS (
SELECT transaction_id, offset_consumed
FROM outbox
WHERE offset_acked IS NOT NULL
AND created_at < NOW() - INTERVAL '7 days'
ORDER BY transaction_id, offset_consumed
LIMIT 10000
)
DELETE FROM outbox o
USING victims v
WHERE o.transaction_id = v.transaction_id
AND o.offset_consumed = v.offset_consumed;Job крутит этот DELETE в цикле, пока количество удалённых строк не станет меньше 10 000 (= мы добрали «хвост»). Между итерациями — пауза 500 ms, чтобы не доминировать на I/O.
Правило: никогда не удаляй unacked-строки. Если строка не опубликована — её должен обработать forwarder. Удалить = потерять событие безмолвно.
Пример CronJob YAML
apiVersion: batch/v1
kind: CronJob
metadata:
name: review-outbox-cleanup
namespace: review
spec:
schedule: "0 3 * * *"
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
jobTemplate:
spec:
backoffLimit: 2
activeDeadlineSeconds: 1800
template:
spec:
restartPolicy: OnFailure
containers:
- name: cleanup
image: kazmaps/review-service:stable
args: ["outbox-cleanup"]
env:
- name: OUTBOX_RETENTION_DAYS
value: "7"
- name: OUTBOX_BATCH_SIZE
value: "10000"
- name: OUTBOX_BATCH_SLEEP_MS
value: "500"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: review-db
key: url
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 256MiКлючевые поля:
schedule: "0 3 * * *"— ночной запуск.concurrencyPolicy: Forbid— если предыдущий cleanup ещё не доделал, новый не стартует. Длинный cleanup — сигнал о проблеме.backoffLimit: 2— две попытки на retry внутри одного Job’а, потом failure и alert.activeDeadlineSeconds: 1800— 30 минут hard cap. Не успел — что-то не так (size blowup, lock contention).restartPolicy: OnFailure— перезапускать контейнер при падении внутри Job’а.
Сервис (образ kazmaps/review-service) запускается с подкомандой
outbox-cleanup — это CLI-mode main’а, который читает env-переменные
ретеншена, выполняет DELETE-цикл и завершается. Детали CLI —
../conventions/configuration.
VACUUM
После массового DELETE Postgres не отдаёт место на диск автоматически —
строки становятся dead tuples, таблица продолжает расти. Отдельный
CronJob раз в неделю (например, 0 4 * * 0 — ночь воскресенья):
VACUUM (VERBOSE, ANALYZE) outbox;Параметры:
- Обычный
VACUUM(неFULL) — не блокирует запись, но не отдаёт место на диск, только помечает dead tuples как переиспользуемые. Для наших объёмов — достаточно. VACUUM FULLне использовать: он берёт exclusive lock и переписывает всю таблицу. На живой outbox с forwarder’ом это приведёт к лагу публикации на время VACUUM’а.ANALYZEобновляет статистику планировщика; полезно, если индексidx_outbox_ackedначинает использоваться subоptимально.
Если размер таблицы продолжает расти (см. метрики bloat ниже) несмотря
на VACUUM — запускаем pg_repack в отдельное maintenance-окно. Это уже
ручная процедура, не cron.
Метрики bloat
outbox_rows_total{service, status="acked"}
— gauge, количество acked-строк. Собирается периодическим
SELECT count(*) WHERE offset_acked IS NOT NULL.
outbox_rows_total{service, status="pending"}
— gauge, количество unacked-строк. SELECT count(*) WHERE offset_acked IS NULL.
outbox_table_bytes{service}
— gauge, pg_relation_size('outbox'). Из postgres_exporter.Metric-collector в сервисе раз в 60 секунд выполняет два count(*) и
expose’ит их в /metrics. pg_relation_size — через postgres_exporter
(живёт отдельно, часть инфры).
Alerts:
outbox_rows_total{status="acked"} > 1_000_000в любой момент → ticket. Cleanup сломан (CronJob упал, retention переконфигурирован на больше чем надо, DELETE блокируется).outbox_rows_total{status="pending"} > 10_000 for 5m→ page. Forwarder залип: публикация не успевает за входящим трафиком либо Kafka недоступен. Диагностика —../how-to/debug-outbox-lag.outbox_table_bytesрастёт несмотря на DELETE — запускаем ручнойpg_repackв maintenance-окно.
Правила сверху:
- Никогда не удаляй unacked-строки. См. выше.
- Не делай retention меньше 3 дней. Теряешь возможность replay при weekend-инциденте.
- Не клади cleanup в сам сервис (как goroutine внутри main’а). Это
инфраструктурная задача с отдельным lifecycle: отдельный image-tag не
нужен, зато отдельная observability и отдельный alert на failure’ы
Job’а. CronJob с образом сервиса и аргументом
outbox-cleanup— правильный компромисс: один релиз, разный режим запуска.
Failure modes
| Сценарий | Поведение | Что делать |
|---|---|---|
| Forwarder-goroutine упала (panic) | middleware.Recoverer поймал, goroutine перезапустилась (если wrapped в supervisor). Без supervisor — упала навсегда до рестарта pod’а. | Всегда оборачивай fwd.Run(ctx) в retry-loop или полагайся на middleware.Recoverer + log. Alert на forwarder_errors. |
| Kafka недоступен | Publish возвращает ошибку, строка в outbox остаётся unacked. Forwarder retry’ит на следующем тике. Лаг растёт. | Alert на lag → диагностика Kafka → восстановление. Строки дрейн’ятся после восстановления. |
| DB pool исчерпан | Forwarder ждёт свободное соединение → лаг растёт → alert. | Выдели forwarder’у отдельный pool (3–5 соединений), изолированный от HTTP-handler’ов. Конфиг — в internal/config/. |
| Две реплики forwarder’а | SELECT FOR UPDATE SKIP LOCKED + advisory lock защищают. Но дубли доставки в Kafka возможны на границах. | По умолчанию — один forwarder на сервис. Если нужен HA — полагайся на k8s перезапуск pod’а, а не на multi-writer. |
| Падение между DB commit и публикацией | Строка в outbox уже коммитнута, в Kafka ещё не ушло → forwarder возьмёт её на следующем тике. | Ничего делать не надо, это штатный путь at-least-once. |
| Kafka принял, но forwarder упал до UPDATE offset_acked | При рестарте forwarder прочитает ту же строку снова → дубль в Kafka. | Ничего делать не надо. Consumer дедуплицирует по Message.UUID через Deduplicator middleware (см. idempotent-consumer). |
Злой sql-запрос в outbox-таблице (DELETE вручную) | Потеряны события. | Запрещено делать руками. Только cleanup через CronJob. Доступ в прод DB — через pass-through, не direct. |
Testing
Unit через gochannel
Для теста бизнес-handler’а — in-memory pub-sub. Publisher подставляется
как gochannel.GoChannel, реальный outbox не нужен.
func TestReviewService_Create_PublishesEvent(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
sub, _ := pubsub.Subscribe(context.Background(), "outbox")
svc := service.NewReviewService(fakeRepo, fakeDB, pubsub)
_, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
if err != nil {
t.Fatalf("create: %v", err)
}
select {
case msg := <-sub:
if got := msg.Metadata.Get("Event-Type"); got != "review.created" {
t.Fatalf("event-type: %q", got)
}
case <-time.After(2 * time.Second):
t.Fatal("event not published")
}
}Такой тест покрывает: события строятся, envelope корректен, publish вызывается. Он не проверяет, что строка попала в Postgres-таблицу — для этого integration-тест.
Integration через testcontainers
Поднимается реальный Postgres, применяются миграции, в тесте
service.Create → тест читает SELECT FROM outbox и убеждается,
что строка появилась с правильным payload’ом.
func TestReviewService_Create_WritesOutboxRow(t *testing.T) {
pool := setupPostgres(t) // testcontainers Postgres + migrations
stdDB := stdlib.OpenDBFromPool(pool) // *sql.DB поверх pgx для watermill-sql
outboxPub := newOutboxPublisher(t, stdDB)
svc := service.NewReviewService(
postgres.NewReviewRepo(pool),
pkgdb.New(pool),
outboxPub,
)
_, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
if err != nil {
t.Fatalf("create: %v", err)
}
var count int
err = pool.QueryRow(ctx,
`SELECT count(*) FROM outbox WHERE metadata->>'Event-Type' = 'review.created'`,
).Scan(&count)
if err != nil || count != 1 {
t.Fatalf("outbox rows: got %d, err %v", count, err)
}
}End-to-end
Testcontainers Postgres + testcontainers Kafka + forwarder внутри того же
теста. svc.Create → подождать сообщение из Kafka (через Watermill
subscriber на тестовой группе). Такой тест держим в build-tag’е
integration, запускается в отдельном Makefile-target’е.
Подробнее про testcontainers —
../conventions/testing.
Anti-patterns
Прямой kafka.Publisher.Publish в handler
// ПЛОХО: classic dual-write
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
r, err := s.reviews.Create(ctx, cmd) // БД
if err != nil { return nil, err }
return r, s.kafkaPub.Publish("kazmaps.review.review", buildMsg(r)) // Kafka
}Эта схема выглядит работающей на стенде и в unit-тестах. Она
ломается только в проде под сетевыми сбоями: Kafka недоступна 200ms, а
БД уже закоммичена → событие потеряно, никто не узнает. Пример такой
ошибки — в репозитории сервиса review, файл
internal/event/publisher.go: Publisher работает напрямую с
kafka.Publisher, вызывается из handler’а после commit’а
транзакции. Это надо переписать на outbox (см. §Migration).
Свой outbox-worker с ручным SELECT FOR UPDATE
В репозитории сервиса user, файлы
internal/repository/postgres/outbox.go и internal/event/worker.go —
ручной outbox: FetchUnpublished + MarkPublished по id-списку.
Почему это антипаттерн:
FetchUnpublishedиспользуетpool.Query(...)сFOR UPDATE SKIP LOCKED. Ноpgx.Queryбез явной транзакции возвращает соединение в пул после закрытия rows — locks снимаются в момент автокоммита. В результатеFOR UPDATEничего не защищает: вторая реплика увидит те же строки «unlocked» и опубликует их дубли.MarkPublishedиспользует другое соединение из пула, отдельной транзакцией — то есть нет гарантии, что строка, которую мы пометили как published, это именно та строка, которую забиралFetchUnpublished.- Этот баг не ловится юнит-тестами (pool=1 connection, локи ведут себя иначе) и не ловится integration-тестом на одной реплике. Ловится только под продовой нагрузкой на нескольких репликах → дубли в Kafka → consumer-сайд падает от duplicates или молча повторяет side-effect’ы.
Вместо этого: всегда watermill-sql subscriber + forwarder. Они
используют правильные транзакционные границы и offsets-таблицу для
tracking’а.
Нет monitoring
Событие потерялось, retention в Kafka 7 дней, через месяц downstream
сервис показывает «неправильные счётчики» и никто не понимает почему.
Alert на outbox_forwarder_lag_seconds и
outbox_rows_total{status="unacked"} закрывает этот класс проблем.
Агрессивный cleanup
DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '1 hour' —
после любого инцидента невозможно replay события. Не меньше 3 дней,
рекомендуем 7.
Один общий outbox на все сервисы
«Возьмём один общий Postgres, одну таблицу outbox, все сервисы
пишут туда» — ломает database-per-service. Любой сбой в этой общей БД
блокирует все publish’ы всех сервисов. Каждый сервис — свой outbox в
своей БД.
PublishTx без InTx
// ПЛОХО: нет транзакции, есть dual-write
s.reviews.Create(ctx, r)
s.outboxPub.Publish("outbox", msg) // не PublishTx, не внутри txМы потеряли атомарность: если между этими двумя строчками упадёт
процесс, БД-строка есть, outbox-строки нет. Всегда InTx + PublishTx.
Tx снаружи InTx, ручной BEGIN/COMMIT
tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
// ... много кода ...
tx.Commit(ctx)Проблема — легко забыть return после ошибки, легко забыть defer.
Используй helper db.InTx — он управляет rollback/commit за тебя. См.
../conventions/db-pgx.
Migration: из no-outbox в outbox
План для сервиса, который сейчас публикует напрямую в Kafka (как
review):
- Миграция БД. Добавь таблицу
outboxпо DDL из §Schema. Отдельная миграция, expand-only (ничего не ломается у живого сервиса, таблица пустая). - Добавь
outboxPublisher. Вcmd/server/main.goсоздайwatermill-sqlpublisher, пропиши в DI в service-слой. - Dual-publish (временный шаг). В service-слое теперь и старый
прямой
kafka.Publish, и новыйoutboxPublisher.PublishTx. Это увеличит нагрузку: consumer получит дубли. ВключиDeduplicator-middleware на consumer-сайде, если он ещё не включён. - Включи forwarder. Запусти goroutine. Проверь логи/метрики:
events_published_totalинкрементится,outbox_rows_totalне копится. - Убери прямой publish. Теперь только outbox. В этот момент
поломки (если есть) станут видны через
outbox_lag_seconds— быстро, локально в сервисе, без последствий для Kafka. - Мониторь неделю. Убедись, что lag стабильный, ошибок нет, retention по cleanup настроен.
Шаги 3–4 критичны: нельзя просто «выключил прямой publish, включил outbox» — если что-то сломается в новой схеме, события теряются безмолвно. Dual-publish на время миграции даёт страховку.
FAQ
«Почему не CDC (Debezium)?» CDC читает WAL Postgres и вытаскивает изменения в Kafka без outbox-таблицы. Это элегантно, но требует: настроенный Kafka Connect, per-table whitelist, версионирование конфигов, отдельный lifecycle. Для наших объёмов прибавочная ценность не окупает сложности. Forwarder — меньше движущихся частей, внутри сервис-репо, без внешних зависимостей, кроме Kafka. Если когда-то выйдем на объёмы, где forwarder перестанет справляться — тогда отдельная задача на CDC. Пока нет.
«Можно ли Kafka использовать как outbox? Писать сразу в Kafka, а оттуда — в БД через consumer?» Нет, это возвращает dual-write: между publish в Kafka и commit в БД может упасть producer → событие опубликовано, БД не обновилась, consumer попытается применить изменение, которого нет в источнике. Outbox именно и решает этот случай.
«Outbox vs Event Sourcing?» Разное. Outbox — механизм доставки событий из источника в шину. Event Sourcing — хранение состояния как журнала событий (событие — первичный факт, состояние — производное). Комбинируются редко: если у тебя ES, тебе outbox не нужен, потому что журнал и есть outbox. У нас — обычные реляционные таблицы как источник правды, outbox как канал наружу.
«Publish делает RPC? Это не медленно?» PublishTx — это INSERT
в Postgres, не сетевой вызов в Kafka. Медленно только если outbox-
таблица заблокирована, что не штатно. На publisher-сайде задержка
измеряется сотнями микросекунд.
«А если мне нужна синхронная операция — ответить клиенту после того,
как событие дошло до consumer’а?» Это не outbox-use-case. Outbox —
про асинхронное распространение факта. Для синхронного взаимодействия
— HTTP internal endpoint (см.
api-composition).
«Почему не писать сразу envelope в payload-колонку, а metadata
отдельно?» Потому что watermill-sql именно так читает: разделение
payload/metadata — требование DefaultPostgreSQLSchema. Если
отойдёшь — придётся писать свой SchemaAdapter и поддерживать его.
Не надо.
Связанные разделы
../conventions/events— envelope, naming топиков, middleware stack, reference-уровень.../how-to/add-kafka-event— пошаговая процедура добавить новый тип события.../how-to/debug-outbox-lag— runbook, когда alert на lag сработал.idempotent-consumer— как consumer переживает at-least-once.cqrs— как поверх outbox+Kafka собрать типизированные handler’ы черезwatermill/components/cqrs.../conventions/db-pgx—InTx, миграции, advisory lock.../conventions/observability— метрики, трейсинг, алертинг.../conventions/shutdown— корректный drain forwarder’а при SIGTERM.../glossary— Forwarder, Envelope, Event-Id, Outbox.