Перейти к содержанию

CQRS и watermill/components/cqrs

Deep-dive по CQRS в контексте нашего стека. Reference-уровень про события и middleware — в ../conventions/events.md. Здесь — что такое CQRS в нашем конкретном смысле, когда включать, как заводить типизированные handler'ы поверх Watermill и что делать не надо.

Содержание

Что это

CQRS — Command Query Responsibility Segregation. Идея в том, чтобы разделить модель записи (command) и модель чтения (query): команды мутируют состояние, запросы не имеют побочных эффектов. В Go + Watermill это выливается в практический приём: вместо одного router'а с switch eventType — типизированные handler'ы и шины (bus), которые знают про конкретные типы команд и событий.

CQRS — не «шина сообщений». Не «event-driven». Не «микросервисы». Это способ организовать код внутри одного сервиса так, чтобы мутации и чтения не мешали друг другу.

Архитектура

Общая картина: publisher-сайд отдаёт типизированные события через EventBus в outbox, forwarder раскладывает их в Kafka; consumer-сайд прогоняет Kafka-сообщение через middleware-стек, EventProcessor делает unmarshal в Go-struct и вызывает типизированный handler.

flowchart LR
    subgraph Publisher["Publisher side"]
        SVC[Service Layer]
        EB[EventBus]
        SVC -->|"Publish(ctx, *ReviewCreatedV1)"| EB
        EB -->|JSON marshal + envelope| K1[Kafka topic: kazmaps.review.created]
    end
    subgraph Consumer["Consumer side"]
        K2[Kafka topic: kazmaps.review.created]
        ROUTER[Router + middleware]
        EP[EventProcessor]
        H[OnReviewCreated]
        SVC2[Service Layer]
        K2 --> ROUTER
        ROUTER -->|"CorrelationID → Recoverer → PoisonQueue → Retry → Deduplicator"| EP
        EP -->|"unmarshal → typed *ReviewCreatedV1"| H
        H --> SVC2
    end

Детализация по компонентам — §Event Bus, §Event Processor, §Middleware stack ниже.

Формы CQRS — не путай варианты

Под «CQRS» в разных командах понимают очень разные вещи. Разведи их в голове, чтобы не спорить о несовместимых штуках.

Lite CQRS (то, что делаем мы)

  • Модели чтения и записи логически разделены в коде одного сервиса.
  • Одна и та же БД для записи и чтения (обычно).
  • Command handler'ы и query handler'ы — отдельные типы, не смешиваются.
  • Типизированные шины через watermill/components/cqrs.

Это то, что имеется в виду, когда handbook говорит «CQRS». В архитектурном обзоре это отмечено как «CQRS — используем частично» (см. ../architecture-overview.md).

Full CQRS

  • Отдельные БД / отдельные схемы для записи и чтения.
  • Синхронизация между ними — через события: command handler пишет в write-модель и публикует событие, projection-handler читает и обновляет read-модель.
  • Read-модель оптимизирована под конкретный запрос (денормализация, precomputed aggregate, materialized view).

Не используем. Оправдан, когда read нагрузка в десятки раз выше write, или когда read-запросы требуют совершенно другой shape. У нас такого кейса пока нет.

Event Sourcing + CQRS

  • Write-модель = append-only журнал событий.
  • Read-модель = projection, пересчитываемая из журнала.
  • Восстановление состояния — прокрутить все события с начала.

Не используем, не планируем. Event Sourcing как store-of-truth — явно отказались, см. architecture-overview.md §«Что мы ЯВНО не делаем».

Когда lite-CQRS оправдан

Сигналы, что сервис дорос до CQRS:

  • Handler count per сервис > 3. В inline-switch'е уже штук 5 case "review.created": / case "review.updated": / ..., и каждый case — своя логика распаковки.
  • Разные shapes у request/response. Create принимает одно, Read возвращает DTO совсем другой формы, внутри одного handler'а два неудобно совместимых struct'а.
  • Тесты растут. Unit-тест на один case требует моков для всего сервиса, потому что switch-handler знает про всё сразу.
  • Duplicate unmarshal-кода. Каждый case делает json.Unmarshal в свой struct, после 3-го раза это копипаста.

Если всё это узнал в своём сервисе — включай CQRS. Пример перевода — §Migration ниже.

Когда lite-CQRS не нужен

  • CRUD-сервис с 1-2 endpoint'ами. Generalization-оверхед не окупится.
  • Сервис без Kafka-консьюмера. CQRS — про команды и события, если у тебя только HTTP endpoint'ы, тебе нужен просто service-слой, без шин.
  • Команды от одного типа. Один event handler на один топик — это обычный AddNoPublisherHandler, а не CQRS (см. ../conventions/events.md).

