Перейти к содержанию

Как добавить Kafka-событие

Пошаговый рецепт добавления нового event'а на примере review.created. В другом сервисе замени имена — шаги такие же.

Правила Watermill / outbox / envelope описаны в ../conventions/events.md. Здесь — сжатый практический сценарий для PR-автора.

1. Определить тип события

Формат: past tense, точкой разделены аггрегат и действие.

review.created
review.updated
review.deleted
user.banned
photo.uploaded
moderation.approved

Имя топика в Kafka:

kazmaps.<service>.<entity>.<action>
kazmaps.review.review.created
kazmaps.user.user.banned
  • <service> — producer (откуда событие).
  • <entity> — существительное в единственном числе.
  • <action> — прошедшее время.

Не называй события в настоящем/будущем времени (review.create, user.will-ban) — событие — это факт о случившемся.

2. Payload struct

В internal/event/types.go (producer'а):

package event

type ReviewCreatedV1 struct {
    ReviewID int64     `json:"review_id"`
    PlaceID  int64     `json:"place_id"`
    UserID   int64     `json:"user_id"`
    Rating   int16     `json:"rating"`
    // Schema-Version кладётся в metadata, не в payload.
}
  • Типы — публичные (консьюмер в своём сервис-репо объявляет собственный идентичный struct и парсит payload по JSON-контракту; прямого cross-repo импорта между сервис-репозиториями нет).
  • JSON-теги snake_case.
  • Не складывай в payload секреты / PII в открытом виде (email, phone, token). Если нужен reference — клади user_id, консьюмер дотянется через /internal/users/{id}.

3. Реестр контрактов

Когда появится отдельный репозиторий kazmaps-contracts (общие типы событий для producer'ов и consumer'ов) — добавь туда тип. Пока его нет — копируй struct в консьюмер при подключении. Синхронизацию держат владельцы. (TODO: заменить на импорт из contracts-репо.)

4. Publisher: только через outbox

Никогда не публикуй напрямую в Kafka из handler'а или service'а. Запись в outbox идёт в той же транзакции, что и бизнес-запись:

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
    var review *domain.Review
    err := s.db.InTx(ctx, func(tx pgx.Tx) error {
        r, err := s.reviews.CreateTx(ctx, tx, cmd.UserID, cmd.PlaceID, cmd.Rating, cmd.Text)
        if err != nil {
            return err
        }

        payload := event.ReviewCreatedV1{
            ReviewID: r.ID, PlaceID: r.PlaceID, UserID: r.UserID, Rating: r.Rating,
        }
        msg, err := eventmeta.New(ctx, eventmeta.Envelope{
            EventType:     "review.created",
            SchemaVersion: "1",
            Source:        "review",
            Payload:       payload,
        })
        if err != nil {
            return err
        }
        return s.outboxPublisher.PublishTx(ctx, tx, "kazmaps.review.review.created", msg)
    })
    if err != nil {
        return nil, fmt.Errorf("create review: %w", err)
    }
    return review, nil
}

Forwarder (watermill/components/forwarder) автоматически перетащит строку из outbox в Kafka. См. ../conventions/events.md.

5. Envelope

Все metadata-поля заполняются через helper eventmeta.New — не пиши их руками:

msg, err := eventmeta.New(ctx, eventmeta.Envelope{
    EventType:     "review.created",
    SchemaVersion: "1",
    Source:        "review",
    Payload:       payload,
})

eventmeta.New заполняет:

  • Message.UUID — ULID (идентификатор события),
  • Metadata["Event-Type"],
  • Metadata["Schema-Version"],
  • Metadata["Source-Service"],
  • Metadata["Correlation-Id"] — из ctx,
  • Metadata["Published-At"] — RFC3339Nano,
  • Metadata["traceparent"] — W3C trace context из OTel propagator.

Если какого-то поля в metadata нет — consumer отклонит сообщение.

6. Consumer: регистрация handler'а

В сервисе-подписчике добавь handler в router.

// internal/event/router.go
router.AddNoPublisherHandler(
    "on-review-created",                    // уникальное имя handler'а
    "kazmaps.review.review.created",        // топик
    kafkaSubscriber,
    reviewCreatedHandler.Handle,
)

Middleware stack (CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator) применяется к router'у один раз в wiring сервиса — одинаково для всех handler'ов. Не добавляй per-handler middleware ad-hoc.

7. Handler signature

// internal/event/review_created.go
package event

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/ThreeDotsLabs/watermill/message"
)

type ReviewCreatedHandler struct {
    svc *service.Stats
}

func NewReviewCreatedHandler(svc *service.Stats) *ReviewCreatedHandler {
    return &ReviewCreatedHandler{svc: svc}
}

