Перейти к содержимому

CQRS и Event Sourcing в production

Зачем знать на Middle 3: CQRS и ES — паттерны, которые звучат красиво на конференциях, но в проде их неправильное применение ведёт к проблемам: rebuild projections занимает дни, schema evolution ломает старые events, GDPR противоречит «immutable log». На уровне Senior: знаешь, когда CQRS оправдан, когда — overengineering, умеешь делать snapshots, версионировать events, обрабатывать right-to-be-forgotten через crypto-shredding, выбираешь между Kafka/EventStoreDB/PG-as-event-store.

  1. Концепция (CQRS + Event Sourcing)
  2. Глубже / production-практики
  3. Gotchas
  4. Real cases
  5. Вопросы (20)
  6. Practice
  7. Источники

Принцип: разделить модель данных для записи (command side) и для чтения (query side).

┌──────────────────┐
Commands → │ Write Model │ ──→ events ──→ ┌──────────────────┐
│ (normalized DB) │ │ Read Models │ ← Queries
└──────────────────┘ │ (denormalized, │
│ caching, etc) │
└──────────────────┘

Без CQRS: одна таблица orders, используется и для PUT/POST, и для GET. С CQRS: orders для writes; материализованные view (PG view, ClickHouse table, ElasticSearch index) для reads.

Преимущества:

  • Read model оптимизирован под query patterns (denormalized).
  • Independent scaling: writes на одном кластере, reads на 10 replicas.
  • Different storage tech: writes — PG (ACID), reads — ClickHouse (analytics) / ES (search).

Недостатки:

  • Eventual consistency между write и read.
  • Сложность: два места хранения, sync logic, versioning.

Принцип: вместо хранения текущего состояния, хранить последовательность событий, которые привели к состоянию.

Account 123:
Без ES:
state = { balance: 100 } ← мы знаем только сейчас
Event Sourcing:
events = [
{type: 'AccountCreated', balance: 0},
{type: 'Deposit', amount: 200},
{type: 'Withdraw', amount: 50},
{type: 'Deposit', amount: -50}, // correction
]
state = replay(events) → { balance: 100 }

Преимущества:

  • Audit trail by design: вся история действий.
  • Time travel: восстановить состояние на любую point in time.
  • Rebuild any view: новый read model? Replay events.
  • Domain insights: anal’итика по событиям.

Недостатки:

  • Storage: больше места.
  • Replay performance: для long-running aggregates медленно → нужны snapshots.
  • Schema evolution: старые events могут не соответствовать новой структуре.

Они часто идут вместе:

Commands → Write Side
↓ produce
Events ──→ Event Store (source of truth)
↓ project
Read Models (PG, ClickHouse, ES, cache)
↑ Queries

Можно использовать CQRS без ES: просто разделить command и query на разных DB, синхронизируя через CDC.

Можно использовать ES без CQRS: одна модель, но replay events для восстановления. Менее common.


type Command interface{}
type Query interface{}
type CommandHandler[C Command] interface {
Handle(ctx context.Context, cmd C) error
}
type QueryHandler[Q Query, R any] interface {
Handle(ctx context.Context, q Q) (R, error)
}
// Specific
type CreateOrderCommand struct {
UserID string
Items []Item
}
type CreateOrderHandler struct {
repo OrderRepository
eventBus EventBus
}
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) error {
order := NewOrder(cmd.UserID, cmd.Items)
if err := h.repo.Save(ctx, order); err != nil {
return err
}
h.eventBus.Publish(ctx, OrderCreated{OrderID: order.ID, ...})
return nil
}

Mediator (e.g. wire для DI, кастомный router) направляет команды в правильный handler.

⚠️ Не overengineer. Для простых CRUD CQRS = бойлерплейт. Применяйте, когда правда есть divergence между write и read.

