Skip to Content
How-toДобавить Kafka-событие

Как добавить Kafka-событие

Пошаговый рецепт добавления нового event’а на примере review.created. В другом сервисе замени имена — шаги такие же.

Правила Watermill / outbox / envelope описаны в ../conventions/events. Здесь — сжатый практический сценарий для PR-автора.

1. Определить тип события

Формат: past tense, точкой разделены аггрегат и действие.

review.created review.updated review.deleted user.banned photo.uploaded moderation.approved

Имя топика в Kafka — per-entity (действие НЕ в имени, а в заголовке Event-Type):

kazmaps.<service>.<entity> kazmaps.review.review # Event-Type: review.created / review.updated / ... kazmaps.user.account # Event-Type: account.banned / account.updated / ...
  • <service> — producer (откуда событие).
  • <entity> — существительное в единственном числе. Топик = поток всех событий этой сущности; ключ записи = id сущности (порядок событий на сущность).
  • Действие (created/updated/…) — в заголовке Event-Type, past tense.

Не называй действия в настоящем/будущем времени (review.create, user.will-ban) — событие это факт о случившемся. И не клади действие в имя топика — только в Event-Type (см. event-catalog, «Почему сущность, а не событие»).

2. Payload struct

В internal/event/types.go (producer’а):

package event type ReviewCreatedV1 struct { ReviewID int64 `json:"review_id"` PlaceID int64 `json:"place_id"` UserID int64 `json:"user_id"` Rating int16 `json:"rating"` // Schema-Version кладётся в metadata, не в payload. }
  • Типы — публичные (консьюмер в своём сервис-репо объявляет собственный идентичный struct и парсит payload по JSON-контракту; прямого cross-repo импорта между сервис-репозиториями нет).
  • JSON-теги snake_case.
  • Не складывай в payload секреты / PII в открытом виде (email, phone, token). Если нужен reference — клади user_id, консьюмер дотянется через /internal/users/{id}.

3. Реестр контрактов

Когда появится отдельный репозиторий kazmaps-contracts (общие типы событий для producer’ов и consumer’ов) — добавь туда тип. Пока его нет — копируй struct в консьюмер при подключении. Синхронизацию держат владельцы. (TODO: заменить на импорт из contracts-репо.)

4. Publisher: только через outbox

Никогда не публикуй напрямую в Kafka из handler’а или service’а. Запись в outbox идёт в той же транзакции, что и бизнес-запись:

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) { var review *domain.Review err := s.db.InTx(ctx, func(tx pgx.Tx) error { r, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text) if err != nil { return err } payload := event.ReviewCreatedV1{ ReviewID: r.ID, PlaceID: r.PlaceID, UserID: r.UserID, Rating: r.Rating, } msg, err := eventmeta.New(ctx, eventmeta.Envelope{ EventType: "review.created", SchemaVersion: "1", Source: "review", Payload: payload, }) if err != nil { return err } // Топик = сущность (kazmaps.review.review); действие "review.created" // уже лежит в Event-Type envelope. Ключ записи — id сущности (review id). return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review", msg) }) if err != nil { return nil, fmt.Errorf("create review: %w", err) } return review, nil }

Forwarder (watermill/components/forwarder) автоматически перетащит строку из outbox в Kafka. См. ../conventions/events.

5. Envelope

Все metadata-поля заполняются через helper eventmeta.New — не пиши их руками:

msg, err := eventmeta.New(ctx, eventmeta.Envelope{ EventType: "review.created", SchemaVersion: "1", Source: "review", Payload: payload, })

eventmeta.New заполняет:

  • Message.UUID — ULID (идентификатор события),
  • Metadata["Event-Type"],
  • Metadata["Schema-Version"],
  • Metadata["Source-Service"],
  • Metadata["Correlation-Id"] — из ctx,
  • Metadata["Published-At"] — RFC3339Nano,
  • Metadata["traceparent"] — W3C trace context из OTel propagator.

Если какого-то поля в metadata нет — consumer отклонит сообщение.

6. Consumer: регистрация handler’а

В сервисе-подписчике добавь handler в router.

// internal/event/router.go router.AddNoPublisherHandler( "on-review", // уникальное имя handler'а "kazmaps.review.review", // топик = сущность (все её действия) kafkaSubscriber, reviewHandler.Handle, // внутри роутит по Event-Type )

Топик сущности несёт все её действия, поэтому handler читает msg.Metadata.Get("Event-Type") и обрабатывает нужные (остальные — ack-and-skip, чтобы не копились в retry/DLQ).

Middleware stack (CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator) применяется к router’у один раз в wiring сервиса — одинаково для всех handler’ов. Не добавляй per-handler middleware ad-hoc.

7. Handler signature

// internal/event/review_created.go package event import ( "context" "encoding/json" "fmt" "github.com/ThreeDotsLabs/watermill/message" ) type ReviewCreatedHandler struct { svc *service.Stats } func NewReviewCreatedHandler(svc *service.Stats) *ReviewCreatedHandler { return &ReviewCreatedHandler{svc: svc} } func (h *ReviewCreatedHandler) Handle(msg *message.Message) error { if eventType := msg.Metadata.Get("Event-Type"); eventType != "review.created" { return nil // чужое событие на том же топике — пропускаем } var evt ReviewCreatedV1 if err := json.Unmarshal(msg.Payload, &evt); err != nil { return fmt.Errorf("unmarshal review.created: %w", err) } return h.svc.OnReviewCreated(msg.Context(), evt) }

Правила handler’а:

  • Возвращай nil → router коммитит offset.
  • Возвращай error → Retry сработает, потом PoisonQueue отправит в DLQ.
  • Не делай msg.Ack() / msg.Nack() вручную. Этим управляет router.

8. Идемпотентность

Kafka даёт at-least-once: один и тот же event может прийти дважды (сетевой глюк, retry, перезапуск consumer’а). Handler обязан корректно отрабатывать повтор.

Стандартные подходы:

  1. UPSERT в БД по event_id:

    INSERT INTO review.review_stats (review_id, rating, seen_event_id) VALUES ($1, $2, $3) ON CONFLICT (seen_event_id) DO NOTHING;
  2. Deduplicator middleware — Redis SETNX event:<UUID>. Уже в middleware stack, ловит повторы в окне TTL.

  3. Уникальный constraint на доменной колонке: «review_stats unique по review_id» — повторный insert даст ошибку, handler ловит и возвращает nil (факт уже учтён).

Используй хотя бы одну стратегию. Никогда не надейся «наверное, Kafka не повторит».

9. Тесты

Unit через gochannel

Не поднимай Kafka в unit-тестах.

func TestOnReviewCreated_CountsStats(t *testing.T) { pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil) defer pubsub.Close() svc := &fakeStats{} h := event.NewReviewCreatedHandler(svc) router, _ := message.NewRouter(message.RouterConfig{}, nil) router.AddNoPublisherHandler("t", "reviews", pubsub, h.Handle) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { _ = router.Run(ctx) }() <-router.Running() msg := message.NewMessage("evt-1", []byte(`{"review_id":42,"place_id":7,"user_id":1,"rating":5}`)) msg.Metadata.Set("Event-Type", "review.created") if err := pubsub.Publish("reviews", msg); err != nil { t.Fatalf("publish: %v", err) } eventually(t, 2*time.Second, func() bool { return svc.handled == 1 }) }

Подробности — в ../conventions/testing.

Integration (опционально)

Если нужно проверить интеграцию с настоящим Kafka — testcontainers-go с build-тегом integration. Держи такие тесты отдельной Makefile-целью (make test-integration), чтобы make test оставался быстрым.

10. Schema evolution

  • Non-breaking изменение (добавил optional поле в payload) — остаётся тот же Event-Type, можно bump’нуть Schema-Version с 1 на 2. Старые consumer’ы игнорируют новое поле (они tolerate extra fields).
  • Breaking изменение (удалил поле, сменил тип, перестроил структуру) — новый Event-Type: review.created.v2. Publisher параллельно пишет в оба топика (review.created и review.created.v2) до тех пор, пока все consumer’ы не переехали на v2. После миграции старый публикатор выключается.
  • Consumer’ы tolerate extra fields: json.Unmarshal по умолчанию игнорирует лишние поля в JSON. Но если хочешь надёжнее — проверяй Schema-Version в metadata.

11. Глоссарий

Если событие вводит новую доменную концепцию (consent_revoked, upload_quarantined), добавь короткое определение в ../glossary — чтобы ревьюер и новички не гадали.

Чеклист

  • Имя события в past tense, топик kazmaps.<service>.<entity>.<action>.
  • Payload — публичный struct, JSON-теги snake_case.
  • Publisher пишет через outbox внутри бизнес-транзакции.
  • Envelope заполнен через eventmeta.New (все обязательные поля).
  • Consumer-handler возвращает error при проблеме, не делает ack вручную.
  • Handler идемпотентен (UPSERT / unique constraint / Deduplicator).
  • Unit-тест через gochannel написан.
  • DLQ-топик <topic>.dlq создаётся автоматически при первом сообщении, но убедись, что alert на ненулевой DLQ подключён.

Связанные разделы

  • ../conventions/events — envelope, middleware, naming, reference.
  • ../patterns/outbox — почему publisher всегда пишет в outbox внутри бизнес-транзакции.
  • ../event-catalog — добавь запись о новом событии сюда, как только оно появилось.
Last updated on