Event Bus

EventBus — типизированный publisher. Вместо того чтобы вручную строить *message.Message с envelope-metadata и дёргать publisher. Publish(topic, msg), ты просто передаёшь struct события и EventBus сам делает маршалинг, выбор топика и заполнение metadata.

Setup

import "github.com/ThreeDotsLabs/watermill/components/cqrs"

marshaler := cqrs.JSONMarshaler{
    GenerateName: cqrs.StructName, // имя события = имя типа struct
}

eventBus, err := cqrs.NewEventBusWithConfig(
    outboxPublisher, // наш watermill-sql publisher (см. outbox.md)
    cqrs.EventBusConfig{
        GeneratePublishTopic: func(params cqrs.GenerateEventPublishTopicParams) (string, error) {
            // Все события пишем в логический канал "outbox" —
            // forwarder развернёт их в правильные Kafka topic'и.
            return "outbox", nil
        },
        OnPublish: func(params cqrs.OnEventSendParams) error {
            // Заполняем envelope через наш eventmeta helper.
            return eventmeta.Enrich(params.Message, eventmeta.Envelope{
                EventType:     eventTypeFromStructName(params.EventName),
                SchemaVersion: schemaVersion(params.Event),
                Source:        "review",
            })
        },
        Marshaler: marshaler,
        Logger:    watermillLogger,
    },
)

Комментарии:

  • outboxPublisher — это тот же watermill-sql publisher, что в outbox.md. CQRS поверх outbox'а: eventBus.Publish внутри InTx = запись в outbox-таблицу.
  • GeneratePublishTopic возвращает "outbox" — логическое имя forwarder'а, не Kafka-topic. Kafka-topic подставляется forwarder'ом позже (см. outbox.md §Forwarder).
  • OnPublish — hook, через который мы заполняем envelope. Это важно: без envelope consumer отклонит сообщение.
  • MarshalerJSONMarshaler{GenerateName: cqrs.StructName}. Имя события у CQRS = имя Go-типа, например ReviewCreatedV1.
  • eventTypeFromStructName("ReviewCreatedV1")"review.created" — конвенция именования в нашем стиле. Маппинг простой: lowercase + разделение по кейсу + drop V1-суффикса. Вариант без маппинга — переименовать struct'ы в review.createdV1, но это не Go.

Usage

func (s *ReviewService) Create(ctx context.Context, cmd CreateReviewCommand) (*domain.Review, error) {
    var r *domain.Review
    err := s.db.InTx(ctx, func(tx pgx.Tx) error {
        saved, err := s.reviews.CreateTx(ctx, tx, cmd)
        if err != nil {
            return err
        }
        r = saved

        return s.eventBus.Publish(ctx, &events.ReviewCreatedV1{
            ReviewID: saved.ID,
            PlaceID:  saved.PlaceID,
            UserID:   saved.UserID,
            Rating:   saved.Rating,
        })
    })
    if err != nil {
        return nil, fmt.Errorf("create review: %w", err)
    }
    return r, nil
}

Сравни с raw-вариантом (см. outbox.md §Publisher side): CQRS-версия короче. eventBus.Publish внутри делает:

  1. Marshaler.Marshal(event)*message.Message с payload в JSON и Metadata["name"] = "ReviewCreatedV1".
  2. OnPublish(...) → обогащение envelope через eventmeta.Enrich.
  3. Publisher.Publish("outbox", msg) через watermill-sql — строка в outbox-таблице.

Event Processor

EventProcessor — типизированный consumer. Он знает, какой Go-тип соответствует какому топику, делает unmarshal за тебя, вызывает твой handler с типизированным аргументом.

Setup

eventProcessor, err := cqrs.NewEventProcessorWithConfig(
    router,
    cqrs.EventProcessorConfig{
        GenerateSubscribeTopic: func(params cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) {
            // "ReviewCreatedV1" -> "kazmaps.review.review.created"
            return kafkaTopicFor(params.EventName), nil
        },
        SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
            return kafka.NewSubscriber(kafka.SubscriberConfig{
                Brokers:       cfg.Kafka.Brokers,
                ConsumerGroup: "notification-" + params.HandlerName,
                Unmarshaler:   kafka.DefaultMarshaler{},
            }, watermillLogger)
        },
        Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
        Logger:    watermillLogger,
    },
)