Use caseRead store
Точечный поиск по IDPostgreSQL view / projection
Full-text searchElasticSearch / OpenSearch
Real-time analyticsClickHouse / Druid
Aggregations on time seriesTimescaleDB / InfluxDB
Geo-spatialPostGIS / Redis Geo
RecommendationsVector DB (Pinecone, Weaviate, Qdrant)
Hot cacheRedis

Один write model → несколько read models, каждый под свой query pattern.

PostgreSQL с events table:

CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
event_version INT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
occurred_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (aggregate_id, event_version)
);
CREATE INDEX idx_events_aggregate ON events(aggregate_id, event_version);
CREATE INDEX idx_events_type_time ON events(event_type, occurred_at);

Pros: знакомый PG, ACID, easy debug. Cons: scaling writes за пределы single instance.

EventStoreDB: специализированная DB.

  • Native streams концепция.
  • Subscriptions (catch-up, persistent).
  • Snapshots, projections built-in.
  • Cons: ещё одна tech в стеке.

Kafka как event store:

  • Log-based, естественно immutable.
  • Topic per aggregate type или per stream.
  • Retention infinite (compacted).
  • Cons: hard to query by aggregate ID; обычно нужен side store.

AWS DynamoDB: stream + table. Partition key = aggregate ID, sort key = version.

В 2026 most common stack: PostgreSQL для writes + Kafka для events + ClickHouse/PG для read models.

type Account struct {
ID string
Balance int64
version int
}
func LoadAccount(ctx context.Context, store EventStore, id string) (*Account, error) {
events, err := store.LoadEvents(ctx, id)
if err != nil { return nil, err }
acc := &Account{ID: id}
for _, e := range events {
acc.apply(e)
}
return acc, nil
}
func (a *Account) apply(e Event) {
switch ev := e.(type) {
case AccountCreated:
a.Balance = 0
case Deposit:
a.Balance += ev.Amount
case Withdraw:
a.Balance -= ev.Amount
}
a.version++
}
func (a *Account) Deposit(amount int64) Event {
if amount <= 0 { panic("invalid") }
e := Deposit{Amount: amount}
a.apply(e)
return e
}

Каждая команда:

  1. Load events для aggregate.
  2. Replay → in-memory state.
  3. Apply business logic, produce new event.
  4. Save event (concurrent write check: version match).

Для long-living aggregates (тысячи events) replay медленный. Snapshot — это state at version N.

CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
version INT NOT NULL,
state JSONB NOT NULL,
taken_at TIMESTAMPTZ DEFAULT now()
);

Load logic:

func LoadAccount(ctx, id) (*Account, error) {
snap, err := store.LoadSnapshot(ctx, id)
var acc *Account
var fromVersion int
if snap != nil {
acc = unmarshal(snap.State)
fromVersion = snap.Version + 1
} else {
acc = &Account{ID: id}
fromVersion = 0
}
events, _ := store.LoadEventsSince(ctx, id, fromVersion)
for _, e := range events {
acc.apply(e)
}
return acc, nil
}

Когда снимать snapshot:

  • Every N events (e.g. 100).
  • On schedule (background job).
  • After heavy event (e.g. BigBatchUpdated).

⚠️ Snapshot — не source of truth. Если он corrupted — replay from beginning recovers.

Projection = handler events, который пишет в read model.

type OrderListProjection struct {
db *sql.DB
}
func (p *OrderListProjection) Handle(ctx, e Event) error {
switch ev := e.(type) {
case OrderCreated:
_, err := p.db.ExecContext(ctx,
"INSERT INTO order_list (id, user_id, status, created_at) VALUES ($1, $2, $3, $4)",
ev.OrderID, ev.UserID, "pending", ev.OccurredAt)
return err
case OrderShipped:
_, err := p.db.ExecContext(ctx,
"UPDATE order_list SET status='shipped', shipped_at=$1 WHERE id=$2",
ev.OccurredAt, ev.OrderID)
return err
}
return nil
}

⚠️ Projection — idempotent (one event может обрабатываться несколько раз при retry).

