Перейти к содержанию

Transactional Outbox

Deep-dive по паттерну. Если ты ищешь «как опубликовать событие по шагам» — открой ../how-to/add-kafka-event.md. Если ищешь «какие поля в envelope» — открой ../conventions/events.md. Эта страница отвечает на другой вопрос: как outbox работает под капотом, как его масштабировать, мониторить и как вытащить сервис из состояния, когда что-то сломалось.

Содержание

Что это и зачем

Outbox решает проблему dual-write: бизнес-операция пишет в Postgres и публикует событие в Kafka, и эти два действия должны быть согласованы. Без outbox возможны две поломки:

  1. Закоммитили транзакцию в БД → перед Publish процесс упал / сеть оборвалась → событие потеряно, downstream-сервисы не увидят факт, который уже отражён в БД источника.
  2. Опубликовали сначала в Kafka → транзакция откатилась → событие фантомное, downstream отреагирует на изменение, которого нет.

Outbox устраняет оба случая. В одной транзакции с бизнес-записью пишется строка в таблицу outbox в той же БД. Отдельный процесс (forwarder) забирает строки и публикует в Kafka с at-least-once гарантиями. Downstream обязан быть идемпотентным (см. idempotent-consumer.md) — это плата за простоту и атомарность записи в источнике.

Гарантии:

  • Атомарность записи бизнес-состояния и факта события — обеспечивает БД через общую транзакцию.
  • At-least-once доставки — обеспечивает forwarder: пока строка не помечена как published, её будут переопубликовывать.
  • Порядок per-aggregate в Kafka — обеспечивает partition key (см. §Ordering), а не outbox сам по себе.

Exactly-once здесь нет и не будет. Kafka даёт at-least-once, outbox повторяет эту семантику на своём уровне.

Когда использовать

Правило по умолчанию: всегда через outbox, если событие отражает изменение доменного состояния сервиса. Любой факт, который потом захочет прочитать другой сервис или аналитика, должен пройти через outbox: user.registered, user.banned, review.updated, photo.uploaded, moderation.approved.

Второе правило: если ты написал в БД и дальше дёрнул Kafka напрямую — ты создал dual-write. Даже если «всё работает на стенде». Починить эту логическую ошибку позже будет дороже, чем написать через outbox с самого начала.

Когда НЕ использовать

  • Ephemeral сигналы без привязки к DB-state. Пример: WebSocket-события «пользователь печатает», «активен на этой странице», live-позиция курсора. У них нет соответствующего изменения в БД, и потеря одной такой нотификации не ломает систему.
  • Fan-out внутри одного сервиса. Если publisher и subscriber живут в одном процессе и запись в БД им не нужна как общий факт — используй gochannel in-process pub/sub, не гоняй событие через Kafka.
  • Технические метрики. Prometheus counter, не Kafka event.

Архитектура

sequenceDiagram
    autonumber
    participant C as HTTP Client
    participant H as Handler
    participant S as Service
    participant DB as Postgres
    participant F as Forwarder
    participant K as Kafka
    participant D as Downstream

    C->>H: POST /v1/reviews
    H->>S: CreateReview(ctx, cmd)
    S->>DB: BEGIN
    S->>DB: INSERT INTO reviews ...
    S->>DB: INSERT INTO outbox ...
    S->>DB: COMMIT
    S-->>H: Review{ID: 42}
    H-->>C: 201 Created
    Note over F,DB: Polling, ~500ms
    F->>DB: SELECT FROM outbox WHERE offset_acked IS NULL
    F->>K: Publish(topic, envelope)
    F->>DB: UPDATE outbox SET offset_acked = NOW()
    K-->>D: review.created

Ключевые свойства:

  • Forwarder — отдельный worker-loop; либо внутри того же процесса сервиса (goroutine), либо отдельный Deployment. Мы по умолчанию запускаем goroutine в cmd/server/main.go — это проще и достаточно.
  • Transport из outbox в Kafka — через watermill-sql (subscriber) + watermill-kafka (publisher), склеенные watermill/components/forwarder.
  • Сам forwarder код не пишем — используем готовый из Watermill. Попытка написать свой (как в репозитории сервиса user, internal/event/worker.go) сразу становится антипаттерном: см. §Anti-patterns.

