События: Watermill, Kafka, outbox
Все межсервисные события ходят через Kafka. Мы работаем с Kafka через
Watermill : publisher, subscriber, middleware,
testing — всё стандартное. Напрямую sarama / kafka-go не используем.
End-to-end поток от HTTP-handler’а на producer-сервисе до handler’а на consumer-сервисе:
Ключевые свойства: запись в БД и строку в outbox атомарна, forwarder отдельно переносит строки в Kafka (at-least-once), consumer-middleware дедуплицирует retry-дубли. Producer и consumer — разные сервисы с разными БД.
TL;DR
Минимум правил для быстрой сверки (подробности — по разделам ниже):
- Транспорт: только Watermill поверх Kafka.
sarama/kafka-goнапрямую не используем. - Publish: только через outbox +
PublishTxвнутриInTx. Прямойkafka.Publishиз handler’а — dual-write, запрещён. - Topic name:
kazmaps.<service>.<entity>.<action>, action — в прошедшем времени (created,banned). - Envelope metadata (обязательны):
Event-Type,Schema-Version,Correlation-Id,Source-Service,Published-At,traceparent.Message.UUID— отдельно, ULID. - Consumer middleware stack (строгий порядок):
CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator.PoisonQueueдоRetry,Deduplicator— после. - Дедупликация: ключ —
Message.UUID, хранилище — Redis SETNX, TTL 24h (см. TTL-таблицу per use-case). Дедуп по business-key — запрещён. - Consumer-group:
<service>-<handler>(notification-on-review-created). Никаких обобщённыхconsumer/default. - Идемпотентность handler’а обязательна — даже при работающем
Deduplicator’е. UPSERT по natural key + гейт side effects по
RowsAffected. - DLQ:
<topic>.dlq, retention 30 дней. Разбор — вручную по инциденту (см.../troubleshooting/kafka-consumer-stuck). - Fail-open vs fail-closed на dedup — fail-closed дефолт;
fail-open только при DB-level идемпотентности handler’а.
Детали —
../patterns/idempotent-consumer.
Содержание
- Transport и топики
- Envelope
- Публикация: outbox pattern
- Consumer: Router и middleware stack
- Publisher middleware
- Тестирование
- Что не делать
- См. также
Transport и топики
Брокер
- Локально — один Kafka в
docker-compose.yml(KRaft-mode, один узел). - Продовый брокер поднимается отдельно; env-переменная
KAFKA_BROKERSзадаёт endpoint’ы. - Сериализация — JSON. Protobuf / Avro не используем.
Naming топиков
Формат:
kazmaps.<service>.<entity>Примеры:
kazmaps.user.account
kazmaps.user.push_token
kazmaps.review.review
kazmaps.media.photo
kazmaps.catalog.place
kazmaps.indoor.building<service>— producer.<entity>— существительное в единственном числе.- Действие (
created,updated,deleted, …) — НЕ в имени топика, а в заголовкеEvent-Type. Топик = поток событий одной сущности. - Ключ записи = id сущности — все события сущности идут в одну партицию и обрабатываются строго по порядку (обоснование — в event-catalog, «Почему сущность, а не событие»).
DLQ
Для каждого consumer-топика заводим DLQ с суффиксом .dlq:
kazmaps.review.review — основной топик
kazmaps.review.review.dlq — DLQВ DLQ уходят сообщения, которые падают после исчерпания retry (см. ниже).
Envelope
Каждое сообщение несёт набор headers (Watermill metadata). Это обязательно — без этих полей сообщение отклоняется на consumer’е.
Обязательные поля
| Поле | Что это | Как заполнять |
|---|---|---|
Message.UUID | Идентификатор события | ULID, один раз на событие |
Metadata["Event-Type"] | Тип события | review.created, user.banned |
Metadata["Schema-Version"] | Версия схемы payload’а | "1", "2", … |
Metadata["Correlation-Id"] | Трассировка операции | Приходит из HTTP-запроса / наследуется |
Metadata["Source-Service"] | Имя producer-сервиса | "review", "user" |
Metadata["Published-At"] | Время публикации | RFC3339Nano |
Metadata["traceparent"] | W3C trace context | Из OpenTelemetry propagator |
Пример построения сообщения
Helper eventmeta.New(ctx, Envelope{...}) — единая точка сборки сообщения.
Никто не заполняет metadata-поля вручную; если в коде встречаешь прямой
msg.Metadata.Set(...) без helper’а — это баг.
package eventmeta
import (
"encoding/json"
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
type Envelope struct {
EventType string
SchemaVersion string
Source string
Payload any
}
func New(ctx context.Context, e Envelope) (*message.Message, error) {
body, err := json.Marshal(e.Payload)
if err != nil {
return nil, fmt.Errorf("marshal payload: %w", err)
}
msg := message.NewMessage(ulid.Make().String(), body)
msg.Metadata.Set("Event-Type", e.EventType)
msg.Metadata.Set("Schema-Version", e.SchemaVersion)
msg.Metadata.Set("Source-Service", e.Source)
msg.Metadata.Set("Published-At", time.Now().UTC().Format(time.RFC3339Nano))
if corr := correlationIDFromCtx(ctx); corr != "" {
msg.Metadata.Set("Correlation-Id", corr)
}
// Пробрасываем W3C trace context
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
if tp := carrier["traceparent"]; tp != "" {
msg.Metadata.Set("traceparent", tp)
}
return msg, nil
}Schema-Version
- Инкрементируй, когда добавляешь/переименовываешь/удаляешь поля в payload.
- Добавление опционального поля совместимо — оставляй ту же версию, но
в handler’ах проверяй
nil. - Любое breaking-изменение (удаление поля, смена типа) — новая версия + пишешь в оба топика параллельно, пока consumer’ы не переехали (dual-publish).
Публикация: outbox pattern
Никогда не публикуй в Kafka напрямую из бизнес-логики. Всегда через outbox.
Почему
Если ты сначала закоммитил tx, а потом упал перед publisher.Publish —
событие потеряно. Если сначала опубликовал, а потом tx откатился —
консьюмер увидел фантомное событие. Outbox решает это: запись в outbox идёт в
той же транзакции, что и бизнес-операция, а отдельный forwarder переносит её
в Kafka.
Таблица outbox
Каждый сервис держит свою таблицу outbox:
CREATE TABLE <schema>.outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- "user", "review", "media"
aggregate_id TEXT NOT NULL, -- ID сущности как строка
topic TEXT NOT NULL, -- полное имя топика
payload JSONB NOT NULL, -- тело события
headers JSONB NOT NULL, -- envelope metadata
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished
ON <schema>.outbox (id)
WHERE published_at IS NULL;Publisher через watermill-sql
В cmd/server/main.go создаётся SQL-publisher, который пишет в таблицу:
import watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
sqlPub, err := watermillSQL.NewPublisher(
stdDB, // *sql.DB поверх того же Postgres
watermillSQL.PublisherConfig{
SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{GenerateMessagesTableName: outboxTableName},
AutoInitializeSchema: false, // таблицу создаёт migration
},
watermillLogger,
)В service-слое используешь его как обычный publisher, обёрнутый в
db.InTx:
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
if err := s.reviews.CreateTx(ctx, tx, review); err != nil {
return err
}
msg, err := event.NewMessage(ctx, event.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: ReviewCreatedPayload{ReviewID: review.ID, ...},
})
if err != nil {
return err
}
return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review", msg)
})PublishTx — тонкий wrapper, который пишет row в outbox, используя переданный
pgx.Tx (через adapter *sql.Tx или прямой insert). Commit происходит в
InTx — событие и запись сохранены атомарно.
Forwarder
Отдельный компонент переносит unpublished-строки из outbox в Kafka.
Используй components/forwarder из Watermill:
import "github.com/ThreeDotsLabs/watermill/components/forwarder"
fwd, err := forwarder.NewForwarder(forwarder.Config{
ForwarderTopic: outboxTableName,
Router: router,
Subscriber: sqlSubscriber, // читает из outbox
Publisher: kafkaPub, // пишет в Kafka
}, watermillLogger)
go fwd.Run(ctx)Forwarder:
- читает через
SELECT ... FOR UPDATE SKIP LOCKED— несколько replicas работают параллельно, - помечает строку как published (или удаляет) после успешной публикации в Kafka,
- при ошибке Kafka оставляет строку — на следующем тике заберётся снова.
Никакого собственного outbox-worker’а не пишем. Используем forwarder.
Consumer: Router и middleware stack
Consumer строится через message.Router. Middleware подключаются в строгом
порядке:
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
if err != nil {
return fmt.Errorf("router: %w", err)
}
router.AddMiddleware(
middleware.CorrelationID, // 1. прокидывает Correlation-Id из metadata в ctx
middleware.Recoverer, // 2. ловит panic, возвращает error
poisonQueue(kafkaPub, dlqTopic), // 3. отправляет в DLQ после исчерпания retry
middleware.Retry{ // 4. повторы с экспоненциальным backoff + jitter
MaxRetries: 5,
InitialInterval: 500 * time.Millisecond,
Multiplier: 2.0,
MaxInterval: 30 * time.Second,
Logger: watermillLogger,
}.Middleware,
deduplicator(redisClient), // 5. idempotency через Redis SETNX
)Порядок важен:
CorrelationID— первый, чтобы все последующие middleware и handler видели correlation id в ctx.Recoverer— превращает panic в error, чтобы retry его увидел.PoisonQueue— до Retry, потому что poison срабатывает только когда Retry исчерпан (Retry пробрасывает ошибку дальше, PoisonQueue её ловит).Retry— с backoff и jitter. 5 попыток, начиная с 500ms.Deduplicator— последний, чтобы retry внутри одного запроса не падал на «уже обработано». Дедуп поEvent-Id(илиMessage.UUID).
Handler
Сигнатура handler’а:
func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
eventType := msg.Metadata.Get("Event-Type")
if eventType != "review.created" {
return nil // skip — чужое событие на том же топике
}
var p ReviewCreatedPayload
if err := json.Unmarshal(msg.Payload, &p); err != nil {
return fmt.Errorf("unmarshal %s: %w", eventType, err)
}
ctx := msg.Context()
if err := h.service.OnReviewCreated(ctx, p); err != nil {
return fmt.Errorf("on review created: %w", err)
}
return nil
}Правила:
- Возврат
nil— успех, offset коммитится. - Возврат
error— Retry обработает, потом PoisonQueue отправит в DLQ. - Не делай
msg.Ack()/msg.Nack()вручную — этим занимается router. - Handler должен быть идемпотентным: тот же
Event-Idне должен создать дубликат сущности. Deduplicator ловит retry в памяти/redis, но consumer тоже должен быть идемпотентным на уровне БД (unique constraint, upsert).
Регистрация handler’а
router.AddHandler(
"review.created.onUserActivity", // handler name — уникален в router
"kazmaps.review.review", // subscribe topic
kafkaSubscriber,
"", // publish topic (пусто — handler не публикует)
nil,
handler.Handle,
)Publisher middleware
На publisher’е (на SQL-publisher’е outbox) применяем:
- CorrelationID — подхватывает correlation из ctx и пишет в metadata.
- OpenTelemetry — инжектит
traceparentв metadata. - Metrics — инкрементирует prometheus-counter
events_published_total{topic, event_type}.
Эти middleware живут в pkg/eventmw/ внутри сервис-репо. Пока shared-библиотека
не вынесена в отдельный репозиторий, между сервисами они синхронизируются
копированием — за этим следят владельцы сервисов.
Эталонная реализация Publisher и middleware stack — в pkg/eventmw/ одного
из сервис-репо; конкретный путь в коде указан в README сервиса (см.
service-readme).
Тестирование
Unit-тесты через gochannel
Для тестов не поднимай Kafka. Используй gochannel.GoChannel — in-memory
pub-sub, совместимый с Watermill API:
import "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
func TestReviewCreatedHandler(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
handler := NewReviewCreatedHandler(fakeService)
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddHandler("test", "topic", pubsub, "", nil, handler.Handle)
go router.Run(context.Background())
<-router.Running()
msg := message.NewMessage("evt-1", []byte(`{"review_id":42}`))
msg.Metadata.Set("Event-Type", "review.created")
require.NoError(t, pubsub.Publish("topic", msg))
// проверь, что fakeService получил вызов
}Integration-тесты с реальным Kafka
Если тест должен проверить интеграцию с реальным Kafka — используй testcontainers-go:
kafkaCt, _ := kafka.Run(ctx, "confluentinc/cp-kafka:7.6.0")
brokers, _ := kafkaCt.Brokers(ctx)Такие тесты живут в *_integration_test.go под build-tag’ом integration —
чтобы не тормозили обычный make test.
Что не делать
- Не используй
sarama/kafka-goнапрямую. Только через Watermill. - Не публикуй в Kafka из handler’а HTTP. Всегда через outbox.
- Не делай sync request-reply через Kafka. Для RPC — HTTP internal endpoint.
- Не коммить offset руками. Router делает это после успешного handler.
- Не делай handler, зависящий от порядка сообщений между топиками. Порядок гарантируется только внутри одной партиции одного топика.
- Не пиши секреты / PII в payload события. См.
logging— те же правила маскирования применяются к событиям.
См. также
../patterns/outbox— транзакционный outbox и forwarder.../patterns/cqrs— когда подключатьwatermill/components/cqrs.../patterns/idempotent-consumer— Deduplicator и идемпотентность.../how-to/add-kafka-event— пошаговый рецепт добавления нового события.../how-to/debug-outbox-lag— что делать, если forwarder отстаёт.../patterns/retry-and-circuit-breaker— настройка retry в middleware stack, когда подключать circuit breaker на HTTP-клиентах.../troubleshooting/kafka-consumer-stuck— consumer завис, лаг растёт.../troubleshooting/kafka-rebalancing— rebalance-loop: отличие от stuck, параметры клиента.../event-catalog— каталог всех фактических событий.