⚠️ Projection tracks offset (last processed event ID). При restart resume from there.

Сценарий: новая projection или баг в старой → нужен rebuild from scratch.

[Stop projection]
[Truncate read table]
[Reset offset to 0]
[Replay all events]
[Start serving from new read model]

⚠️ Время rebuild — bounded? Если у вас 1 млрд events, rebuild может занять часы или дни. Стратегии:

  • Parallel projection: партиционировать по aggregate_id.
  • Blue-green projection: новая projection строится рядом, после готовности — switch.
  • Bounded retention: не replay полностью, а только последние N дней (для analytics это часто ОК).

Изменили projection schema → нужен rebuild. Хранить версию:

CREATE TABLE projection_meta (
name VARCHAR(64) PRIMARY KEY,
version INT NOT NULL,
offset BIGINT NOT NULL
);

При deploy projection v2:

  1. Создать новые read tables (e.g. order_list_v2).
  2. Replay events into v2.
  3. Switch readers from v1 to v2.
  4. Drop v1.

События живут вечно (в идеале). Через год вы хотите изменить event schema.

Approach 1: Upcasting (recommended):

type OrderCreatedV1 struct {
OrderID string
Total float64
}
type OrderCreatedV2 struct {
OrderID string
Total float64
Currency string // new field
}
func upcastOrderCreated(raw []byte, version int) (OrderCreatedV2, error) {
switch version {
case 1:
var v1 OrderCreatedV1
json.Unmarshal(raw, &v1)
return OrderCreatedV2{
OrderID: v1.OrderID, Total: v1.Total,
Currency: "USD", // default
}, nil
case 2:
var v2 OrderCreatedV2
json.Unmarshal(raw, &v2)
return v2, nil
}
}

Старые events хранятся в исходном формате; при чтении upcast в current schema.

Approach 2: Migrate events (rewrite in store):

UPDATE events
SET payload = jsonb_set(payload, '{currency}', '"USD"', true),
event_version = 2
WHERE event_type = 'OrderCreated' AND event_version = 1;

⚠️ ES purist скажут: never modify history. Реальность: иногда нужно.

Approach 3: Weak schema (Avro/Protobuf with default values):

Protobuf forward/backward compatibility automatically handles added fields.

«Right to be forgotten» противоречит «events are immutable forever».

Crypto-shredding:

Per-user encryption key in separate KMS.
All PII в events encrypted with this key.
User requests deletion → delete the key.
Events still exist, но decryption невозможна → effectively forgotten.
type PIIEvent struct {
UserID string
PIIBlob []byte // encrypted с user's KMS key
}
func encryptPII(userID string, pii Personal) []byte {
key := kms.GetUserKey(userID)
return aesGcmEncrypt(key, marshal(pii))
}

После удаления key из KMS — все исторические events этого user становятся unreadable.

Tombstone events:

UserDeletedEvent — projections handle: remove user's data.
But raw events остаются.

⚠️ Юридический аспект: проверяйте с lawyer, считается ли это «forgetting» в смысле GDPR. В EU чаще требуют physical deletion. Crypto-shredding — pragmatic compromise.

Bottleneck: aggregate-level concurrency.

Aggregate Order:abc
- process command 1 → check version, append event
- process command 2 → check version, конфликт если v1 не commited

В Postgres это SELECT FOR UPDATE или unique constraint на (aggregate_id, version).

Single aggregate ≤ N events/sec where N зависит от DB latency (~100–1000/sec).

Если нужно больше — partition aggregate (e.g., per-product inventory → per-warehouse-product).

Партиционирование по hash(aggregate_id) % N:

  • Каждая partition имеет свой event store / Kafka partition.
  • Aggregate всегда читается/пишется в один shard.
  • Cross-aggregate transactions impossible → saga.

Finance:

  • Banking: every account event = audit log mandatory.
  • Trading: order book is naturally event-sourced (place, modify, cancel, fill).

