Перейти к содержанию

Event catalog

Каталог Kafka-событий. Одна запись на одно событие: как называется topic, кто publisher, какой payload, кто consumer'ы, сколько живёт сообщение в топике.

Это источник правды для событий. Если ты добавил новое событие и не завёл запись здесь — событие не деплоим. Ревьюер проверяет соответствие запись ↔ код.

Связка с services-catalog.md: сервис в каталоге перечисляет имена событий, которые публикует/потребляет — как метки навигации. Поля payload, schema version и consumers — здесь.

Содержание

Конвенции

Все события следуют единым правилам. Полный разбор Watermill / outbox — в conventions/events.md.

Topic naming

kazmaps.<service>.<entity>.<action>
  • <service> — имя сервиса-publisher'а (user, review, media, ...).
  • <entity> — доменная сущность в единственном числе.
  • <action> — глагол в прошедшем времени: created, updated, deleted, banned, completed.

На текущий момент у части исторических топиков короткое имя без префикса kazmaps. и без удвоенного <service> (например, review.created, photo.uploaded) — это коммитится как есть, каталог отражает фактические имена топиков в brokers, не идеал. При переименованиях следуй полному формату.

Envelope

Каждое сообщение несёт metadata (Watermill message.Metadata):

Поле Что это
Event-Type Тип события (review.created).
Schema-Version Версия payload'а ("1").
Correlation-Id ULID запроса, породившего событие.
Source-Service Имя publisher'а.
Published-At RFC3339Nano.
traceparent W3C trace context.

Описано в conventions/events.md.

Versioning

  • Non-breaking (добавили опциональное поле) → оставь ту же Schema-Version. Consumer'ы, ещё не знающие о новом поле, молча игнорируют его.
  • Breaking (удалили поле, сменили тип, переименовали) → два варианта:
  • Bump Schema-Version в metadata и временно dual-publish в старую и новую схему, пока consumer'ы не переехали.
  • Новый topic kazmaps.<service>.<entity>.<action>.v2 — для радикального редизайна payload'а. Старый топик живёт до остановки старых consumer'ов.

Retention

Класс события TTL в топике
Business event (regular) 7 дней
Sensitive (ban, role change) 30 дней
DLQ 90 дней

Retention настраивается на уровне Kafka cluster через topic config. Сервис не ретеншит сам.

DLQ naming

<topic>.dlq

Пример: kazmaps.review.review.created.dlq. Конфигурация Watermill PoisonQueue описана в conventions/events.md.

Schema location

Пока JSON-схемы payload'а описываются inline в этом каталоге — поля и типы перечислены в каждой записи. Когда появится отдельный contract-репо с JSON-schema файлами, payload-детали переедут туда, а этот каталог оставит только метаданные (topic, publisher, version, consumers, ссылка на schema).

Как добавлять новое событие

  1. Придумай имя топика по конвенции выше.
  2. Открой how-to/add-kafka-event.md и пройди по шагам: payload type, publisher, outbox, envelope.
  3. Добавь запись в этот каталог в том же PR, что и код события.
  4. Если есть consumer(ы) — пропиши их тоже; список consumer'ов обновляет тот, кто подписывается.

Без записи в каталоге CI не даст смержить PR: scripts/check-events.sh (или эквивалент) сверяет event-catalog.md с константами в internal/event/.


user.registered

Topic: user.registered Publisher: user Schema version: 1 Purpose: регистрация завершена. Пользователь создан в auth.users, доступен для логина.

Payload:

{
  "user_id": 1234,
  "name": "Abay",
  "email": "a***@example.com",
  "phone": "+7 *** *** 67"
}

Поля email и phone — опциональные (*string). В топике передаются в маскированном виде (те же правила, что для логов — см. conventions/logging.md).

Consumers: (зарезервированы — consumer'ы поднимутся отдельными PR'ами в своих сервисах) Retention: 7 дней


user.updated

Topic: user.updated Publisher: user Schema version: 1 Purpose: пользователь обновил профиль. Содержит список изменённых полей, не сами значения.

Payload:

{
  "user_id": 1234,
  "changed_fields": ["name", "avatar_url"]
}

Consumers:Retention: 7 дней


user.banned

Topic: user.banned Publisher: user Schema version: 1 Purpose: admin забанил пользователя. Остальные сервисы должны прекратить активность от имени этого пользователя (отменить подписки, снять токены и т.п.).

Payload:

{
  "user_id": 1234,
  "reason": "spam",
  "expires_at": 1735689600
}

expires_at — unix seconds, опционально. Пусто = permanent ban.

Consumers:Retention: 30 дней (sensitive event)


user.unbanned

Topic: user.unbanned Publisher: user Schema version: 1 Purpose: admin снял бан с пользователя.

Payload:

{
  "user_id": 1234
}

Consumers:Retention: 30 дней (sensitive event)


user.level_changed

Topic: user.level_changed Publisher: user Schema version: 1 Purpose: изменился уровень пользователя (геймификация). Consumer'ы могут обновлять denormalized badge'и.

Payload:

{
  "user_id": 1234,
  "old_level": 3,
  "new_level": 4
}

Consumers: - notification — отправляет уведомление «вы достигли нового уровня».

Retention: 7 дней


review.created