Schema outbox-таблицы

Схема совпадает с DefaultPostgreSQLSchema из watermill-sql/v3. DDL кладётся отдельной миграцией в каждый сервис-репо в свою схему:

-- review/migrations/XXX_outbox.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('review_migrations'));

CREATE TABLE review.outbox (
    offset_acked     BIGINT,
    offset_consumed  BIGSERIAL NOT NULL,
    uuid             VARCHAR(36) NOT NULL,
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    payload          JSON,
    metadata         JSON,
    transaction_id   xid8 NOT NULL DEFAULT pg_current_xact_id(),
    CONSTRAINT outbox_pk PRIMARY KEY (transaction_id, offset_consumed)
);

CREATE INDEX idx_outbox_acked
    ON review.outbox (offset_acked)
    WHERE offset_acked IS NULL;

COMMIT;

Пояснение полей:

  • offset_consumed — локальный монотонный номер в пределах таблицы (BIGSERIAL). Используется для сортировки при чтении.
  • offset_acked — выставляется в значение offset_consumed после успешной публикации. NULL — ещё не опубликовано.
  • uuid — ULID (Message.UUID), тот же, что доедет до Kafka и будет ключом в Deduplicator'е на consumer'е. Строится через eventmeta.New (см. ../conventions/events.md).
  • payload — JSON-тело события (ReviewCreatedV1, сериализованный в JSON).
  • metadata — JSON с envelope-заголовками (Event-Type, Schema-Version, Correlation-Id, Source-Service, Published-At, traceparent).
  • transaction_idxid8, идентификатор Postgres-транзакции. Именно он в связке с offset_consumed даёт корректный порядок чтения forwarder'ом: строки из одной транзакции читаются пачкой, строки из незакоммиченных транзакций пропускаются до момента их commit'а.
  • PRIMARY KEY (transaction_id, offset_consumed) — уникальность строки и естественный порядок для cursor-based чтения.

Partial-index по offset_acked IS NULL нужен, чтобы forwarder мог сканировать только неопубликованные строки, а не всю таблицу.

Имя таблицы — всегда outbox, схема — <service> (review.outbox, user.outbox, media.outbox). Одна таблица outbox на сервис. Одна таблица outbox на несколько сервисов — антипаттерн (см. §Anti-patterns).

Publisher side

Запись в outbox происходит внутри той же транзакции, что и бизнес-запись. Код в service-слое:

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
    var r *domain.Review
    err := s.db.InTx(ctx, func(tx pgx.Tx) error {
        saved, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
        if err != nil {
            return err
        }
        r = saved

        payload := event.ReviewCreatedV1{
            ReviewID: saved.ID,
            PlaceID:  saved.PlaceID,
            UserID:   saved.UserID,
            Rating:   saved.Rating,
        }
        msg, err := eventmeta.New(ctx, eventmeta.Envelope{
            EventType:     "review.created",
            SchemaVersion: "1",
            Source:        "review",
            Payload:       payload,
        })
        if err != nil {
            return err
        }
        return s.outboxPublisher.PublishTx(ctx, tx, "outbox", msg)
    })
    if err != nil {
        return nil, fmt.Errorf("create review: %w", err)
    }
    return r, nil
}

Разобьём по кусочкам:

  • eventmeta.New строит *message.Message с заполненным Message.UUID (ULID), envelope-metadata (Event-Type, Schema-Version, Correlation-Id, Source-Service, Published-At, traceparent из OTel). Код helper'а — в ../conventions/events.md.
  • s.outboxPublisher — это watermill-sql Publisher, настроенный писать в таблицу review.outbox. Метод PublishTx — тонкий wrapper, который пишет строку, используя переданный pgx.Tx.
  • Первый аргумент "outbox" в PublishTxне Kafka-topic. Это логическое «имя канала» внутри outbox-таблицы. Мы всегда передаём "outbox" — по одной на сервис. Реальный Kafka-topic определится позже, в forwarder'е, по envelope-metadata.
  • InTx коммитит или откатывает обе записи атомарно. Если Publish упал — откатится и вставка в review.reviews. Если вставка в reviews упала — вставка в outbox тоже не произошла.