E-commerce:

  • Order lifecycle: created, paid, picked, shipped, delivered.
  • Inventory: receive, reserve, release, adjust.

Healthcare:

  • Patient records: every observation, prescription, lab result as event.
  • HIPAA requires audit log → ES gives this naturally.

Gaming:

  • Player actions, achievements, items.
  • Replay для game review / cheat detection.

Mostly homegrown (each company builds their own ES framework).

Available libraries:

  • github.com/modernice/goes — generic ES framework для Go.
  • github.com/EventStore/EventStore-Client-Go — official EventStoreDB client.
  • github.com/segmentio/kafka-go — Kafka client, can build ES on top.

⚠️ Не verging «ES framework» в Go нет industry standard. Большинство production projects писали свою тонкую обёртку над PG/Kafka.

  • Простой CRUD без сложной бизнес-логики.
  • Стартап в early stage — overengineering замедлит iterate.
  • Team не имеет experience — kompleksnost высокая.
  • Нет требования audit trail / time travel.

Когда использовать:

  • Highly regulated domains (finance, healthcare).
  • Сложные бизнес-процессы с долгой историей (insurance claims, supply chain).
  • Когда replay events для analytics / ML — value add.

⚠️ Eventual consistency между write и read. UI может показать «order created», а через 100 мс GET вернёт «not found». Решение: optimistic UI или wait-for-projection pattern.

⚠️ Snapshot becomes wrong после schema change. Need versioning snapshot too или drop all snapshots при schema upgrade.

⚠️ Projection lag в продакшене. Под нагрузкой 100K events/sec, projection processing 50K/sec — lag растёт. Monitor lag, alert если > X minutes.

⚠️ Reordering events. Если processing в параллель — events за aggregate могут прийти в wrong order. Solution: keyed processing (Kafka partition by aggregate_id).

⚠️ Idempotency vs ordering. Идемпотентный handler можно retry; но если processed event 1 после event 2 — состояние неконсистентно. Both нужны.

⚠️ Storage growth. 1 billion events × 1 KB = 1 ТБ. Plan retention или archival.

⚠️ Event size bloat. Don’t put full state in event. Только delta / what changed.

⚠️ Schema evolution breaks replay. Тестировать replay от oldest event каждый release.

⚠️ Distributed transactions desire — иногда tempting запустить «событие в PG + событие в Kafka» атомарно. Это outbox pattern (file 17).

⚠️ CQRS premature application. Маленький CRUD без real read/write divergence не нуждается в CQRS. Можно начать без, добавить когда нужно.

⚠️ Read model rebuild downtime. Не делайте blue-green правильно — простой может быть часами.

⚠️ GDPR crypto-shredding не universally accepted. Legal counsel required.

⚠️ Event versions naming clash. Use semver or explicit v1, v2 suffixes в event types: OrderCreated.v2.

⚠️ Concurrent writes to same aggregate. PG unique constraint or optimistic locking. Retry with backoff.


Контекст: traditional bank, migration to event-sourced ledger.

Setup:

  • Event store: PG с events table, partitioned by month.
  • Aggregates: accounts. Events: Credit, Debit, FeeApplied, InterestEarned.
  • Snapshots каждые 100 events.
  • Read models: balance (Redis), transactions list (PG view), monthly statements (S3).

Outcome:

  • Audit complete since day 1.
  • Regulator compliance: easy to prove «who changed what when».
  • Time travel: «what was balance on 2024-03-15?» — replay до того момента.

Контекст: support team часто запрашивает «who clicked refund button, when».

Without ES: order_status_log table, partial info.

With ES: events RefundRequested, RefundApproved, RefundProcessed. Full causality. Support открывает customer service tool, видит full timeline.

Контекст: новая feature — search по orders. Создали ElasticSearch projection.

Проблема: rebuild from 5-year event store — 3 дня в single-thread.

