Как добавить Kafka-событие¶
Пошаговый рецепт добавления нового event'а на примере review.created. В
другом сервисе замени имена — шаги такие же.
Правила Watermill / outbox / envelope описаны в
../conventions/events.md. Здесь — сжатый
практический сценарий для PR-автора.
1. Определить тип события¶
Формат: past tense, точкой разделены аггрегат и действие.
Имя топика в Kafka:
<service>— producer (откуда событие).<entity>— существительное в единственном числе.<action>— прошедшее время.
Не называй события в настоящем/будущем времени (review.create,
user.will-ban) — событие — это факт о случившемся.
2. Payload struct¶
В internal/event/types.go (producer'а):
package event
type ReviewCreatedV1 struct {
ReviewID int64 `json:"review_id"`
PlaceID int64 `json:"place_id"`
UserID int64 `json:"user_id"`
Rating int16 `json:"rating"`
// Schema-Version кладётся в metadata, не в payload.
}
- Типы — публичные (консьюмер в своём сервис-репо объявляет собственный идентичный struct и парсит payload по JSON-контракту; прямого cross-repo импорта между сервис-репозиториями нет).
- JSON-теги
snake_case. - Не складывай в payload секреты / PII в открытом виде (email, phone,
token). Если нужен reference — клади
user_id, консьюмер дотянется через/internal/users/{id}.
3. Реестр контрактов¶
Когда появится отдельный репозиторий kazmaps-contracts (общие типы
событий для producer'ов и consumer'ов) — добавь туда тип. Пока его нет —
копируй struct в консьюмер при подключении. Синхронизацию держат
владельцы. (TODO: заменить на импорт из contracts-репо.)
4. Publisher: только через outbox¶
Никогда не публикуй напрямую в Kafka из handler'а или service'а. Запись в outbox идёт в той же транзакции, что и бизнес-запись:
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
var review *domain.Review
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
r, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
if err != nil {
return err
}
payload := event.ReviewCreatedV1{
ReviewID: r.ID, PlaceID: r.PlaceID, UserID: r.UserID, Rating: r.Rating,
}
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})
if err != nil {
return err
}
return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review.created", msg)
})
if err != nil {
return nil, fmt.Errorf("create review: %w", err)
}
return review, nil
}
Forwarder (watermill/components/forwarder) автоматически перетащит
строку из outbox в Kafka. См. ../conventions/events.md.
5. Envelope¶
Все metadata-поля заполняются через helper eventmeta.New — не пиши их
руками:
msg, err := eventmeta.New(ctx, eventmeta.Envelope{
EventType: "review.created",
SchemaVersion: "1",
Source: "review",
Payload: payload,
})
eventmeta.New заполняет:
Message.UUID— ULID (идентификатор события),Metadata["Event-Type"],Metadata["Schema-Version"],Metadata["Source-Service"],Metadata["Correlation-Id"]— из ctx,Metadata["Published-At"]— RFC3339Nano,Metadata["traceparent"]— W3C trace context из OTel propagator.
Если какого-то поля в metadata нет — consumer отклонит сообщение.
6. Consumer: регистрация handler'а¶
В сервисе-подписчике добавь handler в router.
// internal/event/router.go
router.AddNoPublisherHandler(
"on-review-created", // уникальное имя handler'а
"kazmaps.review.review.created", // топик
kafkaSubscriber,
reviewCreatedHandler.Handle,
)
Middleware stack (CorrelationID → Recoverer → PoisonQueue → Retry →
Deduplicator) применяется к router'у один раз в wiring сервиса — одинаково
для всех handler'ов. Не добавляй per-handler middleware ad-hoc.
7. Handler signature¶
// internal/event/review_created.go
package event
import (
"context"
"encoding/json"
"fmt"
"github.com/ThreeDotsLabs/watermill/message"
)
type ReviewCreatedHandler struct {
svc *service.Stats
}
func NewReviewCreatedHandler(svc *service.Stats) *ReviewCreatedHandler {
return &ReviewCreatedHandler{svc: svc}
}
func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
if eventType := msg.Metadata.Get("Event-Type"); eventType != "review.created" {
return nil // чужое событие на том же топике — пропускаем
}
var evt ReviewCreatedV1
if err := json.Unmarshal(msg.Payload, &evt); err != nil {
return fmt.Errorf("unmarshal review.created: %w", err)
}
return h.svc.OnReviewCreated(msg.Context(), evt)
}
Правила handler'а:
- Возвращай
nil→ router коммитит offset. - Возвращай
error→ Retry сработает, потом PoisonQueue отправит в DLQ. - Не делай
msg.Ack()/msg.Nack()вручную. Этим управляет router.
8. Идемпотентность¶
Kafka даёт at-least-once: один и тот же event может прийти дважды (сетевой глюк, retry, перезапуск consumer'а). Handler обязан корректно отрабатывать повтор.
Стандартные подходы:
- UPSERT в БД по event_id:
INSERT INTO review.review_stats (review_id, rating, seen_event_id)
VALUES ($1, $2, $3)
ON CONFLICT (seen_event_id) DO NOTHING;
- Deduplicator middleware — Redis
SETNX event:<UUID>. Уже в middleware stack, ловит повторы в окне TTL. - Уникальный constraint на доменной колонке: «review_stats unique по review_id» — повторный insert даст ошибку, handler ловит и возвращает nil (факт уже учтён).
Используй хотя бы одну стратегию. Никогда не надейся «наверное, Kafka не повторит».
9. Тесты¶
Unit через gochannel¶
Не поднимай Kafka в unit-тестах.
func TestOnReviewCreated_CountsStats(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
svc := &fakeStats{}
h := event.NewReviewCreatedHandler(svc)
router, _ := message.NewRouter(message.RouterConfig{}, nil)
router.AddNoPublisherHandler("t", "reviews", pubsub, h.Handle)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = router.Run(ctx) }()
<-router.Running()
msg := message.NewMessage("evt-1", []byte(`{"review_id":42,"place_id":7,"user_id":1,"rating":5}`))
msg.Metadata.Set("Event-Type", "review.created")
if err := pubsub.Publish("reviews", msg); err != nil {
t.Fatalf("publish: %v", err)
}
eventually(t, 2*time.Second, func() bool { return svc.handled == 1 })
}
Подробности — в ../conventions/testing.md.
Integration (опционально)¶
Если нужно проверить интеграцию с настоящим Kafka — testcontainers-go с
build-тегом integration. Держи такие тесты отдельной Makefile-целью
(make test-integration), чтобы make test оставался быстрым.
10. Schema evolution¶
- Non-breaking изменение (добавил optional поле в payload) — остаётся
тот же
Event-Type, можно bump'нутьSchema-Versionс 1 на 2. Старые consumer'ы игнорируют новое поле (они tolerate extra fields). - Breaking изменение (удалил поле, сменил тип, перестроил
структуру) — новый
Event-Type:review.created.v2. Publisher параллельно пишет в оба топика (review.createdиreview.created.v2) до тех пор, пока все consumer'ы не переехали на v2. После миграции старый публикатор выключается. - Consumer'ы tolerate extra fields:
json.Unmarshalпо умолчанию игнорирует лишние поля в JSON. Но если хочешь надёжнее — проверяйSchema-Versionв metadata.
11. Глоссарий¶
Если событие вводит новую доменную концепцию (consent_revoked,
upload_quarantined), добавь короткое определение в
../glossary.md — чтобы ревьюер и новички не гадали.
Чеклист¶
- Имя события в past tense, топик
kazmaps.<service>.<entity>.<action>. - Payload — публичный struct, JSON-теги
snake_case. - Publisher пишет через outbox внутри бизнес-транзакции.
- Envelope заполнен через
eventmeta.New(все обязательные поля). - Consumer-handler возвращает error при проблеме, не делает ack вручную.
- Handler идемпотентен (UPSERT / unique constraint / Deduplicator).
- Unit-тест через
gochannelнаписан. - DLQ-топик
<topic>.dlqсоздаётся автоматически при первом сообщении, но убедись, что alert на ненулевой DLQ подключён.
Связанные разделы¶
../conventions/events.md— envelope, middleware, naming, reference.../patterns/outbox.md— почему publisher всегда пишет в outbox внутри бизнес-транзакции.../event-catalog.md— добавь запись о новом событии сюда, как только оно появилось.