Перейти к содержанию

События: Watermill, Kafka, outbox

Все межсервисные события ходят через Kafka. Мы работаем с Kafka через Watermill: publisher, subscriber, middleware, testing — всё стандартное. Напрямую sarama / kafka-go не используем.

End-to-end поток от HTTP-handler'а на producer-сервисе до handler'а на consumer-сервисе:

flowchart LR
    subgraph Producer["producer-сервис"]
        H[HTTP handler]
        S[service слой]
        DB[(Postgres)]
        OB[outbox таблица]
        FWD[forwarder goroutine]
    end
    subgraph Kafka
        T["topic<br/>kazmaps.review.review.created"]
        DLQ["topic .dlq"]
    end
    subgraph Consumer["consumer-сервис"]
        SUB[Kafka subscriber]
        MW["middleware stack:<br/>CorrelationID → Recoverer →<br/>PoisonQueue → Retry →<br/>Deduplicator"]
        HND[event handler]
        CDB[(Postgres)]
    end

    H --> S
    S -->|InTx| DB
    S -->|InTx, PublishTx| OB
    FWD -->|SELECT FOR UPDATE| OB
    FWD -->|Publish| T
    T --> SUB
    SUB --> MW
    MW -->|ok| HND
    MW -.->|retry exhausted| DLQ
    HND --> CDB

Ключевые свойства: запись в БД и строку в outbox атомарна, forwarder отдельно переносит строки в Kafka (at-least-once), consumer-middleware дедуплицирует retry-дубли. Producer и consumer — разные сервисы с разными БД.

Содержание

Transport и топики

Брокер

  • Локально — один Kafka в docker-compose.yml (KRaft-mode, один узел).
  • Продовый брокер поднимается отдельно; env-переменная KAFKA_BROKERS задаёт endpoint'ы.
  • Сериализация — JSON. Protobuf / Avro не используем.

Naming топиков

Формат:

kazmaps.<service>.<entity>.<action>

Примеры:

kazmaps.user.user.registered
kazmaps.user.user.banned
kazmaps.review.review.created
kazmaps.review.review.updated
kazmaps.media.photo.uploaded
kazmaps.media.moderation.approved
  • <service> — producer.
  • <entity> — существительное в единственном числе.
  • <action> — прошедшее время (факт уже случился): created, updated, deleted, completed, failed.

DLQ

Для каждого consumer-топика заводим DLQ с суффиксом .dlq:

kazmaps.review.review.created        — основной топик
kazmaps.review.review.created.dlq    — DLQ

В DLQ уходят сообщения, которые падают после исчерпания retry (см. ниже).

Envelope

Каждое сообщение несёт набор headers (Watermill metadata). Это обязательно — без этих полей сообщение отклоняется на consumer'е.

Обязательные поля

Поле Что это Как заполнять
Message.UUID Идентификатор события ULID, один раз на событие
Metadata["Event-Type"] Тип события review.created, user.banned
Metadata["Schema-Version"] Версия схемы payload'а "1", "2", ...
Metadata["Correlation-Id"] Трассировка операции Приходит из HTTP-запроса / наследуется
Metadata["Source-Service"] Имя producer-сервиса "review", "user"
Metadata["Published-At"] Время публикации RFC3339Nano
Metadata["traceparent"] W3C trace context Из OpenTelemetry propagator

Пример построения сообщения

Helper eventmeta.New(ctx, Envelope{...}) — единая точка сборки сообщения. Никто не заполняет metadata-поля вручную; если в коде встречаешь прямой msg.Metadata.Set(...) без helper'а — это баг.

package eventmeta