Solution:

  • Partition rebuild по hash(order_id) % 10.
  • 10 parallel workers.
  • Rebuild = 7 hours.

Симптом: new deploy, replay broken at events from 2 years ago.

Анализ: dev добавил поле string в event payload без default. Старые events не имеют поля → panic on unmarshal.

Fix:

  • Made поле *string (pointer = nullable).
  • Upcaster fills default for nil.
  • Added test: replay all events from oldest period в CI.

Контекст: user requested deletion. Legal: «всё удалить».

Implementation: KMS key для user removed. Events become unreadable.

Issue (2 года спустя): ML team захотела использовать historical events для training. 5% events un-decryptable. Был кризис: «у нас data loss».

Resolution: aggregated ML features computed BEFORE deletion. После deletion — только anonymized aggregates. Legal был ОК.


  1. CQRS: что разделяется и почему?
  2. Event Sourcing: чем отличается от хранения state?
  3. CQRS + ES синергия — какие преимущества?
  4. CQRS без ES возможен? Привести пример.
  5. Mediator pattern в CQRS — зачем?
  6. Какие read storage tech выбрать для full-text search vs analytics vs точечный поиск?
  7. Сравнить event store: PG, EventStoreDB, Kafka.
  8. Aggregate replay: алгоритм.
  9. Snapshot: когда снимать, как версионировать.
  10. Projections rebuild стратегии (parallel, blue-green).
  11. Event versioning через upcasting — пример.
  12. Crypto-shredding для GDPR: как работает.
  13. Tombstone events vs deletion: что выбрать?
  14. Write throughput limit: что bottleneck в aggregate?
  15. Distributed ES: sharding по aggregate_id.
  16. Reordering events в kafka — как избежать?
  17. Идемпотентность projection handler — почему критично?
  18. Когда CQRS overengineering?
  19. Когда ES не подходит?
  20. Опишите кейс, где ES спас аудит или ML team.

Задача 1: Реализовать простой ES store на Postgres: events table, append, load by aggregate_id.

Задача 2: Aggregate Account с methods Deposit, Withdraw. Persist events, replay.

Задача 3: Добавить snapshot mechanism — каждые 10 events snapshot.

Задача 4: Реализовать projection BalanceView в Redis. Subscribe to events.

Задача 5: Schema evolution: add field currency в event. Реализовать upcaster v1 → v2.

Задача 6: Rebuild projection from scratch — script для production-like load.

Задача 7: Реализовать crypto-shredding: user-specific encryption key, encrypted PII.

Задача 8 (advanced): Distributed ES с partitioning по hash(aggregate_id). Kafka partitions, projection per partition.

Задача 9: Поднять EventStoreDB через docker-compose, написать basic Go client.


  1. Greg Young, “CQRS Documents”, архив, https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf
  2. Greg Young, “Event Sourcing” talks (multiple), 2010–2018.
  3. Martin Fowler, “Event Sourcing”, https://martinfowler.com/eaaDev/EventSourcing.html
  4. Vaughn Vernon, “Implementing Domain-Driven Design”, Addison-Wesley, 2013.
  5. Eric Evans, “Domain-Driven Design”, Addison-Wesley, 2003.
  6. Chris Richardson, “Microservices Patterns”, 2018.
  7. EventStoreDB Documentation, https://developers.eventstore.com/
  8. Adam Dymitruk, “Event Sourcing for the masses”, talks.
  9. Confluent Blog, “Event Sourcing with Kafka”, 2018.
  10. ModernAuth/goes — github.com/modernice/goes
  11. Bartosz Sypytkowski, “GDPR and Event Sourcing”, 2018.
  12. PostgreSQL documentation: jsonb, advisory locks for aggregate concurrency.
  13. ClickHouse для projections, materialized views docs.
  14. Pat Helland, “Immutability Changes Everything”, CIDR 2015.
  15. Mike Amundsen, “RESTful Web APIs”, chapters on hypermedia + state.