Правило: внутри InTx никогда не делай прямых Kafka-вызовов, HTTP-вызовов, внешних побочных эффектов. Только БД-операции через переданный tx. Детали — ../conventions/db-pgx.md.

Forwarder

Forwarder — компонент watermill/components/forwarder. Он читает строки из outbox-таблицы и публикует в Kafka.

Setup

import (
    watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
    "github.com/ThreeDotsLabs/watermill/components/forwarder"
    "github.com/ThreeDotsLabs/watermill/message"
)

// 1. Subscriber поверх Postgres — читает outbox.
sqlSub, err := watermillSQL.NewSubscriber(stdDB, watermillSQL.SubscriberConfig{
    SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{
        GenerateMessagesTableName: func(topic string) string { return "review.outbox" },
    },
    OffsetsAdapter:   watermillSQL.DefaultPostgreSQLOffsetsAdapter{},
    InitializeSchema: false, // схема создаётся миграцией
    PollInterval:     500 * time.Millisecond,
    ConsumerGroup:    "review-outbox-forwarder",
}, watermillLogger)

// 2. Kafka publisher — пишет в реальный Kafka.
kafkaPub, err := kafka.NewPublisher(
    kafka.PublisherConfig{Brokers: cfg.Kafka.Brokers, Marshaler: kafka.DefaultMarshaler{}},
    watermillLogger,
)

// 3. Router для применения middleware к forwarder'у.
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)

// 4. Сам forwarder.
fwd, err := forwarder.NewForwarder(sqlSub, kafkaPub, watermillLogger, forwarder.Config{
    ForwarderTopic: "outbox", // то же имя, что используется в PublishTx
    Router:         router,
    Middlewares: []message.HandlerMiddleware{
        middleware.Recoverer,
        otelMiddleware.Trace(tracer),       // пробрасывает traceparent в Kafka
        outboxMetricsMiddleware,            // publish_duration + errors
    },
})

go func() { _ = fwd.Run(ctx) }()
go func() { _ = router.Run(ctx) }()

Параметры, на которые обращай внимание:

  • PollInterval — как часто forwarder делает запрос к outbox. Default 500ms. Снижение (100ms) даёт меньшую end-to-end задержку, но увеличивает нагрузку на БД. Увеличение (2s) экономит ресурсы, но задерживает публикацию на столько же.
  • ConsumerGroup — имя consumer-group'ы watermill-sql для offsets-таблицы. У разных forwarder'ов должна быть разная группа, чтобы они не перебивали друг другу offset'ы.
  • ForwarderTopic — должен совпадать с topic'ом, который мы передаём в PublishTx на publisher-стороне.

Как forwarder выбирает Kafka-topic

Forwarder читает строку из outbox и публикует её в Kafka. Имя Kafka- топика берётся из envelope-metadata. Стандартный маппинг:

Kafka topic = "kazmaps." + Source-Service + "." + Event-Type

Пример: Source-Service=review, Event-Type=review.createdkazmaps.review.review.created.

Маппинг реализуется через middleware forwarder'а: handler читает msg.Metadata, выставляет целевой topic через механизм forwarder.Config.DecorateMessage (или через явный publisher-wrapper, который принимает topic как функцию от *message.Message). Формат имён — ../conventions/events.md.

Конкурентность и advisory locks

watermill-sql при чтении outbox использует SELECT ... FOR UPDATE SKIP LOCKED. Это безопасно при нескольких forwarder-инстансах: две реплики могут параллельно читать разные блоки строк, не блокируя друг друга. Внутри watermill-sql поверх этого есть advisory-lock, защищающий offsets-таблицу от гонок при обновлении позиции.