Важные моменты:

  • SubscriberConstructor вызывается на каждый handler — у каждого будет свой consumer group. Это защищает от «один handler падает, offset другого едет» и даёт независимый rebalance per handler.
  • ConsumerGroup надо делать per (service, handler), не per-service. Иначе две копии одного consumer-group'а разделят партиции и каждый handler получит только половину сообщений.

Handler registration

h := NewNotificationHandler(notifSvc)
err = eventProcessor.AddHandlers(
    cqrs.NewEventHandler("OnReviewCreated",   h.OnReviewCreated),
    cqrs.NewEventHandler("OnUserBanned",      h.OnUserBanned),
    cqrs.NewEventHandler("OnPhotoUploaded",   h.OnPhotoUploaded),
)

Имя handler'а — уникальное в рамках processor'а. Используется в ConsumerGroup и в логах. Формат — On<EventName>.

Типизированный handler

type NotificationHandler struct {
    svc *service.NotificationService
}

func (h *NotificationHandler) OnReviewCreated(ctx context.Context, evt *events.ReviewCreatedV1) error {
    return h.svc.HandleReviewCreated(ctx, evt.UserID, evt.PlaceID, evt.ReviewID)
}

func (h *NotificationHandler) OnUserBanned(ctx context.Context, evt *events.UserBannedV1) error {
    return h.svc.HandleUserBanned(ctx, evt.UserID, evt.Reason)
}

Сравни с raw-handler'ом (см. ../conventions/events.md §Handler): тут нет ручного json.Unmarshal, нет ручной проверки Event-Type, нет каста msg.Metadata.Get(...). CQRS всё это делает сам.

Правила handler'а те же, что для raw:

  • Возврат nil — router коммитит offset.
  • Возврат error — Retry сработает (через router middleware), потом PoisonQueue отправит в DLQ.
  • Не делай msg.Ack() / msg.Nack() — это не твоё дело.
  • Handler должен быть идемпотентным. CQRS в этом не помогает: он решает только типизацию, не дедупликацию. См. idempotent-consumer.md.

Command Bus

Для in-process команд с request/response внутри сервиса. Используется реже — в основном для декомпозиции сложных service-слоёв.

commandBus, err := cqrs.NewCommandBusWithConfig(
    inProcessPublisher, // gochannel pub-sub для in-process команд
    cqrs.CommandBusConfig{
        GeneratePublishTopic: func(params cqrs.GenerateCommandPublishTopicParams) (string, error) {
            return "commands." + params.CommandName, nil
        },
        Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
        Logger:    watermillLogger,
    },
)

// Послать команду
err = commandBus.Send(ctx, &CreateReviewCommand{UserID: 42, PlaceID: 7, Rating: 5})

Когда нужен:

  • Сервис делит бизнес-операцию на этапы, каждый этап — независимый handler (pipeline: ValidateReviewStoreReviewIndexReview).
  • Тестируемая композиция этапов — легче ставить fake handler в CommandBus, чем мокать сам service.

Когда не нужен:

  • Обычный линейный service-метод. Простой вызов функции короче и понятнее.
  • Cross-service: никогда. Command — это внутри одного сервиса. Cross-service — через события.

Middleware stack

Важный момент: CQRS не заменяет message.Router, он встраивается поверх него. Router-middleware применяется к CQRS handler'ам автоматически.

Стандартный stack (см. ../conventions/events.md):

router, err := message.NewRouter(message.RouterConfig{}, watermillLogger)

router.AddMiddleware(
    middleware.CorrelationID,
    middleware.Recoverer,
    poisonQueue(kafkaPub, dlqTopic),
    middleware.Retry{
        MaxRetries:      5,
        InitialInterval: 500 * time.Millisecond,
        Multiplier:      2.0,
        MaxInterval:     30 * time.Second,
        Logger:          watermillLogger,
    }.Middleware,
    deduplicator(redisClient),
)

// EventProcessor регистрирует handler'ы в этом же router'е —
// middleware применяется прозрачно.
eventProcessor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{...})
eventProcessor.AddHandlers(...)

go func() { _ = router.Run(ctx) }()

Порядок middleware важен (объяснение — в ../conventions/events.md). CQRS ничего не меняет в этом порядке. Не добавляй per-handler middleware руками — единый stack на router'е достаточно.

Marshaler и имена событий

Default JSONMarshaler{GenerateName: cqrs.StructName} даёт:

type ReviewCreatedV1 struct { ... }
→ event name = "ReviewCreatedV1"

Но у нас в конвенции topic'ов — kazmaps.review.review.created, а в Event-Typereview.created. Как связать?

