Skip to Content
ConventionsСобытия (Kafka)

События: Watermill, Kafka, outbox

Все межсервисные события ходят через Kafka. Мы работаем с Kafka через Watermill : publisher, subscriber, middleware, testing — всё стандартное. Напрямую sarama / kafka-go не используем.

End-to-end поток от HTTP-handler’а на producer-сервисе до handler’а на consumer-сервисе:

Ключевые свойства: запись в БД и строку в outbox атомарна, forwarder отдельно переносит строки в Kafka (at-least-once), consumer-middleware дедуплицирует retry-дубли. Producer и consumer — разные сервисы с разными БД.

TL;DR

Минимум правил для быстрой сверки (подробности — по разделам ниже):

  • Транспорт: только Watermill поверх Kafka. sarama/kafka-go напрямую не используем.
  • Publish: только через outbox + PublishTx внутри InTx. Прямой kafka.Publish из handler’а — dual-write, запрещён.
  • Topic name: kazmaps.<service>.<entity>.<action>, action — в прошедшем времени (created, banned).
  • Envelope metadata (обязательны): Event-Type, Schema-Version, Correlation-Id, Source-Service, Published-At, traceparent. Message.UUID — отдельно, ULID.
  • Consumer middleware stack (строгий порядок): CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator. PoisonQueue до Retry, Deduplicatorпосле.
  • Дедупликация: ключ — Message.UUID, хранилище — Redis SETNX, TTL 24h (см. TTL-таблицу per use-case). Дедуп по business-key — запрещён.
  • Consumer-group: <service>-<handler> (notification-on-review-created). Никаких обобщённых consumer/default.
  • Идемпотентность handler’а обязательна — даже при работающем Deduplicator’е. UPSERT по natural key + гейт side effects по RowsAffected.
  • DLQ: <topic>.dlq, retention 30 дней. Разбор — вручную по инциденту (см. ../troubleshooting/kafka-consumer-stuck).
  • Fail-open vs fail-closed на dedup — fail-closed дефолт; fail-open только при DB-level идемпотентности handler’а. Детали — ../patterns/idempotent-consumer.

Содержание

Transport и топики

Брокер

  • Локально — один Kafka в docker-compose.yml (KRaft-mode, один узел).
  • Продовый брокер поднимается отдельно; env-переменная KAFKA_BROKERS задаёт endpoint’ы.
  • Сериализация — JSON. Protobuf / Avro не используем.

Naming топиков

Формат:

kazmaps.<service>.<entity>

Примеры:

kazmaps.user.account kazmaps.user.push_token kazmaps.review.review kazmaps.media.photo kazmaps.catalog.place kazmaps.indoor.building
  • <service> — producer.
  • <entity> — существительное в единственном числе.
  • Действие (created, updated, deleted, …) — НЕ в имени топика, а в заголовке Event-Type. Топик = поток событий одной сущности.
  • Ключ записи = id сущности — все события сущности идут в одну партицию и обрабатываются строго по порядку (обоснование — в event-catalog, «Почему сущность, а не событие»).

DLQ

Для каждого consumer-топика заводим DLQ с суффиксом .dlq:

kazmaps.review.review — основной топик kazmaps.review.review.dlq — DLQ

В DLQ уходят сообщения, которые падают после исчерпания retry (см. ниже).

Envelope

Каждое сообщение несёт набор headers (Watermill metadata). Это обязательно — без этих полей сообщение отклоняется на consumer’е.

Обязательные поля

ПолеЧто этоКак заполнять
Message.UUIDИдентификатор событияULID, один раз на событие
Metadata["Event-Type"]Тип событияreview.created, user.banned
Metadata["Schema-Version"]Версия схемы payload’а"1", "2", …
Metadata["Correlation-Id"]Трассировка операцииПриходит из HTTP-запроса / наследуется
Metadata["Source-Service"]Имя producer-сервиса"review", "user"
Metadata["Published-At"]Время публикацииRFC3339Nano
Metadata["traceparent"]W3C trace contextИз OpenTelemetry propagator

Пример построения сообщения

Helper eventmeta.New(ctx, Envelope{...}) — единая точка сборки сообщения. Никто не заполняет metadata-поля вручную; если в коде встречаешь прямой msg.Metadata.Set(...) без helper’а — это баг.

