CQRS и watermill/components/cqrs¶
Deep-dive по CQRS в контексте нашего стека. Reference-уровень про
события и middleware — в ../conventions/events.md.
Здесь — что такое CQRS в нашем конкретном смысле, когда включать,
как заводить типизированные handler'ы поверх Watermill и что делать
не надо.
Содержание¶
- Что это
- Архитектура
- Формы CQRS — не путай варианты
- Когда lite-CQRS оправдан
- Когда lite-CQRS не нужен
- Event Bus
- Event Processor
- Command Bus
- Middleware stack
- Marshaler и имена событий
- Testing
- Anti-patterns
- Миграция от switch-driven handler'ов к CQRS
- FAQ
- Связанные разделы
Что это¶
CQRS — Command Query Responsibility Segregation. Идея в том, чтобы
разделить модель записи (command) и модель чтения (query): команды
мутируют состояние, запросы не имеют побочных эффектов. В Go + Watermill
это выливается в практический приём: вместо одного router'а с
switch eventType — типизированные handler'ы и шины (bus), которые
знают про конкретные типы команд и событий.
CQRS — не «шина сообщений». Не «event-driven». Не «микросервисы». Это способ организовать код внутри одного сервиса так, чтобы мутации и чтения не мешали друг другу.
Архитектура¶
Общая картина: publisher-сайд отдаёт типизированные события через
EventBus в outbox, forwarder раскладывает их в Kafka; consumer-сайд
прогоняет Kafka-сообщение через middleware-стек, EventProcessor
делает unmarshal в Go-struct и вызывает типизированный handler.
flowchart LR
subgraph Publisher["Publisher side"]
SVC[Service Layer]
EB[EventBus]
SVC -->|"Publish(ctx, *ReviewCreatedV1)"| EB
EB -->|JSON marshal + envelope| K1[Kafka topic: kazmaps.review.created]
end
subgraph Consumer["Consumer side"]
K2[Kafka topic: kazmaps.review.created]
ROUTER[Router + middleware]
EP[EventProcessor]
H[OnReviewCreated]
SVC2[Service Layer]
K2 --> ROUTER
ROUTER -->|"CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator"| EP
EP -->|"unmarshal → typed *ReviewCreatedV1"| H
H --> SVC2
end
Детализация по компонентам — §Event Bus, §Event Processor, §Middleware stack ниже.
Формы CQRS — не путай варианты¶
Под «CQRS» в разных командах понимают очень разные вещи. Разведи их в голове, чтобы не спорить о несовместимых штуках.
Lite CQRS (то, что делаем мы)¶
- Модели чтения и записи логически разделены в коде одного сервиса.
- Одна и та же БД для записи и чтения (обычно).
- Command handler'ы и query handler'ы — отдельные типы, не смешиваются.
- Типизированные шины через
watermill/components/cqrs.
Это то, что имеется в виду, когда handbook говорит «CQRS». В
архитектурном обзоре это отмечено как «CQRS — используем частично»
(см. ../architecture-overview.md).
Full CQRS¶
- Отдельные БД / отдельные схемы для записи и чтения.
- Синхронизация между ними — через события: command handler пишет в write-модель и публикует событие, projection-handler читает и обновляет read-модель.
- Read-модель оптимизирована под конкретный запрос (денормализация, precomputed aggregate, materialized view).
Не используем. Оправдан, когда read нагрузка в десятки раз выше write, или когда read-запросы требуют совершенно другой shape. У нас такого кейса пока нет.
Event Sourcing + CQRS¶
- Write-модель = append-only журнал событий.
- Read-модель = projection, пересчитываемая из журнала.
- Восстановление состояния — прокрутить все события с начала.
Не используем, не планируем. Event Sourcing как store-of-truth —
явно отказались, см. architecture-overview.md §«Что мы ЯВНО не
делаем».
Когда lite-CQRS оправдан¶
Сигналы, что сервис дорос до CQRS:
- Handler count per сервис > 3. В inline-switch'е уже штук 5
case "review.created":/case "review.updated":/ ..., и каждый case — своя логика распаковки. - Разные shapes у request/response. Create принимает одно, Read возвращает DTO совсем другой формы, внутри одного handler'а два неудобно совместимых struct'а.
- Тесты растут. Unit-тест на один case требует моков для всего сервиса, потому что switch-handler знает про всё сразу.
- Duplicate unmarshal-кода. Каждый case делает
json.Unmarshalв свой struct, после 3-го раза это копипаста.
Если всё это узнал в своём сервисе — включай CQRS. Пример перевода — §Migration ниже.
Когда lite-CQRS не нужен¶
- CRUD-сервис с 1-2 endpoint'ами. Generalization-оверхед не окупится.
- Сервис без Kafka-консьюмера. CQRS — про команды и события, если у тебя только HTTP endpoint'ы, тебе нужен просто service-слой, без шин.
- Команды от одного типа. Один event handler на один топик — это
обычный
AddNoPublisherHandler, а не CQRS (см.../conventions/events.md).
Event Bus¶
EventBus — типизированный publisher. Вместо того чтобы вручную
строить *message.Message с envelope-metadata и дёргать publisher.
Publish(topic, msg), ты просто передаёшь struct события и EventBus
сам делает маршалинг, выбор топика и заполнение metadata.
Setup¶
import "github.com/ThreeDotsLabs/watermill/components/cqrs"
marshaler := cqrs.JSONMarshaler{
GenerateName: cqrs.StructName, // имя события = имя типа struct
}
eventBus, err := cqrs.NewEventBusWithConfig(
outboxPublisher, // наш watermill-sql publisher (см. outbox.md)
cqrs.EventBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
// Все события пишем в логический канал "outbox" —
// forwarder развернёт их в правильные Kafka topic'и.
return "outbox", nil
},
OnPublish: func(params cqrs.OnEventSendParams) error {
// Заполняем envelope через наш eventmeta helper.
return eventmeta.Enrich(params.Message, eventmeta.Envelope{
EventType: eventTypeFromStructName(params.EventName),
SchemaVersion: schemaVersion(params.Event),
Source: "review",
})
},
Marshaler: marshaler,
Logger: watermillLogger,
},
)
Комментарии:
outboxPublisher— это тот жеwatermill-sqlpublisher, что вoutbox.md. CQRS поверх outbox'а:eventBus.PublishвнутриInTx= запись в outbox-таблицу.GeneratePublishTopicвозвращает"outbox"— логическое имя forwarder'а, не Kafka-topic. Kafka-topic подставляется forwarder'ом позже (см.outbox.md§Forwarder).OnPublish— hook, через который мы заполняем envelope. Это важно: без envelope consumer отклонит сообщение.Marshaler—JSONMarshaler{GenerateName: cqrs.StructName}. Имя события у CQRS = имя Go-типа, напримерReviewCreatedV1.eventTypeFromStructName("ReviewCreatedV1")→"review.created"— конвенция именования в нашем стиле. Маппинг простой: lowercase + разделение по кейсу + dropV1-суффикса. Вариант без маппинга — переименовать struct'ы вreview.createdV1, но это не Go.
Usage¶
func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
var r *domain.Review
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
saved, err := s.reviews.CreateTx(ctx, tx, cmd)
if err != nil {
return err
}
r = saved
return s.eventBus.Publish(ctx, &events.ReviewCreatedV1{
ReviewID: saved.ID,
PlaceID: saved.PlaceID,
UserID: saved.UserID,
Rating: saved.Rating,
})
})
if err != nil {
return nil, fmt.Errorf("create review: %w", err)
}
return r, nil
}
Сравни с raw-вариантом (см. outbox.md §Publisher side):
CQRS-версия короче. eventBus.Publish внутри делает:
Marshaler.Marshal(event)→*message.Messageс payload в JSON иMetadata["name"] = "ReviewCreatedV1".OnPublish(...)→ обогащение envelope черезeventmeta.Enrich.Publisher.Publish("outbox", msg)черезwatermill-sql— строка в outbox-таблице.
Event Processor¶
EventProcessor — типизированный consumer. Он знает, какой Go-тип
соответствует какому топику, делает unmarshal за тебя, вызывает твой
handler с типизированным аргументом.
Setup¶
eventProcessor, err := cqrs.NewEventProcessorWithConfig(
router,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) {
// "ReviewCreatedV1" -> "kazmaps.review.review.created"
return kafkaTopicFor(params.EventName), nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return kafka.NewSubscriber(kafka.SubscriberConfig{
Brokers: cfg.Kafka.Brokers,
ConsumerGroup: "notification-" + params.HandlerName,
Unmarshaler: kafka.DefaultMarshaler{},
}, watermillLogger)
},
Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
Logger: watermillLogger,
},
)
Важные моменты:
SubscriberConstructorвызывается на каждый handler — у каждого будет свой consumer group. Это защищает от «один handler падает, offset другого едет» и даёт независимый rebalance per handler.ConsumerGroupнадо делать per (service, handler), не per-service. Иначе две копии одного consumer-group'а разделят партиции и каждый handler получит только половину сообщений.
Handler registration¶
h := NewNotificationHandler(notifSvc)
err = eventProcessor.AddHandlers(
cqrs.NewEventHandler("OnReviewCreated", h.OnReviewCreated),
cqrs.NewEventHandler("OnUserBanned", h.OnUserBanned),
cqrs.NewEventHandler("OnPhotoUploaded", h.OnPhotoUploaded),
)
Имя handler'а — уникальное в рамках processor'а. Используется в
ConsumerGroup и в логах. Формат — On<EventName>.
Типизированный handler¶
type NotificationHandler struct {
svc *service.NotificationService
}
func (h *NotificationHandler) OnReviewCreated(ctx context.Context, evt *events.ReviewCreatedV1) error {
return h.svc.HandleReviewCreated(ctx, evt.UserID, evt.PlaceID, evt.ReviewID)
}
func (h *NotificationHandler) OnUserBanned(ctx context.Context, evt *events.UserBannedV1) error {
return h.svc.HandleUserBanned(ctx, evt.UserID, evt.Reason)
}
Сравни с raw-handler'ом (см. ../conventions/events.md
§Handler): тут нет ручного json.Unmarshal, нет ручной проверки
Event-Type, нет каста msg.Metadata.Get(...). CQRS всё это делает сам.
Правила handler'а те же, что для raw:
- Возврат
nil— router коммитит offset. - Возврат
error— Retry сработает (через router middleware), потом PoisonQueue отправит в DLQ. - Не делай
msg.Ack()/msg.Nack()— это не твоё дело. - Handler должен быть идемпотентным. CQRS в этом не помогает: он
решает только типизацию, не дедупликацию. См.
idempotent-consumer.md.
Command Bus¶
Для in-process команд с request/response внутри сервиса. Используется реже — в основном для декомпозиции сложных service-слоёв.
commandBus, err := cqrs.NewCommandBusWithConfig(
inProcessPublisher, // gochannel pub-sub для in-process команд
cqrs.CommandBusConfig{
GeneratePublishTopic: func(params cqrs.GenerateCommandPublishTopicParams) (string, error) {
return "commands." + params.CommandName, nil
},
Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
Logger: watermillLogger,
},
)
// Послать команду
err = commandBus.Send(ctx, &CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})
Когда нужен:
- Сервис делит бизнес-операцию на этапы, каждый этап — независимый
handler (pipeline:
ValidateReview→StoreReview→IndexReview). - Тестируемая композиция этапов — легче ставить fake handler в CommandBus, чем мокать сам service.
Когда не нужен:
- Обычный линейный service-метод. Простой вызов функции короче и понятнее.
- Cross-service: никогда. Command — это внутри одного сервиса. Cross-service — через события.
Middleware stack¶
Важный момент: CQRS не заменяет message.Router, он встраивается
поверх него. Router-middleware применяется к CQRS handler'ам
автоматически.
Стандартный stack (см. ../conventions/events.md):
router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)
router.AddMiddleware(
middleware.CorrelationID,
middleware.Recoverer,
poisonQueue(kafkaPub, dlqTopic),
middleware.Retry{
MaxRetries: 5,
InitialInterval: 500 * time.Millisecond,
Multiplier: 2.0,
MaxInterval: 30 * time.Second,
Logger: watermillLogger,
}.Middleware,
deduplicator(redisClient),
)
// EventProcessor регистрирует handler'ы в этом же router'е —
// middleware применяется прозрачно.
eventProcessor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{...})
eventProcessor.AddHandlers(...)
go func() { _ = router.Run(ctx) }()
Порядок middleware важен (объяснение — в
../conventions/events.md). CQRS ничего не
меняет в этом порядке. Не добавляй per-handler middleware руками —
единый stack на router'е достаточно.
Marshaler и имена событий¶
Default JSONMarshaler{GenerateName: cqrs.StructName} даёт:
Но у нас в конвенции topic'ов — kazmaps.review.review.created, а в
Event-Type — review.created. Как связать?
Вариант 1: маппинг в GeneratePublishTopic/GenerateSubscribeTopic.
func kafkaTopicFor(eventName string) string {
// "ReviewCreatedV1" -> "review.created"
eventType := eventTypeFromStructName(eventName)
// "review.created" -> "kazmaps.review.review.created"
service := strings.Split(eventType, ".")[0]
return "kazmaps." + service + "." + eventType
}
func eventTypeFromStructName(name string) string {
// Убираем версионный суффикс (V1/V2), splitcamel, lowercase.
stripped := strings.TrimSuffix(name, "V1")
stripped = strings.TrimSuffix(stripped, "V2")
// "ReviewCreated" -> "review.created"
return camelToDot(stripped)
}
И аналогичный маппинг в OnPublish для заполнения
Metadata["Event-Type"]:
OnPublish: func(params cqrs.OnEventSendParams) error {
params.Message.Metadata.Set("Event-Type", eventTypeFromStructName(params.EventName))
return eventmeta.Enrich(params.Message, eventmeta.Envelope{...})
},
Вариант 2: собственный Marshaler.GenerateName. Пишешь функцию,
которая возвращает уже готовый kebab-case/dot-case. Но тогда внутри
handler'ов cqrs.NewEventHandler надо будет резолвить имя туда же.
Дольше, не выигрывает читаемость.
Мы используем вариант 1 — struct-имена как в Go (ReviewCreatedV1), а
маппинг — в CQRS-конфиге. Helper eventTypeFromStructName живёт в
pkg/eventmeta/ сервис-репо.
Testing¶
Unit: gochannel как pub/sub¶
func TestOnReviewCreated_SendsPush(t *testing.T) {
pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
defer pubsub.Close()
router, _ := message.NewRouter(message.RouterConfig{}, nil)
processor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(p cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) {
return "test.topic", nil
},
SubscriberConstructor: func(p cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return pubsub, nil
},
Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
})
fakeSvc := &fakeNotificationService{}
h := NewNotificationHandler(fakeSvc)
_ = processor.AddHandlers(cqrs.NewEventHandler("OnReviewCreated", h.OnReviewCreated))
bus, _ := cqrs.NewEventBusWithConfig(pubsub, cqrs.EventBusConfig{
GeneratePublishTopic: func(p cqrs.GenerateEventPublishTopicParams) (string, error) {
return "test.topic", nil
},
Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { _ = router.Run(ctx) }()
<-router.Running()
_ = bus.Publish(ctx, &events.ReviewCreatedV1{ReviewID: 42, UserID: 1, PlaceID: 7, Rating: 5})
eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 })
}
gochannel подставляется и как publisher, и как subscriber — одна и
та же in-memory шина. Marshaler и CQRS поверх неё работают абсолютно
так же, как в проде.
Integration¶
Testcontainers Kafka + реальный watermill-kafka. CQRS поверх них —
тот же код, что в проде, меняется только endpoint брокера. Полная схема
— outbox.md §Testing и
../conventions/testing.md.
Anti-patterns¶
CQRS на CRUD-сервисе с одним событием¶
Один review.created → один handler в notification. Притянуть сюда
CQRS с EventProcessor, EventBus, маппингом имён — over-engineering.
Простой router.AddNoPublisherHandler(...) в 3 строчки — читается
лучше.
Критерий: если в сервисе меньше 3 типизированных handler'ов на одну доменную модель, не вводи CQRS. Дождись роста.
CQRS без Deduplicator¶
CQRS делает код чище, но ничего не говорит про idempotency. Если
подключил CQRS и не подключил Deduplicator middleware — под
retry получишь дубликат обработки. Deduplicator подключается к
router'у до CQRS-wiring'а, работает прозрачно для CQRS handler'ов.
Детали — idempotent-consumer.md.
Command между сервисами¶
// Плохо: посылаем command другому сервису через Kafka
userServiceCommandBus.Send(ctx, &SendWelcomeEmailCommand{...})
Command — это in-process артефакт. Между сервисами — события (факты в past tense), не команды (императив). Command implies sync request/response или prerequisites, это плохо ложится на асинхронную шину.
Если надо «попросить» другой сервис что-то сделать:
- Синхронно → HTTP internal endpoint (см.
api-composition.md). - Асинхронно → опубликуй событие-факт (
user.registered), и notification-сервис сам решит, нужно ли что-то делать.
Состояние в handler-struct¶
type NotificationHandler struct {
svc *NotificationService
seen map[int64]bool // ПЛОХО
mu sync.Mutex
}
Handler-struct должен быть stateless. Состояние — в service/repository слоях. Причин несколько:
- Router может вызывать handler конкурентно — нужен sync, что дороже, чем transaction в БД.
- Состояние в памяти теряется при рестарте pod'а.
- Разные реплики не видят состояние друг друга.
Если нужен shared state — Redis или БД. Если нужен dedup — см.
idempotent-consumer.md.
Один EventBus на несколько сервисов-реплик¶
Нельзя два разных сервиса использовать один и тот же ConsumerGroup в
Kafka — партиции разделятся, каждый сервис увидит только часть
сообщений. ConsumerGroup = <service>-<handler>, не handler без
префикса.
Raw и CQRS в одном router'е¶
Включил CQRS → перевели все handler'ы сервиса, не держи параллельно
raw AddNoPublisherHandler для того же топика. Двойной consumer на
один event = два раза обработано. Cleanly switch, без «смешанных
периодов». Dual-publish в пределах publisher'а допустим в миграции
(см. outbox.md §Migration), dual-consume — нет.
Миграция от switch-driven handler'ов к CQRS¶
План, когда сервис созрел до CQRS (3+ handler'а, рост event-типов):
- Определи Go-типы событий. Если ещё нет — создай
internal/events/types.goсо struct'амиReviewCreatedV1,UserBannedV1и т.п. JSON-теги в snake_case, совпадают с тем, что сейчас идёт в payload. - Настрой Marshaler + маппинг имён.
cqrs.JSONMarshaler{ GenerateName: cqrs.StructName}+ функцияeventTypeFromStructName+kafkaTopicFor. - Заведи EventProcessor. Подключи к существующему router'у. Middleware stack остаётся прежним — CQRS его наследует.
- Перепиши handler'ы в типизированный вид. Каждый
caseв switch превращается в методOnReviewCreated(ctx, *ReviewCreatedV1). - Зарегистрируй handler'ы через
AddHandlers. СтарыеAddNoPublisherHandlerудали. - Заведи EventBus на publisher-сайде. Замени
outboxPublisher. PublishTx(ctx, tx, topic, msg)наeventBus.Publish(ctx, &ReviewCreatedV1{...}). ВнутриInTx— то же место, та же транзакция. - Тесты. Unit-тесты перепиши через типизированные handler'ы и gochannel (см. §Testing).
Сначала на publisher-стороне, потом на consumer-стороне. Не наоборот — консьюмеру всё равно, какой API у publisher'а.
FAQ¶
«CQRS == Event Sourcing?» Нет. Это разные идеи, часто объединяют в литературе, но ES требует журнала событий как store-of-truth, а CQRS — нет. У нас CQRS без ES.
«Нужен отдельный read-model?» В lite-CQRS — нет, одна БД. В full-CQRS — да, read-model обновляется через projection по событиям. Если обнаружил, что read-запросы мешают write-нагрузке в БД (измерил!), сначала пробуй индексы, read-реплики, кэш — и только если не хватает, full CQRS. Это серьёзный шаг.
«Saga поверх CQRS?» Возможно, но выходит за рамки этой страницы.
Saga — координация cross-service последовательности с компенсирующими
действиями. В
../architecture-overview.md отмечено
как «планируется к подключению», пока cross-service операций с откатом
нет.
«Можно ли наполовину CQRS — типизировать только часть handler'ов?» Можно в переходный период, но в итоговом состоянии — всё через CQRS или всё через raw. Смешанный режим тяжело поддерживать: разные шаблоны в одном сервисе, разные тесты, новичок теряется.
«EventBus vs CommandBus — когда что?» EventBus — для фактов (past tense, прошедшее), может быть cross-service через Kafka. CommandBus — для намерений (императив), всегда in-process, in-memory. Если сомневаешься — скорее всего нужен EventBus.
«CQRS знает про outbox?» Не напрямую. EventBus — это обёртка над
Publisher. Если в Publisher передать watermill-sql outbox-publisher —
все публикации пойдут через outbox. Если передать прямой kafka.
Publisher — пойдут напрямую (dual-write, не делай так).
Связанные разделы¶
outbox.md— транспорт под CQRS publisher'ом.idempotent-consumer.md— CQRS + retry без дубликатов.api-composition.md— когда CQRS не подходит и лучше синхронный HTTP-вызов.../conventions/events.md— raw-уровень публикации и consume, envelope, middleware stack.../conventions/testing.md— тестирование CQRS handler'ов черезgochannel.../architecture-overview.md— место CQRS в общей картине.../glossary.md— CQRS, EventBus, EventProcessor, Deduplicator.