Вариант 1: маппинг в GeneratePublishTopic/GenerateSubscribeTopic.

func kafkaTopicFor(eventName string) string {
    // "ReviewCreatedV1" -> "review.created"
    eventType := eventTypeFromStructName(eventName)
    // "review.created" -> "kazmaps.review.review.created"
    service := strings.Split(eventType, ".")[0]
    return "kazmaps." + service + "." + eventType
}

func eventTypeFromStructName(name string) string {
    // Убираем версионный суффикс (V1/V2), splitcamel, lowercase.
    stripped := strings.TrimSuffix(name, "V1")
    stripped = strings.TrimSuffix(stripped, "V2")
    // "ReviewCreated" -> "review.created"
    return camelToDot(stripped)
}

И аналогичный маппинг в OnPublish для заполнения Metadata["Event-Type"]:

OnPublish: func(params cqrs.OnEventSendParams) error {
    params.Message.Metadata.Set("Event-Type", eventTypeFromStructName(params.EventName))
    return eventmeta.Enrich(params.Message, eventmeta.Envelope{...})
},

Вариант 2: собственный Marshaler.GenerateName. Пишешь функцию, которая возвращает уже готовый kebab-case/dot-case. Но тогда внутри handler'ов cqrs.NewEventHandler надо будет резолвить имя туда же. Дольше, не выигрывает читаемость.

Мы используем вариант 1 — struct-имена как в Go (ReviewCreatedV1), а маппинг — в CQRS-конфиге. Helper eventTypeFromStructName живёт в pkg/eventmeta/ сервис-репо.

Testing

Unit: gochannel как pub/sub

func TestOnReviewCreated_SendsPush(t *testing.T) {
    pubsub := gochannel.NewGoChannel(gochannel.Config{}, nil)
    defer pubsub.Close()

    router, _ := message.NewRouter(message.RouterConfig{}, nil)

    processor, _ := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{
        GenerateSubscribeTopic: func(p cqrs.GenerateEventHandlerSubscribeTopicParams) (string, error) {
            return "test.topic", nil
        },
        SubscriberConstructor: func(p cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
            return pubsub, nil
        },
        Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
    })

    fakeSvc := &fakeNotificationService{}
    h := NewNotificationHandler(fakeSvc)
    _ = processor.AddHandlers(cqrs.NewEventHandler("OnReviewCreated", h.OnReviewCreated))

    bus, _ := cqrs.NewEventBusWithConfig(pubsub, cqrs.EventBusConfig{
        GeneratePublishTopic: func(p cqrs.GenerateEventPublishTopicParams) (string, error) {
            return "test.topic", nil
        },
        Marshaler: cqrs.JSONMarshaler{GenerateName: cqrs.StructName},
    })

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() { _ = router.Run(ctx) }()
    <-router.Running()

    _ = bus.Publish(ctx, &events.ReviewCreatedV1{ReviewID: 42, UserID: 1, PlaceID: 7, Rating: 5})

    eventually(t, 2*time.Second, func() bool { return fakeSvc.handled == 1 })
}

gochannel подставляется и как publisher, и как subscriber — одна и та же in-memory шина. Marshaler и CQRS поверх неё работают абсолютно так же, как в проде.

Integration

Testcontainers Kafka + реальный watermill-kafka. CQRS поверх них — тот же код, что в проде, меняется только endpoint брокера. Полная схема — outbox.md §Testing и ../conventions/testing.md.

Anti-patterns

CQRS на CRUD-сервисе с одним событием

Один review.created → один handler в notification. Притянуть сюда CQRS с EventProcessor, EventBus, маппингом имён — over-engineering. Простой router.AddNoPublisherHandler(...) в 3 строчки — читается лучше.

Критерий: если в сервисе меньше 3 типизированных handler'ов на одну доменную модель, не вводи CQRS. Дождись роста.

CQRS без Deduplicator

CQRS делает код чище, но ничего не говорит про idempotency. Если подключил CQRS и не подключил Deduplicator middleware — под retry получишь дубликат обработки. Deduplicator подключается к router'у до CQRS-wiring'а, работает прозрачно для CQRS handler'ов. Детали — idempotent-consumer.md.

Command между сервисами

// Плохо: посылаем command другому сервису через Kafka
userServiceCommandBus.Send(ctx, &SendWelcomeEmailCommand{...})

Command — это in-process артефакт. Между сервисами — события (факты в past tense), не команды (императив). Command implies sync request/response или prerequisites, это плохо ложится на асинхронную шину.

