Как добавить Kafka-событие
Пошаговый рецепт добавления нового event’а на примере review.created. В
другом сервисе замени имена — шаги такие же.
Правила Watermill / outbox / envelope описаны в
../conventions/events. Здесь — сжатый
практический сценарий для PR-автора.
1. Определить тип события
Формат: past tense, точкой разделены аггрегат и действие.
review.created
review.updated
review.deleted
user.banned
photo.uploaded
moderation.approvedИмя топика в Kafka — per-entity (действие НЕ в имени, а в заголовке
Event-Type):
kazmaps.<service>.<entity>
kazmaps.review.review # Event-Type: review.created / review.updated / ...
kazmaps.user.account # Event-Type: account.banned / account.updated / ...<service>— producer (откуда событие).<entity>— существительное в единственном числе. Топик = поток всех событий этой сущности; ключ записи = id сущности (порядок событий на сущность).- Действие (
created/updated/…) — в заголовкеEvent-Type, past tense.
Не называй действия в настоящем/будущем времени (review.create,
user.will-ban) — событие это факт о случившемся. И не клади действие в имя
топика — только в Event-Type (см.
event-catalog, «Почему сущность, а не событие»).
2. Payload struct
В internal/event/types.go (producer’а):
package event
type ReviewCreatedV1 struct {
ReviewID int64 `json:"review_id"`
PlaceID int64 `json:"place_id"`
UserID int64 `json:"user_id"`
Rating int16 `json:"rating"`
// Schema-Version кладётся в metadata, не в payload.
}- Типы — публичные (консьюмер в своём сервис-репо объявляет собственный идентичный struct и парсит payload по JSON-контракту; прямого cross-repo импорта между сервис-репозиториями нет).
- JSON-теги
snake_case. - Не складывай в payload секреты / PII в открытом виде (email, phone,
token). Если нужен reference — клади
user_id, консьюмер дотянется через/internal/users/{id}.
3. Реестр контрактов
Когда появится отдельный репозиторий kazmaps-contracts (общие типы
событий для producer’ов и consumer’ов) — добавь туда тип. Пока его нет —
копируй struct в консьюмер при подключении. Синхронизацию держат
владельцы. (TODO: заменить на импорт из contracts-репо.)
4. Publisher: только через outbox
Никогда не публикуй напрямую в Kafka из handler’а или service’а. Запись в outbox идёт в той же транзакции, что и бизнес-запись:
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
var review *domain.Review
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
r, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
if err != nil {
return err
}
payload := event.ReviewCreatedV1{
ReviewID: r.ID, PlaceID: r.PlaceID, UserID: r.UserID, Rating: r.Rating,
}
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})
if err != nil {
return err
}
// Топик = сущность (kazmaps.review.review); действие "review.created"
// уже лежит в Event-Type envelope. Ключ записи — id сущности (review id).
return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review", msg)
})
if err != nil {
return nil, fmt.Errorf("create review: %w", err)
}
return review, nil
}Forwarder (watermill/components/forwarder) автоматически перетащит
строку из outbox в Kafka. См. ../conventions/events.
5. Envelope
Все metadata-поля заполняются через helper eventmeta.New — не пиши их
руками:
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})eventmeta.New заполняет:
Message.UUID— ULID (идентификатор события),Metadata["Event-Type"],Metadata["Schema-Version"],Metadata["Source-Service"],Metadata["Correlation-Id"]— из ctx,Metadata["Published-At"]— RFC3339Nano,Metadata["traceparent"]— W3C trace context из OTel propagator.
Если какого-то поля в metadata нет — consumer отклонит сообщение.
6. Consumer: регистрация handler’а
В сервисе-подписчике добавь handler в router.
// internal/event/router.go
router.AddNoPublisherHandler(
"on-review", // уникальное имя handler'а
"kazmaps.review.review", // топик = сущность (все её действия)
kafkaSubscriber,
reviewHandler.Handle, // внутри роутит по Event-Type
)Топик сущности несёт все её действия, поэтому handler читает
msg.Metadata.Get("Event-Type") и обрабатывает нужные (остальные —
ack-and-skip, чтобы не копились в retry/DLQ).
Middleware stack (CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator) применяется к router’у один раз в wiring сервиса — одинаково
для всех handler’ов. Не добавляй per-handler middleware ad-hoc.
7. Handler signature
// internal/event/review_created.go
package event
import (
"context"
"encoding/json"
"fmt"
"github.com/ThreeDotsLabs/watermill/message"
)
type ReviewCreatedHandler struct {
svc *service.Stats
}
func NewReviewCreatedHandler(svc *service.Stats) *ReviewCreatedHandler {
return &ReviewCreatedHandler{svc: svc}
}
func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
if eventType := msg.Metadata.Get("Event-Type"); eventType != "review.created" {
return nil // чужое событие на том же топике — пропускаем
}
var evt ReviewCreatedV1
if err := json.Unmarshal(msg.Payload, &evt); err != nil {
return fmt.Errorf("unmarshal review.created: %w", err)
}
return h.svc.OnReviewCreated(msg.Context(), evt)
}Правила handler’а:
- Возвращай
nil→ router коммитит offset. - Возвращай
error→ Retry сработает, потом PoisonQueue отправит в DLQ. - Не делай
msg.Ack()/msg.Nack()вручную. Этим управляет router.
8. Идемпотентность
Kafka даёт at-least-once: один и тот же event может прийти дважды (сетевой глюк, retry, перезапуск consumer’а). Handler обязан корректно отрабатывать повтор.
Стандартные подходы:
-
UPSERT в БД по event_id:
INSERT INTO review.review_stats (review_id, rating, seen_event_id) VALUES ($1, $2, $3) ON CONFLICT (seen_event_id) DO NOTHING; -
Deduplicator middleware — Redis
SETNX event:<UUID>. Уже в middleware stack, ловит повторы в окне TTL. -
Уникальный constraint на доменной колонке: «review_stats unique по review_id» — повторный insert даст ошибку, handler ловит и возвращает nil (факт уже учтён).
Используй хотя бы одну стратегию. Никогда не надейся «наверное, Kafka не повторит».
9. Тесты
Unit через gochannel
Не поднимай Kafka в unit-тестах.
func TestOnReviewCreated_CountsStats(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
svc := &fakeStats{}
h := event.NewReviewCreatedHandler(svc)
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddNoPublisherHandler("t", "reviews", pubsub, h.Handle)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = router.Run(ctx) }()
<-router.Running()
msg := message.NewMessage("evt-1", []byte(`{"review_id":42,"place_id":7,"user_id":1,"rating":5}`))
msg.Metadata.Set("Event-Type", "review.created")
if err := pubsub.Publish("reviews", msg); err != nil {
t.Fatalf("publish: %v", err)
}
eventually(t, 2*time.Second, func() bool { return svc.handled == 1 })
}Подробности — в ../conventions/testing.
Integration (опционально)
Если нужно проверить интеграцию с настоящим Kafka — testcontainers-go с
build-тегом integration. Держи такие тесты отдельной Makefile-целью
(make test-integration), чтобы make test оставался быстрым.
10. Schema evolution
- Non-breaking изменение (добавил optional поле в payload) — остаётся
тот же
Event-Type, можно bump’нутьSchema-Versionс 1 на 2. Старые consumer’ы игнорируют новое поле (они tolerate extra fields). - Breaking изменение (удалил поле, сменил тип, перестроил
структуру) — новый
Event-Type:review.created.v2. Publisher параллельно пишет в оба топика (review.createdиreview.created.v2) до тех пор, пока все consumer’ы не переехали на v2. После миграции старый публикатор выключается. - Consumer’ы tolerate extra fields:
json.Unmarshalпо умолчанию игнорирует лишние поля в JSON. Но если хочешь надёжнее — проверяйSchema-Versionв metadata.
11. Глоссарий
Если событие вводит новую доменную концепцию (consent_revoked,
upload_quarantined), добавь короткое определение в
../glossary — чтобы ревьюер и новички не гадали.
Чеклист
- Имя события в past tense, топик
kazmaps.<service>.<entity>.<action>. - Payload — публичный struct, JSON-теги
snake_case. - Publisher пишет через outbox внутри бизнес-транзакции.
- Envelope заполнен через
eventmeta.New(все обязательные поля). - Consumer-handler возвращает error при проблеме, не делает ack вручную.
- Handler идемпотентен (UPSERT / unique constraint / Deduplicator).
- Unit-тест через
gochannelнаписан. - DLQ-топик
<topic>.dlqсоздаётся автоматически при первом сообщении, но убедись, что alert на ненулевой DLQ подключён.
Связанные разделы
../conventions/events— envelope, middleware, naming, reference.../patterns/outbox— почему publisher всегда пишет в outbox внутри бизнес-транзакции.../event-catalog— добавь запись о новом событии сюда, как только оно появилось.