import (
    "encoding/json"
    "time"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/oklog/ulid/v2"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

type Envelope struct {
    EventType     string
    SchemaVersion string
    Source        string
    Payload       any
}

func New(ctx context.Context, e Envelope) (*message.Message, error) {
    body, err := json.Marshal(e.Payload)
    if err != nil {
        return nil, fmt.Errorf("marshal payload: %w", err)
    }

    msg := message.NewMessage(ulid.Make().String(), body)
    msg.Metadata.Set("Event-Type", e.EventType)
    msg.Metadata.Set("Schema-Version", e.SchemaVersion)
    msg.Metadata.Set("Source-Service", e.Source)
    msg.Metadata.Set("Published-At", time.Now().UTC().Format(time.RFC3339Nano))

    if corr := correlationIDFromCtx(ctx); corr != "" {
        msg.Metadata.Set("Correlation-Id", corr)
    }

    // Пробрасываем W3C trace context
    carrier := propagation.MapCarrier{}
    otel.GetTextMapPropagator().Inject(ctx, carrier)
    if tp := carrier["traceparent"]; tp != "" {
        msg.Metadata.Set("traceparent", tp)
    }
    return msg, nil
}

Schema-Version

  • Инкрементируй, когда добавляешь/переименовываешь/удаляешь поля в payload.
  • Добавление опционального поля совместимо — оставляй ту же версию, но в handler'ах проверяй nil.
  • Любое breaking-изменение (удаление поля, смена типа) — новая версия + пишешь в оба топика параллельно, пока consumer'ы не переехали (dual-publish).

Публикация: outbox pattern

Никогда не публикуй в Kafka напрямую из бизнес-логики. Всегда через outbox.

Почему

Если ты сначала закоммитил tx, а потом упал перед publisher.Publish — событие потеряно. Если сначала опубликовал, а потом tx откатился — консьюмер увидел фантомное событие. Outbox решает это: запись в outbox идёт в той же транзакции, что и бизнес-операция, а отдельный forwarder переносит её в Kafka.

Таблица outbox

Каждый сервис держит свою таблицу outbox:

CREATE TABLE <schema>.outbox (
    id              BIGSERIAL   PRIMARY KEY,
    aggregate_type  TEXT        NOT NULL,    -- "user", "review", "media"
    aggregate_id    TEXT        NOT NULL,    -- ID сущности как строка
    topic           TEXT        NOT NULL,    -- полное имя топика
    payload         JSONB       NOT NULL,    -- тело события
    headers         JSONB       NOT NULL,    -- envelope metadata
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unpublished
    ON <schema>.outbox (id)
    WHERE published_at IS NULL;

Publisher через watermill-sql

В cmd/server/main.go создаётся SQL-publisher, который пишет в таблицу:

import watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"

sqlPub, err := watermillSQL.NewPublisher(
    stdDB, // *sql.DB поверх того же Postgres
    watermillSQL.PublisherConfig{
        SchemaAdapter:        watermillSQL.DefaultPostgreSQLSchema{GenerateMessagesTableName: outboxTableName},
        AutoInitializeSchema: false, // таблицу создаёт migration
    },
    watermillLogger,
)

В service-слое используешь его как обычный publisher, обёрнутый в db.InTx:

err := s.db.InTx(ctx, func(tx pgx.Tx) error {
    if err := s.reviews.CreateTx(ctx, tx, review); err != nil {
        return err
    }
    msg, err := event.NewMessage(ctx, event.Envelope{
        EventType:     "review.created",
        SchemaVersion: "1",
        Source:        "review",
        Payload:       ReviewCreatedPayload{ReviewID: review.ID, ...},
    })
    if err != nil {
        return err
    }
    return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review.created", msg)
})

PublishTx — тонкий wrapper, который пишет row в outbox, используя переданный pgx.Tx (через adapter *sql.Tx или прямой insert). Commit происходит в InTx — событие и запись сохранены атомарно.

Forwarder

Отдельный компонент переносит unpublished-строки из outbox в Kafka. Используй components/forwarder из Watermill:

import "github.com/ThreeDotsLabs/watermill/components/forwarder"

fwd, err := forwarder.NewForwarder(forwarder.Config{
    ForwarderTopic: outboxTableName,
    Router:         router,
    Subscriber:     sqlSubscriber, // читает из outbox
    Publisher:      kafkaPub,      // пишет в Kafka
}, watermillLogger)

go fwd.Run(ctx)

Forwarder: - читает через SELECT ... FOR UPDATE SKIP LOCKED — несколько replicas работают параллельно, - помечает строку как published (или удаляет) после успешной публикации в Kafka, - при ошибке Kafka оставляет строку — на следующем тике заберётся снова.

Никакого собственного outbox-worker'а не пишем. Используем forwarder.

Consumer: Router и middleware stack

Consumer строится через message.Router. Middleware подключаются в строгом порядке:

router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
if err != nil {
    return fmt.Errorf("router: %w", err)
}

router.AddMiddleware(
    middleware.CorrelationID,          // 1. прокидывает Correlation-Id из metadata в ctx
    middleware.Recoverer,              // 2. ловит panic, возвращает error
    poisonQueue(kafkaPub, dlqTopic),   // 3. отправляет в DLQ после исчерпания retry
    middleware.Retry{                  // 4. повторы с экспоненциальным backoff + jitter
        MaxRetries:      5,
        InitialInterval: 500 * time.Millisecond,
        Multiplier:      2.0,
        MaxInterval:     30 * time.Second,
        Logger:          watermillLogger,
    }.Middleware,
    deduplicator(redisClient),         // 5. idempotency через Redis SETNX
)

Порядок важен:

  • CorrelationIDпервый, чтобы все последующие middleware и handler видели correlation id в ctx.
  • Recoverer — превращает panic в error, чтобы retry его увидел.
  • PoisonQueueдо Retry, потому что poison срабатывает только когда Retry исчерпан (Retry пробрасывает ошибку дальше, PoisonQueue её ловит).
  • Retry — с backoff и jitter. 5 попыток, начиная с 500ms.
  • Deduplicatorпоследний, чтобы retry внутри одного запроса не падал на «уже обработано». Дедуп по Event-Id (или Message.UUID).

Handler

Сигнатура handler'а:

func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
    eventType := msg.Metadata.Get("Event-Type")
    if eventType != "review.created" {
        return nil // skip — чужое событие на том же топике
    }

    var p ReviewCreatedPayload
    if err := json.Unmarshal(msg.Payload, &p); err != nil {
        return fmt.Errorf("unmarshal %s: %w", eventType, err)
    }

    ctx := msg.Context()
    if err := h.service.OnReviewCreated(ctx, p); err != nil {
        return fmt.Errorf("on review created: %w", err)
    }
    return nil
}

Правила: - Возврат nil — успех, offset коммитится. - Возврат error — Retry обработает, потом PoisonQueue отправит в DLQ. - Не делай msg.Ack() / msg.Nack() вручную — этим занимается router. - Handler должен быть идемпотентным: тот же Event-Id не должен создать дубликат сущности. Deduplicator ловит retry в памяти/redis, но consumer тоже должен быть идемпотентным на уровне БД (unique constraint, upsert).

Регистрация handler'а

router.AddHandler(
    "review.created.onUserActivity",          // handler name — уникален в router
    "kazmaps.review.review.created",           // subscribe topic
    kafkaSubscriber,
    "",                                        // publish topic (пусто — handler не публикует)
    nil,
    handler.Handle,
)

Publisher middleware

На publisher'е (на SQL-publisher'е outbox) применяем:

  • CorrelationID — подхватывает correlation из ctx и пишет в metadata.
  • OpenTelemetry — инжектит traceparent в metadata.
  • Metrics — инкрементирует prometheus-counter events_published_total{topic, event_type}.