Если надо «попросить» другой сервис что-то сделать:

  • Синхронно → HTTP internal endpoint (см. api-composition.md).
  • Асинхронно → опубликуй событие-факт (user.registered), и notification-сервис сам решит, нужно ли что-то делать.

Состояние в handler-struct

type NotificationHandler struct {
    svc   *NotificationService
    seen  map[int64]bool // ПЛОХО
    mu    sync.Mutex
}

Handler-struct должен быть stateless. Состояние — в service/repository слоях. Причин несколько:

  • Router может вызывать handler конкурентно — нужен sync, что дороже, чем transaction в БД.
  • Состояние в памяти теряется при рестарте pod'а.
  • Разные реплики не видят состояние друг друга.

Если нужен shared state — Redis или БД. Если нужен dedup — см. idempotent-consumer.md.

Один EventBus на несколько сервисов-реплик

Нельзя два разных сервиса использовать один и тот же ConsumerGroup в Kafka — партиции разделятся, каждый сервис увидит только часть сообщений. ConsumerGroup = <service>-<handler>, не handler без префикса.

Raw и CQRS в одном router'е

Включил CQRS → перевели все handler'ы сервиса, не держи параллельно raw AddNoPublisherHandler для того же топика. Двойной consumer на один event = два раза обработано. Cleanly switch, без «смешанных периодов». Dual-publish в пределах publisher'а допустим в миграции (см. outbox.md §Migration), dual-consume — нет.

Миграция от switch-driven handler'ов к CQRS

План, когда сервис созрел до CQRS (3+ handler'а, рост event-типов):

  1. Определи Go-типы событий. Если ещё нет — создай internal/events/types.go со struct'ами ReviewCreatedV1, UserBannedV1 и т.п. JSON-теги в snake_case, совпадают с тем, что сейчас идёт в payload.
  2. Настрой Marshaler + маппинг имён. cqrs.JSONMarshaler{ GenerateName: cqrs.StructName} + функция eventTypeFromStructName + kafkaTopicFor.
  3. Заведи EventProcessor. Подключи к существующему router'у. Middleware stack остаётся прежним — CQRS его наследует.
  4. Перепиши handler'ы в типизированный вид. Каждый case в switch превращается в метод OnReviewCreated(ctx, *ReviewCreatedV1).
  5. Зарегистрируй handler'ы через AddHandlers. Старые AddNoPublisherHandler удали.
  6. Заведи EventBus на publisher-сайде. Замени outboxPublisher. PublishTx(ctx, tx, topic, msg) на eventBus.Publish(ctx, &ReviewCreatedV1{...}). Внутри InTx — то же место, та же транзакция.
  7. Тесты. Unit-тесты перепиши через типизированные handler'ы и gochannel (см. §Testing).

Сначала на publisher-стороне, потом на consumer-стороне. Не наоборот — консьюмеру всё равно, какой API у publisher'а.

FAQ

«CQRS == Event Sourcing?» Нет. Это разные идеи, часто объединяют в литературе, но ES требует журнала событий как store-of-truth, а CQRS — нет. У нас CQRS без ES.

«Нужен отдельный read-model?» В lite-CQRS — нет, одна БД. В full-CQRS — да, read-model обновляется через projection по событиям. Если обнаружил, что read-запросы мешают write-нагрузке в БД (измерил!), сначала пробуй индексы, read-реплики, кэш — и только если не хватает, full CQRS. Это серьёзный шаг.

«Saga поверх CQRS?» Возможно, но выходит за рамки этой страницы. Saga — координация cross-service последовательности с компенсирующими действиями. В ../architecture-overview.md отмечено как «планируется к подключению», пока cross-service операций с откатом нет.

«Можно ли наполовину CQRS — типизировать только часть handler'ов?» Можно в переходный период, но в итоговом состоянии — всё через CQRS или всё через raw. Смешанный режим тяжело поддерживать: разные шаблоны в одном сервисе, разные тесты, новичок теряется.

«EventBus vs CommandBus — когда что?» EventBus — для фактов (past tense, прошедшее), может быть cross-service через Kafka. CommandBus — для намерений (императив), всегда in-process, in-memory. Если сомневаешься — скорее всего нужен EventBus.

«CQRS знает про outbox?» Не напрямую. EventBus — это обёртка над Publisher. Если в Publisher передать watermill-sql outbox-publisher — все публикации пойдут через outbox. Если передать прямой kafka. Publisher — пойдут напрямую (dual-write, не делай так).

Связанные разделы