События: Watermill, Kafka, outbox¶
Все межсервисные события ходят через Kafka. Мы работаем с Kafka через
Watermill: publisher, subscriber, middleware,
testing — всё стандартное. Напрямую sarama / kafka-go не используем.
End-to-end поток от HTTP-handler'а на producer-сервисе до handler'а на consumer-сервисе:
flowchart LR
subgraph Producer["producer-сервис"]
H[HTTP handler]
S[service слой]
DB[(Postgres)]
OB[outbox таблица]
FWD[forwarder goroutine]
end
subgraph Kafka
T["topic<br/>kazmaps.review.review.created"]
DLQ["topic .dlq"]
end
subgraph Consumer["consumer-сервис"]
SUB[Kafka subscriber]
MW["middleware stack:<br/>CorrelationID → Recoverer →<br/>PoisonQueue → Retry →<br/>Deduplicator"]
HND[event handler]
CDB[(Postgres)]
end
H --> S
S -->|InTx| DB
S -->|InTx, PublishTx| OB
FWD -->|SELECT FOR UPDATE| OB
FWD -->|Publish| T
T --> SUB
SUB --> MW
MW -->|ok| HND
MW -.->|retry exhausted| DLQ
HND --> CDB
Ключевые свойства: запись в БД и строку в outbox атомарна, forwarder отдельно переносит строки в Kafka (at-least-once), consumer-middleware дедуплицирует retry-дубли. Producer и consumer — разные сервисы с разными БД.
Содержание¶
- Transport и топики
- Envelope
- Публикация: outbox pattern
- Consumer: Router и middleware stack
- Publisher middleware
- Тестирование
- Что не делать
- См. также
Transport и топики¶
Брокер¶
- Локально — один Kafka в
docker-compose.yml(KRaft-mode, один узел). - Продовый брокер поднимается отдельно; env-переменная
KAFKA_BROKERSзадаёт endpoint'ы. - Сериализация — JSON. Protobuf / Avro не используем.
Naming топиков¶
Формат:
Примеры:
kazmaps.user.user.registered
kazmaps.user.user.banned
kazmaps.review.review.created
kazmaps.review.review.updated
kazmaps.media.photo.uploaded
kazmaps.media.moderation.approved
<service>— producer.<entity>— существительное в единственном числе.<action>— прошедшее время (факт уже случился):created,updated,deleted,completed,failed.
DLQ¶
Для каждого consumer-топика заводим DLQ с суффиксом .dlq:
В DLQ уходят сообщения, которые падают после исчерпания retry (см. ниже).
Envelope¶
Каждое сообщение несёт набор headers (Watermill metadata). Это обязательно — без этих полей сообщение отклоняется на consumer'е.
Обязательные поля¶
| Поле | Что это | Как заполнять |
|---|---|---|
Message.UUID |
Идентификатор события | ULID, один раз на событие |
Metadata["Event-Type"] |
Тип события | review.created, user.banned |
Metadata["Schema-Version"] |
Версия схемы payload'а | "1", "2", ... |
Metadata["Correlation-Id"] |
Трассировка операции | Приходит из HTTP-запроса / наследуется |
Metadata["Source-Service"] |
Имя producer-сервиса | "review", "user" |
Metadata["Published-At"] |
Время публикации | RFC3339Nano |
Metadata["traceparent"] |
W3C trace context | Из OpenTelemetry propagator |
Пример построения сообщения¶
Helper eventmeta.New(ctx, Envelope{...}) — единая точка сборки сообщения.
Никто не заполняет metadata-поля вручную; если в коде встречаешь прямой
msg.Metadata.Set(...) без helper'а — это баг.
package eventmeta
import (
"encoding/json"
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/oklog/ulid/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
type Envelope struct {
EventType string
SchemaVersion string
Source string
Payload any
}
func New(ctx context.Context, e Envelope) (*message.Message, error) {
body, err := json.Marshal(e.Payload)
if err != nil {
return nil, fmt.Errorf("marshal payload: %w", err)
}
msg := message.NewMessage(ulid.Make().String(), body)
msg.Metadata.Set("Event-Type", e.EventType)
msg.Metadata.Set("Schema-Version", e.SchemaVersion)
msg.Metadata.Set("Source-Service", e.Source)
msg.Metadata.Set("Published-At", time.Now().UTC().Format(time.RFC3339Nano))
if corr := correlationIDFromCtx(ctx); corr != "" {
msg.Metadata.Set("Correlation-Id", corr)
}
// Пробрасываем W3C trace context
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
if tp := carrier["traceparent"]; tp != "" {
msg.Metadata.Set("traceparent", tp)
}
return msg, nil
}
Schema-Version¶
- Инкрементируй, когда добавляешь/переименовываешь/удаляешь поля в payload.
- Добавление опционального поля совместимо — оставляй ту же версию, но
в handler'ах проверяй
nil. - Любое breaking-изменение (удаление поля, смена типа) — новая версия + пишешь в оба топика параллельно, пока consumer'ы не переехали (dual-publish).
Публикация: outbox pattern¶
Никогда не публикуй в Kafka напрямую из бизнес-логики. Всегда через outbox.
Почему¶
Если ты сначала закоммитил tx, а потом упал перед publisher.Publish —
событие потеряно. Если сначала опубликовал, а потом tx откатился —
консьюмер увидел фантомное событие. Outbox решает это: запись в outbox идёт в
той же транзакции, что и бизнес-операция, а отдельный forwarder переносит её
в Kafka.
Таблица outbox¶
Каждый сервис держит свою таблицу outbox:
CREATE TABLE <schema>.outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- "user", "review", "media"
aggregate_id TEXT NOT NULL, -- ID сущности как строка
topic TEXT NOT NULL, -- полное имя топика
payload JSONB NOT NULL, -- тело события
headers JSONB NOT NULL, -- envelope metadata
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished
ON <schema>.outbox (id)
WHERE published_at IS NULL;
Publisher через watermill-sql¶
В cmd/server/main.go создаётся SQL-publisher, который пишет в таблицу:
import watermillSQL "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
sqlPub, err := watermillSQL.NewPublisher(
stdDB, // *sql.DB поверх того же Postgres
watermillSQL.PublisherConfig{
SchemaAdapter: watermillSQL.DefaultPostgreSQLSchema{GenerateMessagesTableName: outboxTableName},
AutoInitializeSchema: false, // таблицу создаёт migration
},
watermillLogger,
)
В service-слое используешь его как обычный publisher, обёрнутый в
db.InTx:
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
if err := s.reviews.CreateTx(ctx, tx, review); err != nil {
return err
}
msg, err := event.NewMessage(ctx, event.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: ReviewCreatedPayload{ReviewID: review.ID, ...},
})
if err != nil {
return err
}
return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review.created", msg)
})
PublishTx — тонкий wrapper, который пишет row в outbox, используя переданный
pgx.Tx (через adapter *sql.Tx или прямой insert). Commit происходит в
InTx — событие и запись сохранены атомарно.
Forwarder¶
Отдельный компонент переносит unpublished-строки из outbox в Kafka.
Используй components/forwarder из Watermill:
import "github.com/ThreeDotsLabs/watermill/components/forwarder"
fwd, err := forwarder.NewForwarder(forwarder.Config{
ForwarderTopic: outboxTableName,
Router: router,
Subscriber: sqlSubscriber, // читает из outbox
Publisher: kafkaPub, // пишет в Kafka
}, watermillLogger)
go fwd.Run(ctx)
Forwarder:
- читает через SELECT ... FOR UPDATE SKIP LOCKED — несколько replicas работают параллельно,
- помечает строку как published (или удаляет) после успешной публикации в Kafka,
- при ошибке Kafka оставляет строку — на следующем тике заберётся снова.
Никакого собственного outbox-worker'а не пишем. Используем forwarder.
Consumer: Router и middleware stack¶
Consumer строится через message.Router. Middleware подключаются в строгом
порядке:
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
if err != nil {
return fmt.Errorf("router: %w", err)
}
router.AddMiddleware(
middleware.CorrelationID, // 1. прокидывает Correlation-Id из metadata в ctx
middleware.Recoverer, // 2. ловит panic, возвращает error
poisonQueue(kafkaPub, dlqTopic), // 3. отправляет в DLQ после исчерпания retry
middleware.Retry{ // 4. повторы с экспоненциальным backoff + jitter
MaxRetries: 5,
InitialInterval: 500 * time.Millisecond,
Multiplier: 2.0,
MaxInterval: 30 * time.Second,
Logger: watermillLogger,
}.Middleware,
deduplicator(redisClient), // 5. idempotency через Redis SETNX
)
Порядок важен:
CorrelationID— первый, чтобы все последующие middleware и handler видели correlation id в ctx.Recoverer— превращает panic в error, чтобы retry его увидел.PoisonQueue— до Retry, потому что poison срабатывает только когда Retry исчерпан (Retry пробрасывает ошибку дальше, PoisonQueue её ловит).Retry— с backoff и jitter. 5 попыток, начиная с 500ms.Deduplicator— последний, чтобы retry внутри одного запроса не падал на «уже обработано». Дедуп поEvent-Id(илиMessage.UUID).
Handler¶
Сигнатура handler'а:
func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
eventType := msg.Metadata.Get("Event-Type")
if eventType != "review.created" {
return nil // skip — чужое событие на том же топике
}
var p ReviewCreatedPayload
if err := json.Unmarshal(msg.Payload, &p); err != nil {
return fmt.Errorf("unmarshal %s: %w", eventType, err)
}
ctx := msg.Context()
if err := h.service.OnReviewCreated(ctx, p); err != nil {
return fmt.Errorf("on review created: %w", err)
}
return nil
}
Правила:
- Возврат nil — успех, offset коммитится.
- Возврат error — Retry обработает, потом PoisonQueue отправит в DLQ.
- Не делай msg.Ack() / msg.Nack() вручную — этим занимается router.
- Handler должен быть идемпотентным: тот же Event-Id не должен создать
дубликат сущности. Deduplicator ловит retry в памяти/redis, но consumer
тоже должен быть идемпотентным на уровне БД (unique constraint, upsert).
Регистрация handler'а¶
router.AddHandler(
"review.created.onUserActivity", // handler name — уникален в router
"kazmaps.review.review.created", // subscribe topic
kafkaSubscriber,
"", // publish topic (пусто — handler не публикует)
nil,
handler.Handle,
)
Publisher middleware¶
На publisher'е (на SQL-publisher'е outbox) применяем:
- CorrelationID — подхватывает correlation из ctx и пишет в metadata.
- OpenTelemetry — инжектит
traceparentв metadata. - Metrics — инкрементирует prometheus-counter
events_published_total{topic, event_type}.
Эти middleware живут в pkg/eventmw/ внутри сервис-репо. Пока shared-библиотека
не вынесена в отдельный репозиторий, между сервисами они синхронизируются
копированием — за этим следят владельцы сервисов.
Эталонная реализация Publisher и middleware stack — в репозитории сервиса
review, файл internal/event/publisher.go.
Тестирование¶
Unit-тесты через gochannel¶
Для тестов не поднимай Kafka. Используй gochannel.GoChannel — in-memory
pub-sub, совместимый с Watermill API:
import "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
func TestReviewCreatedHandler(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
handler := NewReviewCreatedHandler(fakeService)
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddHandler("test", "topic", pubsub, "", nil, handler.Handle)
go router.Run(context.Background())
<-router.Running()
msg := message.NewMessage("evt-1", []byte(`{"review_id":42}`))
msg.Metadata.Set("Event-Type", "review.created")
require.NoError(t, pubsub.Publish("topic", msg))
// проверь, что fakeService получил вызов
}
Integration-тесты с реальным Kafka¶
Если тест должен проверить интеграцию с реальным Kafka — используй testcontainers-go:
Такие тесты живут в *_integration_test.go под build-tag'ом integration —
чтобы не тормозили обычный make test.
Что не делать¶
- Не используй
sarama/kafka-goнапрямую. Только через Watermill. - Не публикуй в Kafka из handler'а HTTP. Всегда через outbox.
- Не делай sync request-reply через Kafka. Для RPC — HTTP internal endpoint.
- Не коммить offset руками. Router делает это после успешного handler.
- Не делай handler, зависящий от порядка сообщений между топиками. Порядок гарантируется только внутри одной партиции одного топика.
- Не пиши секреты / PII в payload события. См.
logging.md— те же правила маскирования применяются к событиям.
См. также¶
../patterns/outbox.md— транзакционный outbox и forwarder.../patterns/cqrs.md— когда подключатьwatermill/components/cqrs.../patterns/idempotent-consumer.md— Deduplicator и идемпотентность.../how-to/add-kafka-event.md— пошаговый рецепт добавления нового события.../how-to/debug-outbox-lag.md— что делать, если forwarder отстаёт.../patterns/retry-and-circuit-breaker.md— настройка retry в middleware stack, когда подключать circuit breaker на HTTP-клиентах.../troubleshooting/kafka-consumer-stuck.md— consumer завис, лаг растёт.../troubleshooting/kafka-rebalancing.md— rebalance-loop: отличие от stuck, параметры клиента.../event-catalog.md— каталог всех фактических событий.