Эти middleware живут в pkg/eventmw/ внутри сервис-репо. Пока shared-библиотека не вынесена в отдельный репозиторий, между сервисами они синхронизируются копированием — за этим следят владельцы сервисов.

Эталонная реализация Publisher и middleware stack — в репозитории сервиса review, файл internal/event/publisher.go.

Тестирование

Unit-тесты через gochannel

Для тестов не поднимай Kafka. Используй gochannel.GoChannel — in-memory pub-sub, совместимый с Watermill API:

import "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"

func TestReviewCreatedHandler(t *testing.T) {
    pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
    defer pubsub.Close()

    handler := NewReviewCreatedHandler(fakeService)

    router, _ := message.NewRouter(message.RouterConfig{}, nil)
    router.AddHandler("test", "topic", pubsub, "", nil, handler.Handle)

    go router.Run(context.Background())
    <-router.Running()

    msg := message.NewMessage("evt-1", []byte(`{"review_id":42}`))
    msg.Metadata.Set("Event-Type", "review.created")
    require.NoError(t, pubsub.Publish("topic", msg))

    // проверь, что fakeService получил вызов
}

Integration-тесты с реальным Kafka

Если тест должен проверить интеграцию с реальным Kafka — используй testcontainers-go:

kafkaCt, _ := kafka.Run(ctx, "confluentinc/cp-kafka:7.6.0")
brokers, _ := kafkaCt.Brokers(ctx)

Такие тесты живут в *_integration_test.go под build-tag'ом integration — чтобы не тормозили обычный make test.

Что не делать

  • Не используй sarama / kafka-go напрямую. Только через Watermill.
  • Не публикуй в Kafka из handler'а HTTP. Всегда через outbox.
  • Не делай sync request-reply через Kafka. Для RPC — HTTP internal endpoint.
  • Не коммить offset руками. Router делает это после успешного handler.
  • Не делай handler, зависящий от порядка сообщений между топиками. Порядок гарантируется только внутри одной партиции одного топика.
  • Не пиши секреты / PII в payload события. См. logging.md — те же правила маскирования применяются к событиям.

См. также