Topic: review.created Publisher: review Schema version: 1 Purpose: пользователь создал отзыв. Consumer'ы пересчитывают агрегаты (place_id → avg_rating, user_id → review_count).

Payload:

{
  "review_id": 1001,
  "place_id": 7,
  "user_id": 1234,
  "rating": 5
}

Consumers:Retention: 7 дней


review.updated

Topic: review.updated Publisher: review Schema version: 1 Purpose: отзыв обновлён — интересует изменение rating'а для пересчёта агрегатов.

Payload:

{
  "review_id": 1001,
  "place_id": 7,
  "old_rating": 3,
  "new_rating": 5
}

Consumers:Retention: 7 дней


review.deleted

Topic: review.deleted Publisher: review Schema version: 1 Purpose: отзыв удалён (soft-delete). Consumer'ы уменьшают агрегаты на rating.

Payload:

{
  "review_id": 1001,
  "place_id": 7,
  "rating": 5
}

Consumers:Retention: 7 дней


review.replied

Topic: review.replied Publisher: review Schema version: 1 Purpose: владелец места ответил на отзыв. Автор отзыва получает уведомление.

Payload:

{
  "review_id": 1001,
  "place_id": 7,
  "reply_user_id": 5678
}

Consumers: - notification — отправляет уведомление автору отзыва о новом реплае.

Retention: 7 дней


photo.uploaded

Topic: photo.uploaded Publisher: media Schema version: 1 Purpose: фото загружено и обработано (derivatives готовы), доступно по URL.

Payload:

{
  "photo_id": 42,
  "place_id": 7,
  "user_id": 1234,
  "type": "place",
  "url": "https://cdn.example/p/42.webp"
}

user_id — опциональный (*int64); пусто у системных/импортированных фото. type — семантика фото (place, review, avatar, ...).

Consumers:Retention: 7 дней


photo.deleted

Topic: photo.deleted Publisher: media Schema version: 1 Purpose: фото удалено. Consumer'ы чистят ссылки.

Payload:

{
  "photo_id": 42,
  "place_id": 7,
  "type": "place"
}

Consumers:Retention: 7 дней


photo.tagged

Topic: photo.tagged Publisher: media Schema version: 1 Purpose: к фото применены теги (автоматически или модератором).

Payload:

{
  "photo_id": 42,
  "place_id": 7,
  "tags": ["food", "interior"]
}

Consumers:Retention: 7 дней


media.processing

Topic: media.processing Publisher: media Schema version: 1 Purpose: внутренний pipeline-топик сервиса media. Handler загрузки (HTTP) кладёт задачу processing'а, worker того же сервиса её забирает. Это не межсервисное API и другие сервисы не должны на него подписываться.

Payload:

{
  "photo_id": 42,
  "place_id": 7,
  "user_id": 1234,
  "url": "https://minio.internal/bucket/raw/42.jpg",
  "type": "place"
}

Consumers: - media (тот же сервис, worker-инстанс).

Retention: 7 дней


moderation.approved

Topic: moderation.approved Publisher: модерационный сервис (внешний, пока не описан в этом каталоге — consumer'ы потребляют по факту появления). Schema version: 1 Purpose: фото прошло модерацию. media снимает флаг pending и делает публикацию в photo.uploaded (если процессинг уже завершён).

Payload:

{
  "photo_id": 42
}

Consumers: - media — обновляет статус фото.

Retention: 30 дней (sensitive event)


moderation.rejected

Topic: moderation.rejected Publisher: модерационный сервис (внешний). Schema version: 1 Purpose: фото отклонено модерацией с причиной. media помечает фото как rejected; пользователь получает уведомление через notification.

Payload:

{
  "photo_id": 42,
  "reason": "inappropriate content"
}

Consumers: - media — помечает фото как отклонённое. - notification — отправляет уведомление автору фото.

Retention: 30 дней (sensitive event)


user.push_token.created / user.push_token.updated / user.push_token.deleted

Topics: user.push_token.created, user.push_token.updated, user.push_token.deleted Publisher: user (ожидается; consumer уже подписан) Schema version: 1 Purpose: синхронизация device push-токенов из user-сервиса в notification-сервис. notification держит локальную таблицу push_tokens для скорости отправки push-а без round-trip в user.

Payload:

{
  "user_id": 1234,
  "token": "fcm_device_token_...",
  "platform": "android"
}

Consumers: - notification — upsert (на created/updated) или delete (на deleted) в таблицу push_tokens.

Retention: 7 дней


DLQ-топики

Все основные топики имеют пару <topic>.dlq. Сообщения попадают туда после исчерпания retry. Чтение DLQ — ручное, через psql / kafka-console- consumer, по инциденту. Retention 90 дней.

Примеры:

review.created.dlq
photo.uploaded.dlq
moderation.rejected.dlq
user.push_token.created.dlq

Что не в каталоге

В этот каталог не входят:

  • Тestовые топики (*_test, *-local) — локальная разработка.
  • Топики, упомянутые в KAFKA_NOTIFICATION_TOPICS сервиса notification для будущих сервисов (booking.*, queue.*, correction.* и т.д.) — они попадут сюда, когда соответствующие publisher-сервисы появятся и начнут публиковать реальные события. До этого момента notification просто подписывается и ничего не получает.
  • Internal команды через HTTP — это REST, см. conventions/http-api.md.