Skip to Content

CQRS и watermill/components/cqrs

Deep-dive по CQRS в контексте нашего стека. Reference-уровень про события и middleware — в ../conventions/events. Здесь — что такое CQRS в нашем конкретном смысле, когда включать, как заводить типизированные handler’ы поверх Watermill и что делать не надо.

Содержание

Что это

CQRS — Command Query Responsibility Segregation. Идея в том, чтобы разделить модель записи (command) и модель чтения (query): команды мутируют состояние, запросы не имеют побочных эффектов. В Go + Watermill это выливается в практический приём: вместо одного router’а с switch eventType — типизированные handler’ы и шины (bus), которые знают про конкретные типы команд и событий.

CQRS — не «шина сообщений». Не «event-driven». Не «микросервисы». Это способ организовать код внутри одного сервиса так, чтобы мутации и чтения не мешали друг другу.

Архитектура

Общая картина: publisher-сайд отдаёт типизированные события через EventBus в outbox, forwarder раскладывает их в Kafka; consumer-сайд прогоняет Kafka-сообщение через middleware-стек, EventProcessor делает unmarshal в Go-struct и вызывает типизированный handler.

Детализация по компонентам — §Event Bus, §Event Processor, §Middleware stack ниже.

Формы CQRS — не путай варианты

Под «CQRS» в разных командах понимают очень разные вещи. Разведи их в голове, чтобы не спорить о несовместимых штуках.

Lite CQRS (то, что делаем мы)

  • Модели чтения и записи логически разделены в коде одного сервиса.
  • Одна и та же БД для записи и чтения (обычно).
  • Command handler’ы и query handler’ы — отдельные типы, не смешиваются.
  • Типизированные шины через watermill/components/cqrs.

Это то, что имеется в виду, когда handbook говорит «CQRS». В архитектурном обзоре это отмечено как «CQRS — используем частично» (см. ../architecture-overview).

Full CQRS

  • Отдельные БД / отдельные схемы для записи и чтения.
  • Синхронизация между ними — через события: command handler пишет в write-модель и публикует событие, projection-handler читает и обновляет read-модель.
  • Read-модель оптимизирована под конкретный запрос (денормализация, precomputed aggregate, materialized view).

Не используем. Оправдан, когда read нагрузка в десятки раз выше write, или когда read-запросы требуют совершенно другой shape. У нас такого кейса пока нет.

Event Sourcing + CQRS

  • Write-модель = append-only журнал событий.
  • Read-модель = projection, пересчитываемая из журнала.
  • Восстановление состояния — прокрутить все события с начала.

Не используем, не планируем. Event Sourcing как store-of-truth — явно отказались, см. ../architecture-overview.md §«Что мы ЯВНО не делаем».

Когда lite-CQRS оправдан

Сигналы, что сервис дорос до CQRS:

  • Handler count per сервис > 3. В inline-switch’е уже штук 5 case "review.created": / case "review.updated": / …, и каждый case — своя логика распаковки.
  • Разные shapes у request/response. Create принимает одно, Read возвращает DTO совсем другой формы, внутри одного handler’а два неудобно совместимых struct’а.
  • Тесты растут. Unit-тест на один case требует моков для всего сервиса, потому что switch-handler знает про всё сразу.
  • Duplicate unmarshal-кода. Каждый case делает json.Unmarshal в свой struct, после 3-го раза это копипаста.

Если всё это узнал в своём сервисе — включай CQRS. Пример перевода — §Migration ниже.

Когда lite-CQRS не нужен

  • CRUD-сервис с 1-2 endpoint’ами. Generalization-оверхед не окупится.
  • Сервис без Kafka-консьюмера. CQRS — про команды и события, если у тебя только HTTP endpoint’ы, тебе нужен просто service-слой, без шин.
  • Команды от одного типа. Один event handler на один топик — это обычный AddNoPublisherHandler, а не CQRS (см. ../conventions/events).

Event Bus

EventBus — типизированный publisher. Вместо того чтобы вручную строить *message.Message с envelope-metadata и дёргать publisher. Publish(topic, msg), ты просто передаёшь struct события и EventBus сам делает маршалинг, выбор топика и заполнение metadata.