func (h *ReviewCreatedHandler) Handle(msg *message.Message) error {
    if eventType := msg.Metadata.Get("Event-Type"); eventType != "review.created" {
        return nil // чужое событие на том же топике — пропускаем
    }
    var evt ReviewCreatedV1
    if err := json.Unmarshal(msg.Payload, &evt); err != nil {
        return fmt.Errorf("unmarshal review.created: %w", err)
    }
    return h.svc.OnReviewCreated(msg.Context(), evt)
}

Правила handler'а:

  • Возвращай nil → router коммитит offset.
  • Возвращай error → Retry сработает, потом PoisonQueue отправит в DLQ.
  • Не делай msg.Ack() / msg.Nack() вручную. Этим управляет router.

8. Идемпотентность

Kafka даёт at-least-once: один и тот же event может прийти дважды (сетевой глюк, retry, перезапуск consumer'а). Handler обязан корректно отрабатывать повтор.

Стандартные подходы:

  1. UPSERT в БД по event_id:
INSERT INTO review.review_stats (review_id, rating, seen_event_id)
VALUES ($1, $2, $3)
ON CONFLICT (seen_event_id) DO NOTHING;
  1. Deduplicator middleware — Redis SETNX event:<UUID>. Уже в middleware stack, ловит повторы в окне TTL.
  2. Уникальный constraint на доменной колонке: «review_stats unique по review_id» — повторный insert даст ошибку, handler ловит и возвращает nil (факт уже учтён).

Используй хотя бы одну стратегию. Никогда не надейся «наверное, Kafka не повторит».

9. Тесты

Unit через gochannel

Не поднимай Kafka в unit-тестах.

func TestOnReviewCreated_CountsStats(t *testing.T) {
    pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
    defer pubsub.Close()

    svc := &fakeStats{}
    h := event.NewReviewCreatedHandler(svc)

    router, _ := message.NewRouter(message.RouterConfig{}, nil)
    router.AddNoPublisherHandler("t", "reviews", pubsub, h.Handle)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() { _ = router.Run(ctx) }()
    <-router.Running()

    msg := message.NewMessage("evt-1", []byte(`{"review_id":42,"place_id":7,"user_id":1,"rating":5}`))
    msg.Metadata.Set("Event-Type", "review.created")
    if err := pubsub.Publish("reviews", msg); err != nil {
        t.Fatalf("publish: %v", err)
    }

    eventually(t, 2*time.Second, func() bool { return svc.handled == 1 })
}

Подробности — в ../conventions/testing.md.

Integration (опционально)

Если нужно проверить интеграцию с настоящим Kafka — testcontainers-go с build-тегом integration. Держи такие тесты отдельной Makefile-целью (make test-integration), чтобы make test оставался быстрым.

10. Schema evolution

  • Non-breaking изменение (добавил optional поле в payload) — остаётся тот же Event-Type, можно bump'нуть Schema-Version с 1 на 2. Старые consumer'ы игнорируют новое поле (они tolerate extra fields).
  • Breaking изменение (удалил поле, сменил тип, перестроил структуру) — новый Event-Type: review.created.v2. Publisher параллельно пишет в оба топика (review.created и review.created.v2) до тех пор, пока все consumer'ы не переехали на v2. После миграции старый публикатор выключается.
  • Consumer'ы tolerate extra fields: json.Unmarshal по умолчанию игнорирует лишние поля в JSON. Но если хочешь надёжнее — проверяй Schema-Version в metadata.

11. Глоссарий

Если событие вводит новую доменную концепцию (consent_revoked, upload_quarantined), добавь короткое определение в ../glossary.md — чтобы ревьюер и новички не гадали.

Чеклист

  • Имя события в past tense, топик kazmaps.<service>.<entity>.<action>.
  • Payload — публичный struct, JSON-теги snake_case.
  • Publisher пишет через outbox внутри бизнес-транзакции.
  • Envelope заполнен через eventmeta.New (все обязательные поля).
  • Consumer-handler возвращает error при проблеме, не делает ack вручную.
  • Handler идемпотентен (UPSERT / unique constraint / Deduplicator).
  • Unit-тест через gochannel написан.
  • DLQ-топик <topic>.dlq создаётся автоматически при первом сообщении, но убедись, что alert на ненулевой DLQ подключён.

Связанные разделы

  • ../conventions/events.md — envelope, middleware, naming, reference.
  • ../patterns/outbox.md — почему publisher всегда пишет в outbox внутри бизнес-транзакции.
  • ../event-catalog.md — добавь запись о новом событии сюда, как только оно появилось.