package eventmeta import ( "encoding/json" "time" "github.com/ThreeDotsLabs/watermill/message" "github.com/oklog/ulid/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) type Envelope struct { EventType string SchemaVersion string Source string Payload any } func New(ctx context.Context, e Envelope) (*message.Message, error) { body, err := json.Marshal(e.Payload) if err != nil { return nil, fmt.Errorf("marshal payload: %w", err) } msg := message.NewMessage(ulid.Make().String(), body) msg.Metadata.Set("Event-Type", e.EventType) msg.Metadata.Set("Schema-Version", e.SchemaVersion) msg.Metadata.Set("Source-Service", e.Source) msg.Metadata.Set("Published-At", time.Now().UTC().Format(time.RFC3339Nano)) if corr := correlationIDFromCtx(ctx); corr != "" { msg.Metadata.Set("Correlation-Id", corr) } // Пробрасываем W3C trace context carrier := propagation.MapCarrier{} otel.GetTextMapPropagator().Inject(ctx, carrier) if tp := carrier["traceparent"]; tp != "" { msg.Metadata.Set("traceparent", tp) } return msg, nil }

Schema-Version

  • Инкрементируй, когда добавляешь/переименовываешь/удаляешь поля в payload.
  • Добавление опционального поля совместимо — оставляй ту же версию, но в handler’ах проверяй nil.
  • Любое breaking-изменение (удаление поля, смена типа) — новая версия + пишешь в оба топика параллельно, пока consumer’ы не переехали (dual-publish).

Публикация: outbox pattern

Никогда не публикуй в Kafka напрямую из бизнес-логики. Всегда через outbox.

Почему

Если ты сначала закоммитил tx, а потом упал перед publisher.Publish — событие потеряно. Если сначала опубликовал, а потом tx откатился — консьюмер увидел фантомное событие. Outbox решает это: запись в outbox идёт в той же транзакции, что и бизнес-операция, а отдельный forwarder переносит её в Kafka.

Таблица outbox

Каждый сервис держит свою таблицу outbox:

CREATE TABLE <schema>.outbox ( id BIGSERIAL PRIMARY KEY, aggregate_type TEXT NOT NULL, -- "user", "review", "media" aggregate_id TEXT NOT NULL, -- ID сущности как строка topic TEXT NOT NULL, -- полное имя топика payload JSONB NOT NULL, -- тело события headers JSONB NOT NULL, -- envelope metadata created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), published_at TIMESTAMPTZ ); CREATE INDEX idx_outbox_unpublished ON <schema>.outbox (id) WHERE published_at IS NULL;

Publisher через watermill-sql

В cmd/server/main.go создаётся SQL-publisher, который пишет в таблицу:

import watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" sqlPub, err := watermillSQL.NewPublisher( stdDB, // *sql.DB поверх того же Postgres watermillSQL.PublisherConfig{ SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{GenerateMessagesTableName: outboxTableName}, AutoInitializeSchema: false, // таблицу создаёт migration }, watermillLogger, )

В service-слое используешь его как обычный publisher, обёрнутый в db.InTx:

err := s.db.InTx(ctx, func(tx pgx.Tx) error { if err := s.reviews.CreateTx(ctx, tx, review); err != nil { return err } msg, err := event.NewMessage(ctx, event.Envelope{ EventType: "review.created", SchemaVersion: "1", Source: "review", Payload: ReviewCreatedPayload{ReviewID: review.ID, ...}, }) if err != nil { return err } return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review", msg) })

PublishTx — тонкий wrapper, который пишет row в outbox, используя переданный pgx.Tx (через adapter *sql.Tx или прямой insert). Commit происходит в InTx — событие и запись сохранены атомарно.

Forwarder

Отдельный компонент переносит unpublished-строки из outbox в Kafka. Используй components/forwarder из Watermill:

import "github.com/ThreeDotsLabs/watermill/components/forwarder" fwd, err := forwarder.NewForwarder(forwarder.Config{ ForwarderTopic: outboxTableName, Router: router, Subscriber: sqlSubscriber, // читает из outbox Publisher: kafkaPub, // пишет в Kafka }, watermillLogger) go fwd.Run(ctx)

Forwarder:

  • читает через SELECT ... FOR UPDATE SKIP LOCKED — несколько replicas работают параллельно,
  • помечает строку как published (или удаляет) после успешной публикации в Kafka,
  • при ошибке Kafka оставляет строку — на следующем тике заберётся снова.

Никакого собственного outbox-worker’а не пишем. Используем forwarder.

Consumer: Router и middleware stack

Consumer строится через message.Router. Middleware подключаются в строгом порядке:

router, err := message.NewRouter(message.RouterConfig{}, watermillLogger) if err != nil { return fmt.Errorf("router: %w", err) } router.AddMiddleware( middleware.CorrelationID, // 1. прокидывает Correlation-Id из metadata в ctx middleware.Recoverer, // 2. ловит panic, возвращает error poisonQueue(kafkaPub, dlqTopic), // 3. отправляет в DLQ после исчерпания retry middleware.Retry{ // 4. повторы с экспоненциальным backoff + jitter MaxRetries: 5, InitialInterval: 500 * time.Millisecond, Multiplier: 2.0, MaxInterval: 30 * time.Second, Logger: watermillLogger, }.Middleware, deduplicator(redisClient), // 5. idempotency через Redis SETNX )

Порядок важен:

  • CorrelationIDпервый, чтобы все последующие middleware и handler видели correlation id в ctx.
  • Recoverer — превращает panic в error, чтобы retry его увидел.
  • PoisonQueueдо Retry, потому что poison срабатывает только когда Retry исчерпан (Retry пробрасывает ошибку дальше, PoisonQueue её ловит).
  • Retry — с backoff и jitter. 5 попыток, начиная с 500ms.
  • Deduplicatorпоследний, чтобы retry внутри одного запроса не падал на «уже обработано». Дедуп по Event-Id (или Message.UUID).

Handler

Сигнатура handler’а:

func (h *ReviewCreatedHandler) Handle(msg *message.Message) error { eventType := msg.Metadata.Get("Event-Type") if eventType != "review.created" { return nil // skip — чужое событие на том же топике } var p ReviewCreatedPayload if err := json.Unmarshal(msg.Payload, &p); err != nil { return fmt.Errorf("unmarshal %s: %w", eventType, err) } ctx := msg.Context() if err := h.service.OnReviewCreated(ctx, p); err != nil { return fmt.Errorf("on review created: %w", err) } return nil }

Правила:

  • Возврат nil — успех, offset коммитится.
  • Возврат error — Retry обработает, потом PoisonQueue отправит в DLQ.
  • Не делай msg.Ack() / msg.Nack() вручную — этим занимается router.
  • Handler должен быть идемпотентным: тот же Event-Id не должен создать дубликат сущности. Deduplicator ловит retry в памяти/redis, но consumer тоже должен быть идемпотентным на уровне БД (unique constraint, upsert).

Регистрация handler’а

router.AddHandler( "review.created.onUserActivity", // handler name — уникален в router "kazmaps.review.review", // subscribe topic kafkaSubscriber, "", // publish topic (пусто — handler не публикует) nil, handler.Handle, )

Publisher middleware

На publisher’е (на SQL-publisher’е outbox) применяем:

  • CorrelationID — подхватывает correlation из ctx и пишет в metadata.
  • OpenTelemetry — инжектит traceparent в metadata.
  • Metrics — инкрементирует prometheus-counter events_published_total{topic, event_type}.

Эти middleware живут в pkg/eventmw/ внутри сервис-репо. Пока shared-библиотека не вынесена в отдельный репозиторий, между сервисами они синхронизируются копированием — за этим следят владельцы сервисов.

Эталонная реализация Publisher и middleware stack — в pkg/eventmw/ одного из сервис-репо; конкретный путь в коде указан в README сервиса (см. service-readme).

Тестирование

Unit-тесты через gochannel

Для тестов не поднимай Kafka. Используй gochannel.GoChannel — in-memory pub-sub, совместимый с Watermill API:

import "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" func TestReviewCreatedHandler(t *testing.T) { pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil) defer pubsub.Close() handler := NewReviewCreatedHandler(fakeService) router, _ := message.NewRouter(message.RouterConfig{}, nil) router.AddHandler("test", "topic", pubsub, "", nil, handler.Handle) go router.Run(context.Background()) <-router.Running() msg := message.NewMessage("evt-1", []byte(`{"review_id":42}`)) msg.Metadata.Set("Event-Type", "review.created") require.NoError(t, pubsub.Publish("topic", msg)) // проверь, что fakeService получил вызов }

Integration-тесты с реальным Kafka

Если тест должен проверить интеграцию с реальным Kafka — используй testcontainers-go:

kafkaCt, _ := kafka.Run(ctx, "confluentinc/cp-kafka:7.6.0") brokers, _ := kafkaCt.Brokers(ctx)

Такие тесты живут в *_integration_test.go под build-tag’ом integration — чтобы не тормозили обычный make test.

Что не делать

  • Не используй sarama / kafka-go напрямую. Только через Watermill.
  • Не публикуй в Kafka из handler’а HTTP. Всегда через outbox.
  • Не делай sync request-reply через Kafka. Для RPC — HTTP internal endpoint.
  • Не коммить offset руками. Router делает это после успешного handler.
  • Не делай handler, зависящий от порядка сообщений между топиками. Порядок гарантируется только внутри одной партиции одного топика.
  • Не пиши секреты / PII в payload события. См. logging — те же правила маскирования применяются к событиям.

См. также

Last updated on