Это не значит, что надо специально поднимать несколько forwarder'ов. По умолчанию — один forwarder на сервис (одна реплика cmd/server/main.go, одна goroutine). Если throughput outbox превысит возможности одной реплики — тогда отдельный forwarder-deployment, но это редкий кейс.

Ordering guarantees

Это частый источник недопонимания.

  • Между разными транзакциями в пределах одного outbox: строки упорядочены по (transaction_id, offset_consumed), forwarder публикует в этом порядке. Если транзакция T2 закоммитилась после T1, её события уйдут в Kafka после событий T1.
  • Внутри одной транзакции: порядок по offset_consumed — в каком порядке вызывали PublishTx, в таком порядке окажется в Kafka.
  • Per-aggregate ordering в Kafkaне гарантируется автоматически outbox'ом. Если тебе важно, чтобы review.updated для одного ревью пришёл после review.created для того же ревью, ты должен обеспечить две вещи:
  • Оба события идут через outbox одного и того же сервиса (✓ по определению — сервис-владелец aggregate'а).
  • В Kafka partition key = aggregate_id. Kafka гарантирует порядок только внутри одной партиции. Если review.created и review.updated попадут в разные партиции, consumer увидит их в произвольном порядке.

Partition key выставляется forwarder-middleware при публикации в Kafka:

outboxMiddleware := func(h message.HandlerFunc) message.HandlerFunc {
    return func(msg *message.Message) ([]*message.Message, error) {
        if aggID := msg.Metadata.Get("Aggregate-Id"); aggID != "" {
            msg.Metadata.Set("partition_key", aggID)
        }
        return h(msg)
    }
}

Kafka publisher читает partition_key и хэширует его в номер партиции. Один aggregate → одна партиция → корректный порядок.

Не пытайся делать «свою логику упорядочивания» поверх outbox (таблица «ожидающих публикации», проверка last_seq и т.п.). Выбери правильный partition key, прими at-least-once и сделай consumer'ов идемпотентными.

Monitoring

Метрики

outbox_rows_total{service, status="unacked"}
    — gauge, количество неопубликованных строк.
outbox_forwarder_lag_seconds{service}
    — histogram, now() - MIN(created_at) по unacked, замеряется раз в тик.
outbox_forwarder_publish_duration_seconds{service, topic, result}
    — histogram, длительность kafka.Publish.
outbox_forwarder_errors_total{service, reason}
    — counter, reason ∈ {kafka_unavailable, marshal, db_read, db_ack}.
events_published_total{service, topic, event_type}
    — counter, инкрементируется после успешной публикации.

Выставляет эти метрики forwarder-middleware; код middleware живёт в pkg/eventmw/ сервис-репо. Подключение — в setup-блоке (см. ../conventions/observability.md).

Alerting

Шаблоны alert-правил (per-сервис, лежат в infra-репо, здесь — как приёмочные критерии):

  • Forwarder падает или не может публиковать. Alert: rate(outbox_forwarder_errors_total[5m]) > 0 в течение 10 минут → ticket.
  • Лаг растёт. Alert: outbox_forwarder_lag_seconds > 60 в течение 5 минут → ticket. Диагностика — ../how-to/debug-outbox-lag.md.
  • Backlog накапливается. Alert: outbox_rows_total{status="unacked"} > 1000 → ticket.
  • Совсем нет публикаций при наличии трафика. Alert: rate(http_requests_total{endpoint="/v1/reviews", method="POST", status="201"}[5m]) > 0 и rate(events_published_total{event_type="review.created"}[5m]) == 0 → страница.

Трейсинг

OTel-middleware в forwarder'е читает traceparent из envelope-metadata и продолжает тот же trace, что создал HTTP-handler на входе. В Tempo на одном waterfall видны: HTTP span → DB insert span → kafka.Publish span. Это и есть основной способ разобрать «почему между HTTP-ответом и появлением события в Kafka прошло 3 секунды».

Cleanup

Таблица outbox не растёт бесконечно. Acked-строки (offset_acked IS NOT NULL) старше 7 дней удаляются CronJob'ом:

DELETE FROM review.outbox
 WHERE offset_acked IS NOT NULL
   AND created_at < NOW() - INTERVAL '7 days';

CronJob — k8s Job с расписанием раз в сутки. Живёт в infra-репо, конфигурируется per-сервис.

Правила:

  • Никогда не удаляй unacked-строки. Если строка не опубликована — её должен обработать forwarder. Удалить = потерять событие.
  • Не делай retention меньше 3 дней. Если был инцидент в пятницу и мы разбираемся в понедельник, нужна возможность посмотреть payload исходного события.
  • Не клади cleanup в сам сервис. Это инфраструктурная задача, отдельный Job — отдельный lifecycle.

Failure modes

Сценарий Поведение Что делать
Forwarder-goroutine упала (panic) middleware.Recoverer поймал, goroutine перезапустилась (если wrapped в supervisor). Без supervisor — упала навсегда до рестарта pod'а. Всегда оборачивай fwd.Run(ctx) в retry-loop или полагайся на middleware.Recoverer + log. Alert на forwarder_errors.
Kafka недоступен Publish возвращает ошибку, строка в outbox остаётся unacked. Forwarder retry'ит на следующем тике. Лаг растёт. Alert на lag → диагностика Kafka → восстановление. Строки дрейн'ятся после восстановления.
DB pool исчерпан Forwarder ждёт свободное соединение → лаг растёт → alert. Выдели forwarder'у отдельный pool (3–5 соединений), изолированный от HTTP-handler'ов. Конфиг — в internal/config/.
Две реплики forwarder'а SELECT FOR UPDATE SKIP LOCKED + advisory lock защищают. Но дубли доставки в Kafka возможны на границах. По умолчанию — один forwarder на сервис. Если нужен HA — полагайся на k8s перезапуск pod'а, а не на multi-writer.
Падение между DB commit и публикацией Строка в outbox уже коммитнута, в Kafka ещё не ушло → forwarder возьмёт её на следующем тике. Ничего делать не надо, это штатный путь at-least-once.
Kafka принял, но forwarder упал до UPDATE offset_acked При рестарте forwarder прочитает ту же строку снова → дубль в Kafka. Ничего делать не надо. Consumer дедуплицирует по Message.UUID через Deduplicator middleware (см. idempotent-consumer.md).
Злой sql-запрос в outbox-таблице (DELETE вручную) Потеряны события. Запрещено делать руками. Только cleanup через CronJob. Доступ в прод DB — через pass-through, не direct.

Testing

Unit через gochannel

Для теста бизнес-handler'а — in-memory pub-sub. Publisher подставляется как gochannel.GoChannel, реальный outbox не нужен.

func TestReviewService_Create_PublishesEvent(t *testing.T) {
    pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
    defer pubsub.Close()

    sub, _ := pubsub.Subscribe(context.Background(), "outbox")

    svc := service.NewReviewService(fakeRepo, fakeDB, pubsub)
    _, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
    if err != nil {
        t.Fatalf("create: %v", err)
    }

    select {
    case msg := <-sub:
        if got := msg.Metadata.Get("Event-Type"); got != "review.created" {
            t.Fatalf("event-type: %q", got)
        }
    case <-time.After(2 * time.Second):
        t.Fatal("event not published")
    }
}

Такой тест покрывает: события строятся, envelope корректен, publish вызывается. Он не проверяет, что строка попала в Postgres-таблицу — для этого integration-тест.

Integration через testcontainers

Поднимается реальный Postgres, применяются миграции, в тесте service.Create → тест читает SELECT FROM review.outbox и убеждается, что строка появилась с правильным payload'ом.

func TestReviewService_Create_WritesOutboxRow(t *testing.T) {
    pool := setupPostgres(t)            // testcontainers Postgres + migrations
    stdDB := stdlib.OpenDBFromPool(pool) // *sql.DB поверх pgx для watermill-sql
    outboxPub := newOutboxPublisher(t, stdDB)

    svc := service.NewReviewService(
        postgres.NewReviewRepo(pool),
        pkgdb.New(pool),
        outboxPub,
    )
    _, err := svc.Create(ctx, service.CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
    if err != nil {
        t.Fatalf("create: %v", err)
    }

    var count int
    err = pool.QueryRow(ctx,
        `SELECT count(*) FROM review.outbox WHERE metadata->>'Event-Type' = 'review.created'`,
    ).Scan(&count)
    if err != nil || count != 1 {
        t.Fatalf("outbox rows: got %d, err %v", count, err)
    }
}

End-to-end

Testcontainers Postgres + testcontainers Kafka + forwarder внутри того же теста. svc.Create → подождать сообщение из Kafka (через Watermill subscriber на тестовой группе). Такой тест держим в build-tag'е integration, запускается в отдельном Makefile-target'е.

Подробнее про testcontainers — ../conventions/testing.md.

Anti-patterns

Прямой kafka.Publisher.Publish в handler

// ПЛОХО: classic dual-write
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
    r, err := s.reviews.Create(ctx, cmd) // БД
    if err != nil { return nil, err }
    return r, s.kafkaPub.Publish("kazmaps.review.review.created", buildMsg(r)) // Kafka
}

Эта схема выглядит работающей на стенде и в unit-тестах. Она ломается только в проде под сетевыми сбоями: Kafka недоступна 200ms, а БД уже закоммичена → событие потеряно, никто не узнает. Пример такой ошибки — в репозитории сервиса review, файл internal/event/publisher.go: Publisher работает напрямую с kafka.Publisher, вызывается из handler'а после commit'а транзакции. Это надо переписать на outbox (см. §Migration).

Свой outbox-worker с ручным SELECT FOR UPDATE

В репозитории сервиса user, файлы internal/repository/postgres/outbox.go и internal/event/worker.go — ручной outbox: FetchUnpublished + MarkPublished по id-списку.

Почему это антипаттерн:

  • FetchUnpublished использует pool.Query(...) с FOR UPDATE SKIP LOCKED. Но pgx.Query без явной транзакции возвращает соединение в пул после закрытия rows — locks снимаются в момент автокоммита. В результате FOR UPDATE ничего не защищает: вторая реплика увидит те же строки «unlocked» и опубликует их дубли.
  • MarkPublished использует другое соединение из пула, отдельной транзакцией — то есть нет гарантии, что строка, которую мы пометили как published, это именно та строка, которую забирал FetchUnpublished.
  • Этот баг не ловится юнит-тестами (pool=1 connection, локи ведут себя иначе) и не ловится integration-тестом на одной реплике. Ловится только под продовой нагрузкой на нескольких репликах → дубли в Kafka → consumer-сайд падает от duplicates или молча повторяет side-effect'ы.

Вместо этого: всегда watermill-sql subscriber + forwarder. Они используют правильные транзакционные границы и offsets-таблицу для tracking'а.

Нет monitoring

Событие потерялось, retention в Kafka 7 дней, через месяц downstream сервис показывает «неправильные счётчики» и никто не понимает почему. Alert на outbox_forwarder_lag_seconds и outbox_rows_total{status="unacked"} закрывает этот класс проблем.

Агрессивный cleanup

DELETE FROM outbox WHERE created_at < NOW() - INTERVAL '1 hour' — после любого инцидента невозможно replay события. Не меньше 3 дней, рекомендуем 7.

Один общий outbox на все сервисы

«Возьмём один общий Postgres, одну таблицу outbox, все сервисы пишут туда» — ломает database-per-service. Любой сбой в этой общей БД блокирует все publish'ы всех сервисов. Каждый сервис — свой outbox в своей схеме.

PublishTx без InTx

// ПЛОХО: нет транзакции, есть dual-write
s.reviews.Create(ctx, r)
s.outboxPub.Publish("outbox", msg) // не PublishTx, не внутри tx

Мы потеряли атомарность: если между этими двумя строчками упадёт процесс, БД-строка есть, outbox-строки нет. Всегда InTx + PublishTx.

Tx снаружи InTx, ручной BEGIN/COMMIT

tx, _ := pool.Begin(ctx)
defer tx.Rollback(ctx)
// ... много кода ...
tx.Commit(ctx)

Проблема — легко забыть return после ошибки, легко забыть defer. Используй helper db.InTx — он управляет rollback/commit за тебя. См. ../conventions/db-pgx.md.

Migration: из no-outbox в outbox

План для сервиса, который сейчас публикует напрямую в Kafka (как review):

  1. Миграция БД. Добавь таблицу review.outbox по DDL из §Schema. Отдельная миграция, expand-only (ничего не ломается у живого сервиса, таблица пустая).
  2. Добавь outboxPublisher. В cmd/server/main.go создай watermill-sql publisher, пропиши в DI в service-слой.
  3. Dual-publish (временный шаг). В service-слое теперь и старый прямой kafka.Publish, и новый outboxPublisher.PublishTx. Это увеличит нагрузку: consumer получит дубли. Включи Deduplicator-middleware на consumer-сайде, если он ещё не включён.
  4. Включи forwarder. Запусти goroutine. Проверь логи/метрики: events_published_total инкрементится, outbox_rows_total не копится.
  5. Убери прямой publish. Теперь только outbox. В этот момент поломки (если есть) станут видны через outbox_lag_seconds — быстро, локально в сервисе, без последствий для Kafka.
  6. Мониторь неделю. Убедись, что lag стабильный, ошибок нет, retention по cleanup настроен.

Шаги 3–4 критичны: нельзя просто «выключил прямой publish, включил outbox» — если что-то сломается в новой схеме, события теряются безмолвно. Dual-publish на время миграции даёт страховку.

FAQ

«Почему не CDC (Debezium)?» CDC читает WAL Postgres и вытаскивает изменения в Kafka без outbox-таблицы. Это элегантно, но требует: настроенный Kafka Connect, per-table whitelist, версионирование конфигов, отдельный lifecycle. Для наших объёмов прибавочная ценность не окупает сложности. Forwarder — меньше движущихся частей, внутри сервис-репо, без внешних зависимостей, кроме Kafka. Если когда-то выйдем на объёмы, где forwarder перестанет справляться — тогда отдельная задача на CDC. Пока нет.

«Можно ли Kafka использовать как outbox? Писать сразу в Kafka, а оттуда — в БД через consumer?» Нет, это возвращает dual-write: между publish в Kafka и commit в БД может упасть producer → событие опубликовано, БД не обновилась, consumer попытается применить изменение, которого нет в источнике. Outbox именно и решает этот случай.

«Outbox vs Event Sourcing?» Разное. Outbox — механизм доставки событий из источника в шину. Event Sourcing — хранение состояния как журнала событий (событие — первичный факт, состояние — производное). Комбинируются редко: если у тебя ES, тебе outbox не нужен, потому что журнал и есть outbox. У нас — обычные реляционные таблицы как источник правды, outbox как канал наружу.

«Publish делает RPC? Это не медленно?» PublishTx — это INSERT в Postgres, не сетевой вызов в Kafka. Медленно только если outbox- таблица заблокирована, что не штатно. На publisher-сайде задержка измеряется сотнями микросекунд.

«А если мне нужна синхронная операция — ответить клиенту после того, как событие дошло до consumer'а?» Это не outbox-use-case. Outbox — про асинхронное распространение факта. Для синхронного взаимодействия — HTTP internal endpoint (см. api-composition.md).

«Почему не писать сразу envelope в payload-колонку, а metadata отдельно?» Потому что watermill-sql именно так читает: разделение payload/metadata — требование DefaultPostgreSQLSchema. Если отойдёшь — придётся писать свой SchemaAdapter и поддерживать его. Не надо.

Связанные разделы