Setup

import "github.com/ThreeDotsLabs/watermill/components/cqrs" marshaler := cqrs.JSONMarshaler{ GenerateName: cqrs.StructName, // имя события = имя типа struct } eventBus, err := cqrs.NewEventBusWithConfig( outboxPublisher, // наш watermill-sql publisher (см. outbox.md) cqrs.EventBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) { // Все события пишем в логический канал "outbox" — // forwarder развернёт их в правильные Kafka topic'и. return "outbox", nil }, OnPublish: func(params cqrs.OnEventSendParams) error { // Заполняем envelope через наш eventmeta helper. return eventmeta.Enrich(params.Message, eventmeta.Envelope{ EventType: eventTypeFromStructName(params.EventName), SchemaVersion: schemaVersion(params.Event), Source: "review", }) }, Marshaler: marshaler, Logger: watermillLogger, }, )

Комментарии:

  • outboxPublisher — это тот же watermill-sql publisher, что в outbox. CQRS поверх outbox’а: eventBus.Publish внутри InTx = запись в outbox-таблицу.
  • GeneratePublishTopic возвращает "outbox" — логическое имя forwarder’а, не Kafka-topic. Kafka-topic подставляется forwarder’ом позже (см. outbox.md §Forwarder).
  • OnPublish — hook, через который мы заполняем envelope. Это важно: без envelope consumer отклонит сообщение.
  • MarshalerJSONMarshaler{GenerateName: cqrs.StructName}. Имя события у CQRS = имя Go-типа, например ReviewCreatedV1.
  • eventTypeFromStructName("ReviewCreatedV1")"review.created" — конвенция именования в нашем стиле. Маппинг простой: lowercase + разделение по кейсу + drop V1-суффикса. Вариант без маппинга — переименовать struct’ы в review.createdV1, но это не Go.

Usage

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) { var r *domain.Review err := s.db.InTx(ctx, func(tx pgx.Tx) error { saved, err := s.reviews.CreateTx(ctx, tx, cmd) if err != nil { return err } r = saved return s.eventBus.Publish(ctx, &events.ReviewCreatedV1{ ReviewID: saved.ID, PlaceID: saved.PlaceID, UserID: saved.UserID, Rating: saved.Rating, }) }) if err != nil { return nil, fmt.Errorf("create review: %w", err) } return r, nil }

Сравни с raw-вариантом (см. outbox.md §Publisher side): CQRS-версия короче. eventBus.Publish внутри делает:

  1. Marshaler.Marshal(event)*message.Message с payload в JSON и Metadata["name"] = "ReviewCreatedV1".
  2. OnPublish(...) → обогащение envelope через eventmeta.Enrich.
  3. Publisher.Publish("outbox", msg) через watermill-sql — строка в outbox-таблице.

Event Processor

EventProcessor — типизированный consumer. Он знает, какой Go-тип соответствует какому топику, делает unmarshal за тебя, вызывает твой handler с типизированным аргументом.

Setup

