Transactional Outbox¶
Deep-dive по паттерну. Если ты ищешь «как опубликовать событие по шагам» —
открой ../how-to/add-kafka-event.md. Если
ищешь «какие поля в envelope» — открой
../conventions/events.md. Эта страница
отвечает на другой вопрос: как outbox работает под капотом, как его
масштабировать, мониторить и как вытащить сервис из состояния, когда
что-то сломалось.
Содержание¶
- Что это и зачем
- Когда использовать
- Когда НЕ использовать
- Архитектура
- Schema outbox-таблицы
- Publisher side
- Forwarder
- Ordering guarantees
- Monitoring
- Cleanup
- Failure modes
- Testing
- Anti-patterns
- Migration: из no-outbox в outbox
- FAQ
- Связанные разделы
Что это и зачем¶
Outbox решает проблему dual-write: бизнес-операция пишет в Postgres и публикует событие в Kafka, и эти два действия должны быть согласованы. Без outbox возможны две поломки:
- Закоммитили транзакцию в БД → перед
Publishпроцесс упал / сеть оборвалась → событие потеряно, downstream-сервисы не увидят факт, который уже отражён в БД источника. - Опубликовали сначала в Kafka → транзакция откатилась → событие фантомное, downstream отреагирует на изменение, которого нет.
Outbox устраняет оба случая. В одной транзакции с бизнес-записью пишется
строка в таблицу outbox в той же БД. Отдельный процесс (forwarder)
забирает строки и публикует в Kafka с at-least-once гарантиями.
Downstream обязан быть идемпотентным (см.
idempotent-consumer.md) — это плата за
простоту и атомарность записи в источнике.
Гарантии:
- Атомарность записи бизнес-состояния и факта события — обеспечивает БД через общую транзакцию.
- At-least-once доставки — обеспечивает forwarder: пока строка не помечена как published, её будут переопубликовывать.
- Порядок per-aggregate в Kafka — обеспечивает partition key (см. §Ordering), а не outbox сам по себе.
Exactly-once здесь нет и не будет. Kafka даёт at-least-once, outbox повторяет эту семантику на своём уровне.
Когда использовать¶
Правило по умолчанию: всегда через outbox, если событие отражает
изменение доменного состояния сервиса. Любой факт, который потом
захочет прочитать другой сервис или аналитика, должен пройти через
outbox: user.registered, user.banned, review.updated,
photo.uploaded, moderation.approved.
Второе правило: если ты написал в БД и дальше дёрнул Kafka напрямую — ты создал dual-write. Даже если «всё работает на стенде». Починить эту логическую ошибку позже будет дороже, чем написать через outbox с самого начала.
Когда НЕ использовать¶
- Ephemeral сигналы без привязки к DB-state. Пример: WebSocket-события «пользователь печатает», «активен на этой странице», live-позиция курсора. У них нет соответствующего изменения в БД, и потеря одной такой нотификации не ломает систему.
- Fan-out внутри одного сервиса. Если publisher и subscriber живут в
одном процессе и запись в БД им не нужна как общий факт — используй
gochannelin-process pub/sub, не гоняй событие через Kafka. - Технические метрики. Prometheus counter, не Kafka event.
Архитектура¶
sequenceDiagram
autonumber
participant C as HTTP Client
participant H as Handler
participant S as Service
participant DB as Postgres
participant F as Forwarder
participant K as Kafka
participant D as Downstream
C->>H: POST /v1/reviews
H->>S: CreateReview(ctx, cmd)
S->>DB: BEGIN
S->>DB: INSERT INTO reviews ...
S->>DB: INSERT INTO outbox ...
S->>DB: COMMIT
S-->>H: Review{ID: 42}
H-->>C: 201 Created
Note over F,DB: Polling, ~500ms
F->>DB: SELECT FROM outbox WHERE offset_acked IS NULL
F->>K: Publish(topic, envelope)
F->>DB: UPDATE outbox SET offset_acked = NOW()
K-->>D: review.created
Ключевые свойства:
- Forwarder — отдельный worker-loop; либо внутри того же процесса сервиса
(goroutine), либо отдельный Deployment. Мы по умолчанию запускаем
goroutine в
cmd/server/main.go— это проще и достаточно. - Transport из outbox в Kafka — через
watermill-sql(subscriber) +watermill-kafka(publisher), склеенныеwatermill/components/forwarder. - Сам forwarder код не пишем — используем готовый из Watermill. Попытка
написать свой (как в репозитории сервиса
user,internal/event/worker.go) сразу становится антипаттерном: см. §Anti-patterns.
Schema outbox-таблицы¶
Схема совпадает с DefaultPostgreSQLSchema из watermill-sql/v3. DDL
кладётся отдельной миграцией в каждый сервис-репо в свою схему:
-- review/migrations/XXX_outbox.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('review_migrations'));
CREATE TABLE review.outbox (
offset_acked BIGINT,
offset_consumed BIGSERIAL NOT NULL,
uuid VARCHAR(36) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
payload JSON,
metadata JSON,
transaction_id xid8 NOT NULL DEFAULT pg_current_xact_id(),
CONSTRAINT outbox_pk PRIMARY KEY (transaction_id, offset_consumed)
);
CREATE INDEX idx_outbox_acked
ON review.outbox (offset_acked)
WHERE offset_acked IS NULL;
COMMIT;
Пояснение полей:
offset_consumed— локальный монотонный номер в пределах таблицы (BIGSERIAL). Используется для сортировки при чтении.offset_acked— выставляется в значениеoffset_consumedпосле успешной публикации.NULL— ещё не опубликовано.uuid— ULID (Message.UUID), тот же, что доедет до Kafka и будет ключом в Deduplicator'е на consumer'е. Строится черезeventmeta.New(см.../conventions/events.md).payload— JSON-тело события (ReviewCreatedV1, сериализованный в JSON).metadata— JSON с envelope-заголовками (Event-Type,Schema-Version,Correlation-Id,Source-Service,Published-At,traceparent).transaction_id—xid8, идентификатор Postgres-транзакции. Именно он в связке сoffset_consumedдаёт корректный порядок чтения forwarder'ом: строки из одной транзакции читаются пачкой, строки из незакоммиченных транзакций пропускаются до момента их commit'а.PRIMARY KEY (transaction_id, offset_consumed)— уникальность строки и естественный порядок для cursor-based чтения.
Partial-index по offset_acked IS NULL нужен, чтобы forwarder мог
сканировать только неопубликованные строки, а не всю таблицу.
Имя таблицы — всегда outbox, схема — <service> (review.outbox,
user.outbox, media.outbox). Одна таблица outbox на сервис. Одна
таблица outbox на несколько сервисов — антипаттерн (см. §Anti-patterns).
Publisher side¶
Запись в outbox происходит внутри той же транзакции, что и бизнес-запись. Код в service-слое:
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
var r *domain.Review
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
saved, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
if err != nil {
return err
}
r = saved
payload := event.ReviewCreatedV1{
ReviewID: saved.ID,
PlaceID: saved.PlaceID,
UserID: saved.UserID,
Rating: saved.Rating,
}
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})
if err != nil {
return err
}
return s.outboxPublisher.PublishTx(ctx, tx, "outbox", msg)
})
if err != nil {
return nil, fmt.Errorf("create review: %w", err)
}
return r, nil
}
Разобьём по кусочкам:
eventmeta.Newстроит*message.Messageс заполненнымMessage.UUID(ULID), envelope-metadata (Event-Type,Schema-Version,Correlation-Id,Source-Service,Published-At,traceparentиз OTel). Код helper'а — в../conventions/events.md.s.outboxPublisher— этоwatermill-sqlPublisher, настроенный писать в таблицуreview.outbox. МетодPublishTx— тонкий wrapper, который пишет строку, используя переданныйpgx.Tx.- Первый аргумент
"outbox"вPublishTx— не Kafka-topic. Это логическое «имя канала» внутри outbox-таблицы. Мы всегда передаём"outbox"— по одной на сервис. Реальный Kafka-topic определится позже, в forwarder'е, по envelope-metadata. InTxкоммитит или откатывает обе записи атомарно. ЕслиPublishупал — откатится и вставка вreview.reviews. Если вставка вreviewsупала — вставка в outbox тоже не произошла.
Правило: внутри InTx никогда не делай прямых Kafka-вызовов,
HTTP-вызовов, внешних побочных эффектов. Только БД-операции через
переданный tx. Детали — ../conventions/db-pgx.md.
Forwarder¶
Forwarder — компонент watermill/components/forwarder. Он читает
строки из outbox-таблицы и публикует в Kafka.
Setup¶
import (
watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/forwarder"
"github.com/ThreeDotsLabs/watermill/message"
)
// 1. Subscriber поверх Postgres — читает outbox.
sqlSub, err := watermillSQL.NewSubscriber(stdDB, watermillSQL.SubscriberConfig{
SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{
GenerateMessagesTableName: func(topic string) string { return "review.outbox" },
},
OffsetsAdapter: watermillSQL.DefaultPostgreSQLOffsetsAdapter{},
InitializeSchema: false, // схема создаётся миграцией
PollInterval: 500 * time.Millisecond,
ConsumerGroup: "review-outbox-forwarder",
}, watermillLogger)
// 2. Kafka publisher — пишет в реальный Kafka.
kafkaPub, err := kafka.NewPublisher(
kafka.PublisherConfig{Brokers: cfg.Kafka.Brokers, Marshaler: kafka.DefaultMarshaler{}},
watermillLogger,
)
// 3. Router для применения middleware к forwarder'у.
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
// 4. Сам forwarder.
fwd, err := forwarder.NewForwarder(sqlSub, kafkaPub, watermillLogger, forwarder.Config{
ForwarderTopic: "outbox", // то же имя, что используется в PublishTx
Router: router,
Middlewares: []message.HandlerMiddleware{
middleware.Recoverer,
otelMiddleware.Trace(tracer), // пробрасывает traceparent в Kafka
outboxMetricsMiddleware, // publish_duration + errors
},
})
go func() { _ = fwd.Run(ctx) }()
go func() { _ = router.Run(ctx) }()
Параметры, на которые обращай внимание:
PollInterval— как часто forwarder делает запрос к outbox. Default 500ms. Снижение (100ms) даёт меньшую end-to-end задержку, но увеличивает нагрузку на БД. Увеличение (2s) экономит ресурсы, но задерживает публикацию на столько же.ConsumerGroup— имя consumer-group'ыwatermill-sqlдля offsets-таблицы. У разных forwarder'ов должна быть разная группа, чтобы они не перебивали друг другу offset'ы.ForwarderTopic— должен совпадать с topic'ом, который мы передаём вPublishTxна publisher-стороне.
Как forwarder выбирает Kafka-topic¶
Forwarder читает строку из outbox и публикует её в Kafka. Имя Kafka- топика берётся из envelope-metadata. Стандартный маппинг:
Пример: Source-Service=review, Event-Type=review.created →
kazmaps.review.review.created.
Маппинг реализуется через middleware forwarder'а: handler читает
msg.Metadata, выставляет целевой topic через механизм
forwarder.Config.DecorateMessage (или через явный publisher-wrapper,
который принимает topic как функцию от *message.Message). Формат
имён — ../conventions/events.md.
Конкурентность и advisory locks¶
watermill-sql при чтении outbox использует SELECT ... FOR UPDATE
SKIP LOCKED. Это безопасно при нескольких forwarder-инстансах: две
реплики могут параллельно читать разные блоки строк, не блокируя
друг друга. Внутри watermill-sql поверх этого есть advisory-lock,
защищающий offsets-таблицу от гонок при обновлении позиции.
Это не значит, что надо специально поднимать несколько forwarder'ов.
По умолчанию — один forwarder на сервис (одна реплика
cmd/server/main.go, одна goroutine). Если throughput outbox превысит
возможности одной реплики — тогда отдельный forwarder-deployment,
но это редкий кейс.
Ordering guarantees¶
Это частый источник недопонимания.
- Между разными транзакциями в пределах одного outbox: строки
упорядочены по
(transaction_id, offset_consumed), forwarder публикует в этом порядке. Если транзакция T2 закоммитилась после T1, её события уйдут в Kafka после событий T1. - Внутри одной транзакции: порядок по
offset_consumed— в каком порядке вызывалиPublishTx, в таком порядке окажется в Kafka. - Per-aggregate ordering в Kafka — не гарантируется автоматически
outbox'ом. Если тебе важно, чтобы
review.updatedдля одного ревью пришёл послеreview.createdдля того же ревью, ты должен обеспечить две вещи: - Оба события идут через outbox одного и того же сервиса (✓ по определению — сервис-владелец aggregate'а).
- В Kafka partition key =
aggregate_id. Kafka гарантирует порядок только внутри одной партиции. Еслиreview.createdиreview.updatedпопадут в разные партиции, consumer увидит их в произвольном порядке.
Partition key выставляется forwarder-middleware при публикации в Kafka:
outboxMiddleware := func(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) ([]*message.Message, error) {
if aggID := msg.Metadata.Get("Aggregate-Id"); aggID != "" {
msg.Metadata.Set("partition_key", aggID)
}
return h(msg)
}
}
Kafka publisher читает partition_key и хэширует его в номер
партиции. Один aggregate → одна партиция → корректный порядок.
Не пытайся делать «свою логику упорядочивания» поверх outbox (таблица «ожидающих публикации», проверка last_seq и т.п.). Выбери правильный partition key, прими at-least-once и сделай consumer'ов идемпотентными.
Monitoring¶
Метрики¶
outbox_rows_total{service, status="unacked"}
— gauge, количество неопубликованных строк.
outbox_forwarder_lag_seconds{service}
— histogram, now() - MIN(created_at) по unacked, замеряется раз в тик.
outbox_forwarder_publish_duration_seconds{service, topic, result}
— histogram, длительность kafka.Publish.
outbox_forwarder_errors_total{service, reason}
— counter, reason ∈ {kafka_unavailable, marshal, db_read, db_ack}.
events_published_total{service, topic, event_type}
— counter, инкрементируется после успешной публикации.
Выставляет эти метрики forwarder-middleware; код middleware живёт в
pkg/eventmw/ сервис-репо. Подключение — в setup-блоке (см.
../conventions/observability.md).
Alerting¶
Шаблоны alert-правил (per-сервис, лежат в infra-репо, здесь — как приёмочные критерии):
- Forwarder падает или не может публиковать. Alert:
rate(outbox_forwarder_errors_total[5m]) > 0в течение 10 минут → ticket. - Лаг растёт. Alert:
outbox_forwarder_lag_seconds > 60в течение 5 минут → ticket. Диагностика —../how-to/debug-outbox-lag.md. - Backlog накапливается. Alert:
outbox_rows_total{status="unacked"} > 1000→ ticket. - Совсем нет публикаций при наличии трафика. Alert:
rate(http_requests_total{endpoint="/v1/reviews", method="POST", status="201"}[5m]) > 0иrate(events_published_total{event_type="review.created"}[5m]) == 0→ страница.
Трейсинг¶
OTel-middleware в forwarder'е читает traceparent из envelope-metadata
и продолжает тот же trace, что создал HTTP-handler на входе.
В Tempo на одном waterfall видны: HTTP span → DB insert span →
kafka.Publish span. Это и есть основной способ разобрать «почему между
HTTP-ответом и появлением события в Kafka прошло 3 секунды».
Cleanup¶
Таблица outbox не растёт бесконечно. Acked-строки (offset_acked IS NOT
NULL) старше 7 дней удаляются CronJob'ом:
DELETE FROM review.outbox
WHERE offset_acked IS NOT NULL
AND created_at < NOW() - INTERVAL '7 days';
CronJob — k8s Job с расписанием раз в сутки. Живёт в infra-репо, конфигурируется per-сервис.
Правила:
- Никогда не удаляй unacked-строки. Если строка не опубликована — её должен обработать forwarder. Удалить = потерять событие.
- Не делай retention меньше 3 дней. Если был инцидент в пятницу и мы разбираемся в понедельник, нужна возможность посмотреть payload исходного события.
- Не клади cleanup в сам сервис. Это инфраструктурная задача, отдельный Job — отдельный lifecycle.
Failure modes¶
| Сценарий | Поведение | Что делать |
|---|---|---|
| Forwarder-goroutine упала (panic) | middleware.Recoverer поймал, goroutine перезапустилась (если wrapped в supervisor). Без supervisor — упала навсегда до рестарта pod'а. |
Всегда оборачивай fwd.Run(ctx) в retry-loop или полагайся на middleware.Recoverer + log. Alert на forwarder_errors. |
| Kafka недоступен | Publish возвращает ошибку, строка в outbox остаётся unacked. Forwarder retry'ит на следующем тике. Лаг растёт. |
Alert на lag → диагностика Kafka → восстановление. Строки дрейн'ятся после восстановления. |
| DB pool исчерпан | Forwarder ждёт свободное соединение → лаг растёт → alert. | Выдели forwarder'у отдельный pool (3–5 соединений), изолированный от HTTP-handler'ов. Конфиг — в internal/config/. |
| Две реплики forwarder'а | SELECT FOR UPDATE SKIP LOCKED + advisory lock защищают. Но дубли доставки в Kafka возможны на границах. |
По умолчанию — один forwarder на сервис. Если нужен HA — полагайся на k8s перезапуск pod'а, а не на multi-writer. |
| Падение между DB commit и публикацией | Строка в outbox уже коммитнута, в Kafka ещё не ушло → forwarder возьмёт её на следующем тике. | Ничего делать не надо, это штатный путь at-least-once. |
| Kafka принял, но forwarder упал до UPDATE offset_acked | При рестарте forwarder прочитает ту же строку снова → дубль в Kafka. | Ничего делать не надо. Consumer дедуплицирует по Message.UUID через Deduplicator middleware (см. idempotent-consumer.md). |
Злой sql-запрос в outbox-таблице (DELETE вручную) |
Потеряны события. | Запрещено делать руками. Только cleanup через CronJob. Доступ в прод DB — через pass-through, не direct. |
Testing¶
Unit через gochannel¶
Для теста бизнес-handler'а — in-memory pub-sub. Publisher подставляется
как gochannel.GoChannel, реальный outbox не нужен.
func TestReviewService_Create_PublishesEvent(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
sub, _ := pubsub.Subscribe(context.Background(), "outbox")
svc := service.NewReviewService(fakeRepo, fakeDB, pubsub)
_, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
if err != nil {
t.Fatalf("create: %v", err)
}
select {
case msg := <-sub:
if got := msg.Metadata.Get("Event-Type"); got != "review.created" {
t.Fatalf("event-type: %q", got)
}
case <-time.After(2 * time.Second):
t.Fatal("event not published")
}
}
Такой тест покрывает: события строятся, envelope корректен, publish вызывается. Он не проверяет, что строка попала в Postgres-таблицу — для этого integration-тест.
Integration через testcontainers¶
Поднимается реальный Postgres, применяются миграции, в тесте
service.Create → тест читает SELECT FROM review.outbox и убеждается,
что строка появилась с правильным payload'ом.
func TestReviewService_Create_WritesOutboxRow(t *testing.T) {
pool := setupPostgres(t) // testcontainers Postgres + migrations
stdDB := stdlib.OpenDBFromPool(pool) // *sql.DB поверх pgx для watermill-sql
outboxPub := newOutboxPublisher(t, stdDB)
svc := service.NewReviewService(
postgres.NewReviewRepo(pool),
pkgdb.New(pool),
outboxPub,
)
_, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
if err != nil {
t.Fatalf("create: %v", err)
}
var count int
err = pool.QueryRow(ctx,
`SELECT count(*) FROM review.outbox WHERE metadata->>'Event-Type' = 'review.created'`,
).Scan(&count)
if err != nil || count != 1 {
t.Fatalf("outbox rows: got %d, err %v", count, err)
}
}
End-to-end¶
Testcontainers Postgres + testcontainers Kafka + forwarder внутри того же
теста. svc.Create → подождать сообщение из Kafka (через Watermill
subscriber на тестовой группе). Такой тест держим в build-tag'е
integration, запускается в отдельном Makefile-target'е.
Подробнее про testcontainers —
../conventions/testing.md.
Anti-patterns¶
Прямой kafka.Publisher.Publish в handler¶
// ПЛОХО: classic dual-write
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
r, err := s.reviews.Create(ctx, cmd) // БД
if err != nil { return nil, err }
return r, s.kafkaPub.Publish("kazmaps.review.review.created", buildMsg(r)) // Kafka
}
Эта схема выглядит работающей на стенде и в unit-тестах. Она
ломается только в проде под сетевыми сбоями: Kafka недоступна 200ms, а
БД уже закоммичена → событие потеряно, никто не узнает. Пример такой
ошибки — в репозитории сервиса review, файл
internal/event/publisher.go: Publisher работает напрямую с
kafka.Publisher, вызывается из handler'а после commit'а
транзакции. Это надо переписать на outbox (см. §Migration).
Свой outbox-worker с ручным SELECT FOR UPDATE¶
В репозитории сервиса user, файлы
internal/repository/postgres/outbox.go и internal/event/worker.go —
ручной outbox: FetchUnpublished + MarkPublished по id-списку.
Почему это антипаттерн:
FetchUnpublishedиспользуетpool.Query(...)сFOR UPDATE SKIP LOCKED. Ноpgx.Queryбез явной транзакции возвращает соединение в пул после закрытия rows — locks снимаются в момент автокоммита. В результатеFOR UPDATEничего не защищает: вторая реплика увидит те же строки «unlocked» и опубликует их дубли.MarkPublishedиспользует другое соединение из пула, отдельной транзакцией — то есть нет гарантии, что строка, которую мы пометили как published, это именно та строка, которую забиралFetchUnpublished.- Этот баг не ловится юнит-тестами (pool=1 connection, локи ведут себя иначе) и не ловится integration-тестом на одной реплике. Ловится только под продовой нагрузкой на нескольких репликах → дубли в Kafka → consumer-сайд падает от duplicates или молча повторяет side-effect'ы.
Вместо этого: всегда watermill-sql subscriber + forwarder. Они
используют правильные транзакционные границы и offsets-таблицу для
tracking'а.
Нет monitoring¶
Событие потерялось, retention в Kafka 7 дней, через месяц downstream
сервис показывает «неправильные счётчики» и никто не понимает почему.
Alert на outbox_forwarder_lag_seconds и
outbox_rows_total{status="unacked"} закрывает этот класс проблем.
Агрессивный cleanup¶
DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '1 hour' —
после любого инцидента невозможно replay события. Не меньше 3 дней,
рекомендуем 7.
Один общий outbox на все сервисы¶
«Возьмём один общий Postgres, одну таблицу outbox, все сервисы
пишут туда» — ломает database-per-service. Любой сбой в этой общей БД
блокирует все publish'ы всех сервисов. Каждый сервис — свой outbox в
своей схеме.
PublishTx без InTx¶
// ПЛОХО: нет транзакции, есть dual-write
s.reviews.Create(ctx, r)
s.outboxPub.Publish("outbox", msg) // не PublishTx, не внутри tx
Мы потеряли атомарность: если между этими двумя строчками упадёт
процесс, БД-строка есть, outbox-строки нет. Всегда InTx + PublishTx.
Tx снаружи InTx, ручной BEGIN/COMMIT¶
Проблема — легко забыть return после ошибки, легко забыть defer.
Используй helper db.InTx — он управляет rollback/commit за тебя. См.
../conventions/db-pgx.md.
Migration: из no-outbox в outbox¶
План для сервиса, который сейчас публикует напрямую в Kafka (как
review):
- Миграция БД. Добавь таблицу
review.outboxпо DDL из §Schema. Отдельная миграция, expand-only (ничего не ломается у живого сервиса, таблица пустая). - Добавь
outboxPublisher. Вcmd/server/main.goсоздайwatermill-sqlpublisher, пропиши в DI в service-слой. - Dual-publish (временный шаг). В service-слое теперь и старый
прямой
kafka.Publish, и новыйoutboxPublisher.PublishTx. Это увеличит нагрузку: consumer получит дубли. ВключиDeduplicator-middleware на consumer-сайде, если он ещё не включён. - Включи forwarder. Запусти goroutine. Проверь логи/метрики:
events_published_totalинкрементится,outbox_rows_totalне копится. - Убери прямой publish. Теперь только outbox. В этот момент
поломки (если есть) станут видны через
outbox_lag_seconds— быстро, локально в сервисе, без последствий для Kafka. - Мониторь неделю. Убедись, что lag стабильный, ошибок нет, retention по cleanup настроен.
Шаги 3–4 критичны: нельзя просто «выключил прямой publish, включил outbox» — если что-то сломается в новой схеме, события теряются безмолвно. Dual-publish на время миграции даёт страховку.
FAQ¶
«Почему не CDC (Debezium)?» CDC читает WAL Postgres и вытаскивает изменения в Kafka без outbox-таблицы. Это элегантно, но требует: настроенный Kafka Connect, per-table whitelist, версионирование конфигов, отдельный lifecycle. Для наших объёмов прибавочная ценность не окупает сложности. Forwarder — меньше движущихся частей, внутри сервис-репо, без внешних зависимостей, кроме Kafka. Если когда-то выйдем на объёмы, где forwarder перестанет справляться — тогда отдельная задача на CDC. Пока нет.
«Можно ли Kafka использовать как outbox? Писать сразу в Kafka, а оттуда — в БД через consumer?» Нет, это возвращает dual-write: между publish в Kafka и commit в БД может упасть producer → событие опубликовано, БД не обновилась, consumer попытается применить изменение, которого нет в источнике. Outbox именно и решает этот случай.
«Outbox vs Event Sourcing?» Разное. Outbox — механизм доставки событий из источника в шину. Event Sourcing — хранение состояния как журнала событий (событие — первичный факт, состояние — производное). Комбинируются редко: если у тебя ES, тебе outbox не нужен, потому что журнал и есть outbox. У нас — обычные реляционные таблицы как источник правды, outbox как канал наружу.
«Publish делает RPC? Это не медленно?» PublishTx — это INSERT
в Postgres, не сетевой вызов в Kafka. Медленно только если outbox-
таблица заблокирована, что не штатно. На publisher-сайде задержка
измеряется сотнями микросекунд.
«А если мне нужна синхронная операция — ответить клиенту после того,
как событие дошло до consumer'а?» Это не outbox-use-case. Outbox —
про асинхронное распространение факта. Для синхронного взаимодействия
— HTTP internal endpoint (см.
api-composition.md).
«Почему не писать сразу envelope в payload-колонку, а metadata
отдельно?» Потому что watermill-sql именно так читает: разделение
payload/metadata — требование DefaultPostgreSQLSchema. Если
отойдёшь — придётся писать свой SchemaAdapter и поддерживать его.
Не надо.
Связанные разделы¶
../conventions/events.md— envelope, naming топиков, middleware stack, reference-уровень.../how-to/add-kafka-event.md— пошаговая процедура добавить новый тип события.../how-to/debug-outbox-lag.md— runbook, когда alert на lag сработал.idempotent-consumer.md— как consumer переживает at-least-once.cqrs.md— как поверх outbox+Kafka собрать типизированные handler'ы черезwatermill/components/cqrs.../conventions/db-pgx.md—InTx, миграции, advisory lock.../conventions/observability.md— метрики, трейсинг, алертинг.../conventions/shutdown.md— корректный drain forwarder'а при SIGTERM.../glossary.md— Forwarder, Envelope, Event-Id, Outbox.