eventProcessor, err := cqrs.NewEventProcessorWithConfig( router, cqrs.EventProcessorConfig{ GenerateSubscribeTopic: func(params cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) { // "ReviewCreatedV1" -> "kazmaps.review.review" return kafkaTopicFor(params.EventName), nil }, SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { return kafka.NewSubscriber(kafka.SubscriberConfig{ Brokers: cfg.Kafka.Brokers, ConsumerGroup: "notification-" + params.HandlerName, Unmarshaler: kafka.DefaultMarshaler{}, }, watermillLogger) }, Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName}, Logger: watermillLogger, }, )

Важные моменты:

  • SubscriberConstructor вызывается на каждый handler — у каждого будет свой consumer group. Это защищает от «один handler падает, offset другого едет» и даёт независимый rebalance per handler.
  • ConsumerGroup надо делать per (service, handler), не per-service. Иначе две копии одного consumer-group’а разделят партиции и каждый handler получит только половину сообщений.

Handler registration

h := NewNotificationHandler(notifSvc) err = eventProcessor.AddHandlers( cqrs.NewEventHandler("OnReviewCreated", h.OnReviewCreated), cqrs.NewEventHandler("OnUserBanned", h.OnUserBanned), cqrs.NewEventHandler("OnPhotoUploaded", h.OnPhotoUploaded), )

Имя handler’а — уникальное в рамках processor’а. Используется в ConsumerGroup и в логах. Формат — On<EventName>.

Типизированный handler

type NotificationHandler struct { svc *service.NotificationService } func (h *NotificationHandler) OnReviewCreated(ctx context.Context, evt *events.ReviewCreatedV1) error { return h.svc.HandleReviewCreated(ctx, evt.UserID, evt.PlaceID, evt.ReviewID) } func (h *NotificationHandler) OnUserBanned(ctx context.Context, evt *events.UserBannedV1) error { return h.svc.HandleUserBanned(ctx, evt.UserID, evt.Reason) }

Сравни с raw-handler’ом (см. ../conventions/events.md §Handler): тут нет ручного json.Unmarshal, нет ручной проверки Event-Type, нет каста msg.Metadata.Get(...). CQRS всё это делает сам.

Правила handler’а те же, что для raw:

  • Возврат nil — router коммитит offset.
  • Возврат error — Retry сработает (через router middleware), потом PoisonQueue отправит в DLQ.
  • Не делай msg.Ack() / msg.Nack() — это не твоё дело.
  • Handler должен быть идемпотентным. CQRS в этом не помогает: он решает только типизацию, не дедупликацию. См. idempotent-consumer.

Command Bus

Для in-process команд с request/response внутри сервиса. Используется реже — в основном для декомпозиции сложных service-слоёв.

commandBus, err := cqrs.NewCommandBusWithConfig( inProcessPublisher, // gochannel pub-sub для in-process команд cqrs.CommandBusConfig{ GeneratePublishTopic: func(params cqrs.GenerateCommandPublishTopicParams) (string, error) { return "commands." + params.CommandName, nil }, Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName}, Logger: watermillLogger, }, ) // Послать команду err = commandBus.Send(ctx, &CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})

Когда нужен:

  • Сервис делит бизнес-операцию на этапы, каждый этап — независимый handler (pipeline: ValidateReviewStoreReviewIndexReview).
  • Тестируемая композиция этапов — легче ставить fake handler в CommandBus, чем мокать сам service.

Когда не нужен:

  • Обычный линейный service-метод. Простой вызов функции короче и понятнее.
  • Cross-service: никогда. Command — это внутри одного сервиса. Cross-service — через события.

Middleware stack

Важный момент: CQRS не заменяет message.Router, он встраивается поверх него. Router-middleware применяется к CQRS handler’ам автоматически.

Стандартный stack (см. ../conventions/events):

router, err := message.NewRouter(message.RouterConfig{}, watermillLogger) router.AddMiddleware( middleware.CorrelationID, middleware.Recoverer, poisonQueue(kafkaPub, dlqTopic), middleware.Retry{ MaxRetries: 5, InitialInterval: 500 * time.Millisecond, Multiplier: 2.0, MaxInterval: 30 * time.Second, Logger: watermillLogger, }.Middleware, deduplicator(redisClient), ) // EventProcessor регистрирует handler'ы в этом же router'е — // middleware применяется прозрачно. eventProcessor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{...}) eventProcessor.AddHandlers(...) go func() { _ = router.Run(ctx) }()

Порядок middleware важен (объяснение — в ../conventions/events). CQRS ничего не меняет в этом порядке. Не добавляй per-handler middleware руками — единый stack на router’е достаточно.

Marshaler и имена событий

Default JSONMarshaler{GenerateName: cqrs.StructName} даёт:

type ReviewCreatedV1 struct { ... } → event name = "ReviewCreatedV1"

Но у нас в конвенции topic’ов — kazmaps.review.review, а в Event-Typereview.created. Как связать?

Вариант 1: маппинг в GeneratePublishTopic/GenerateSubscribeTopic.

func kafkaTopicFor(eventName string) string { // "ReviewCreatedV1" -> "review.created" eventType := eventTypeFromStructName(eventName) // "review.created" -> "kazmaps.review.review" service := strings.Split(eventType, ".")[0] return "kazmaps." + service + "." + eventType } func eventTypeFromStructName(name string) string { // Убираем версионный суффикс (V1/V2), splitcamel, lowercase. stripped := strings.TrimSuffix(name, "V1") stripped = strings.TrimSuffix(stripped, "V2") // "ReviewCreated" -> "review.created" return camelToDot(stripped) }

И аналогичный маппинг в OnPublish для заполнения Metadata["Event-Type"]:

OnPublish: func(params cqrs.OnEventSendParams) error { params.Message.Metadata.Set("Event-Type", eventTypeFromStructName(params.EventName)) return eventmeta.Enrich(params.Message, eventmeta.Envelope{...}) },

Вариант 2: собственный Marshaler.GenerateName. Пишешь функцию, которая возвращает уже готовый kebab-case/dot-case. Но тогда внутри handler’ов cqrs.NewEventHandler надо будет резолвить имя туда же. Дольше, не выигрывает читаемость.

Мы используем вариант 1 — struct-имена как в Go (ReviewCreatedV1), а маппинг — в CQRS-конфиге. Helper eventTypeFromStructName живёт в pkg/eventmeta/ сервис-репо.

Testing

Unit: gochannel как pub/sub

func TestOnReviewCreated_SendsPush(t *testing.T) { pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil) defer pubsub.Close() router, _ := message.NewRouter(message.RouterConfig{}, nil) processor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{ GenerateSubscribeTopic: func(p cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) { return "test.topic", nil }, SubscriberConstructor: func(p cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) { return pubsub, nil }, Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName}, }) fakeSvc := &fakeNotificationService{} h := NewNotificationHandler(fakeSvc) _ = processor.AddHandlers(cqrs.NewEventHandler("OnReviewCreated", h.OnReviewCreated)) bus, _ := cqrs.NewEventBusWithConfig(pubsub, cqrs.EventBusConfig{ GeneratePublishTopic: func(p cqrs.GenerateEventPublishTopicParams) (string, error) { return "test.topic", nil }, Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName}, }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { _ = router.Run(ctx) }() <-router.Running() _ = bus.Publish(ctx, &events.ReviewCreatedV1{ReviewID: 42, UserID: 1, PlaceID: 7, Rating: 5}) eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 }) }

gochannel подставляется и как publisher, и как subscriber — одна и та же in-memory шина. Marshaler и CQRS поверх неё работают абсолютно так же, как в проде.

Integration

Testcontainers Kafka + реальный watermill-kafka. CQRS поверх них — тот же код, что в проде, меняется только endpoint брокера. Полная схема — outbox.md §Testing и ../conventions/testing.

Anti-patterns

CQRS на CRUD-сервисе с одним событием

Один review.created → один handler в notification. Притянуть сюда CQRS с EventProcessor, EventBus, маппингом имён — over-engineering. Простой router.AddNoPublisherHandler(...) в 3 строчки — читается лучше.

Критерий: если в сервисе меньше 3 типизированных handler’ов на одну доменную модель, не вводи CQRS. Дождись роста.

CQRS без Deduplicator

CQRS делает код чище, но ничего не говорит про idempotency. Если подключил CQRS и не подключил Deduplicator middleware — под retry получишь дубликат обработки. Deduplicator подключается к router’у до CQRS-wiring’а, работает прозрачно для CQRS handler’ов. Детали — idempotent-consumer.

Command между сервисами

// Плохо: посылаем command другому сервису через Kafka userServiceCommandBus.Send(ctx, &SendWelcomeEmailCommand{...})

Command — это in-process артефакт. Между сервисами — события (факты в past tense), не команды (императив). Command implies sync request/response или prerequisites, это плохо ложится на асинхронную шину.

Если надо «попросить» другой сервис что-то сделать:

  • Синхронно → HTTP internal endpoint (см. api-composition).
  • Асинхронно → опубликуй событие-факт (user.registered), и notification-сервис сам решит, нужно ли что-то делать.

Состояние в handler-struct

type NotificationHandler struct { svc *NotificationService seen map[int64]bool // ПЛОХО mu sync.Mutex }

Handler-struct должен быть stateless. Состояние — в service/repository слоях. Причин несколько:

  • Router может вызывать handler конкурентно — нужен sync, что дороже, чем transaction в БД.
  • Состояние в памяти теряется при рестарте pod’а.
  • Разные реплики не видят состояние друг друга.

Если нужен shared state — Redis или БД. Если нужен dedup — см. idempotent-consumer.

Один EventBus на несколько сервисов-реплик

Нельзя два разных сервиса использовать один и тот же ConsumerGroup в Kafka — партиции разделятся, каждый сервис увидит только часть сообщений. ConsumerGroup = <service>-<handler>, не handler без префикса.

Raw и CQRS в одном router’е

Включил CQRS → перевели все handler’ы сервиса, не держи параллельно raw AddNoPublisherHandler для того же топика. Двойной consumer на один event = два раза обработано. Cleanly switch, без «смешанных периодов». Dual-publish в пределах publisher’а допустим в миграции (см. outbox.md §Migration), dual-consume — нет.

Миграция от switch-driven handler’ов к CQRS

План, когда сервис созрел до CQRS (3+ handler’а, рост event-типов):

  1. Определи Go-типы событий. Если ещё нет — создай internal/events/types.go со struct’ами ReviewCreatedV1, UserBannedV1 и т.п. JSON-теги в snake_case, совпадают с тем, что сейчас идёт в payload.
  2. Настрой Marshaler + маппинг имён. cqrs.JSONMarshaler{ GenerateName: cqrs.StructName} + функция eventTypeFromStructName + kafkaTopicFor.
  3. Заведи EventProcessor. Подключи к существующему router’у. Middleware stack остаётся прежним — CQRS его наследует.
  4. Перепиши handler’ы в типизированный вид. Каждый case в switch превращается в метод OnReviewCreated(ctx, *ReviewCreatedV1).
  5. Зарегистрируй handler’ы через AddHandlers. Старые AddNoPublisherHandler удали.
  6. Заведи EventBus на publisher-сайде. Замени outboxPublisher. PublishTx(ctx, tx, topic, msg) на eventBus.Publish(ctx, &ReviewCreatedV1{...}). Внутри InTx — то же место, та же транзакция.
  7. Тесты. Unit-тесты перепиши через типизированные handler’ы и gochannel (см. §Testing).

Сначала на publisher-стороне, потом на consumer-стороне. Не наоборот — консьюмеру всё равно, какой API у publisher’а.

FAQ

«CQRS == Event Sourcing?» Нет. Это разные идеи, часто объединяют в литературе, но ES требует журнала событий как store-of-truth, а CQRS — нет. У нас CQRS без ES.

«Нужен отдельный read-model?» В lite-CQRS — нет, одна БД. В full-CQRS — да, read-model обновляется через projection по событиям. Если обнаружил, что read-запросы мешают write-нагрузке в БД (измерил!), сначала пробуй индексы, read-реплики, кэш — и только если не хватает, full CQRS. Это серьёзный шаг.

«Saga поверх CQRS?» Возможно, но выходит за рамки этой страницы. Saga — координация cross-service последовательности с компенсирующими действиями. В ../architecture-overview отмечено как «планируется к подключению», пока cross-service операций с откатом нет.

«Можно ли наполовину CQRS — типизировать только часть handler’ов?» Можно в переходный период, но в итоговом состоянии — всё через CQRS или всё через raw. Смешанный режим тяжело поддерживать: разные шаблоны в одном сервисе, разные тесты, новичок теряется.

«EventBus vs CommandBus — когда что?» EventBus — для фактов (past tense, прошедшее), может быть cross-service через Kafka. CommandBus — для намерений (императив), всегда in-process, in-memory. Если сомневаешься — скорее всего нужен EventBus.

«CQRS знает про outbox?» Не напрямую. EventBus — это обёртка над Publisher. Если в Publisher передать watermill-sql outbox-publisher — все публикации пойдут через outbox. Если передать прямой kafka. Publisher — пойдут напрямую (dual-write, не делай так).

Связанные разделы

Last updated on