CQRS и Event Sourcing в production
Зачем знать на Middle 3: CQRS и ES — паттерны, которые звучат красиво на конференциях, но в проде их неправильное применение ведёт к проблемам: rebuild projections занимает дни, schema evolution ломает старые events, GDPR противоречит «immutable log». На уровне Senior: знаешь, когда CQRS оправдан, когда — overengineering, умеешь делать snapshots, версионировать events, обрабатывать right-to-be-forgotten через crypto-shredding, выбираешь между Kafka/EventStoreDB/PG-as-event-store.
Содержание
Заголовок раздела «Содержание»- Концепция (CQRS + Event Sourcing)
- Глубже / production-практики
- Gotchas
- Real cases
- Вопросы (20)
- Practice
- Источники
1. Концепция
Заголовок раздела «1. Концепция»1.1 CQRS: Command Query Responsibility Segregation
Заголовок раздела «1.1 CQRS: Command Query Responsibility Segregation»Принцип: разделить модель данных для записи (command side) и для чтения (query side).
┌──────────────────┐ Commands → │ Write Model │ ──→ events ──→ ┌──────────────────┐ │ (normalized DB) │ │ Read Models │ ← Queries └──────────────────┘ │ (denormalized, │ │ caching, etc) │ └──────────────────┘Без CQRS: одна таблица orders, используется и для PUT/POST, и для GET.
С CQRS: orders для writes; материализованные view (PG view, ClickHouse table, ElasticSearch index) для reads.
Преимущества:
- Read model оптимизирован под query patterns (denormalized).
- Independent scaling: writes на одном кластере, reads на 10 replicas.
- Different storage tech: writes — PG (ACID), reads — ClickHouse (analytics) / ES (search).
Недостатки:
- Eventual consistency между write и read.
- Сложность: два места хранения, sync logic, versioning.
1.2 Event Sourcing (ES)
Заголовок раздела «1.2 Event Sourcing (ES)»Принцип: вместо хранения текущего состояния, хранить последовательность событий, которые привели к состоянию.
Account 123:
Без ES: state = { balance: 100 } ← мы знаем только сейчас
Event Sourcing: events = [ {type: 'AccountCreated', balance: 0}, {type: 'Deposit', amount: 200}, {type: 'Withdraw', amount: 50}, {type: 'Deposit', amount: -50}, // correction ] state = replay(events) → { balance: 100 }Преимущества:
- Audit trail by design: вся история действий.
- Time travel: восстановить состояние на любую point in time.
- Rebuild any view: новый read model? Replay events.
- Domain insights: anal’итика по событиям.
Недостатки:
- Storage: больше места.
- Replay performance: для long-running aggregates медленно → нужны snapshots.
- Schema evolution: старые events могут не соответствовать новой структуре.
1.3 CQRS + ES синергия
Заголовок раздела «1.3 CQRS + ES синергия»Они часто идут вместе:
Commands → Write Side ↓ produce Events ──→ Event Store (source of truth) ↓ project Read Models (PG, ClickHouse, ES, cache) ↑ QueriesМожно использовать CQRS без ES: просто разделить command и query на разных DB, синхронизируя через CDC.
Можно использовать ES без CQRS: одна модель, но replay events для восстановления. Менее common.
2. Глубже / production-практики
Заголовок раздела «2. Глубже / production-практики»2.1 Command/Query handlers (mediator pattern)
Заголовок раздела «2.1 Command/Query handlers (mediator pattern)»type Command interface{}type Query interface{}
type CommandHandler[C Command] interface { Handle(ctx context.Context, cmd C) error}
type QueryHandler[Q Query, R any] interface { Handle(ctx context.Context, q Q) (R, error)}
// Specifictype CreateOrderCommand struct { UserID string Items []Item}
type CreateOrderHandler struct { repo OrderRepository eventBus EventBus}
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd CreateOrderCommand) error { order := NewOrder(cmd.UserID, cmd.Items) if err := h.repo.Save(ctx, order); err != nil { return err } h.eventBus.Publish(ctx, OrderCreated{OrderID: order.ID, ...}) return nil}Mediator (e.g. wire для DI, кастомный router) направляет команды в правильный handler.
⚠️ Не overengineer. Для простых CRUD CQRS = бойлерплейт. Применяйте, когда правда есть divergence между write и read.
2.2 Read model: разные хранилища
Заголовок раздела «2.2 Read model: разные хранилища»| Use case | Read store |
|---|---|
| Точечный поиск по ID | PostgreSQL view / projection |
| Full-text search | ElasticSearch / OpenSearch |
| Real-time analytics | ClickHouse / Druid |
| Aggregations on time series | TimescaleDB / InfluxDB |
| Geo-spatial | PostGIS / Redis Geo |
| Recommendations | Vector DB (Pinecone, Weaviate, Qdrant) |
| Hot cache | Redis |
Один write model → несколько read models, каждый под свой query pattern.
2.3 Event Store options
Заголовок раздела «2.3 Event Store options»PostgreSQL с events table:
CREATE TABLE events ( id BIGSERIAL PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type VARCHAR(64) NOT NULL, event_type VARCHAR(128) NOT NULL, event_version INT NOT NULL, payload JSONB NOT NULL, metadata JSONB, occurred_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (aggregate_id, event_version));
CREATE INDEX idx_events_aggregate ON events(aggregate_id, event_version);CREATE INDEX idx_events_type_time ON events(event_type, occurred_at);Pros: знакомый PG, ACID, easy debug. Cons: scaling writes за пределы single instance.
EventStoreDB: специализированная DB.
- Native streams концепция.
- Subscriptions (catch-up, persistent).
- Snapshots, projections built-in.
- Cons: ещё одна tech в стеке.
Kafka как event store:
- Log-based, естественно immutable.
- Topic per aggregate type или per stream.
- Retention infinite (compacted).
- Cons: hard to query by aggregate ID; обычно нужен side store.
AWS DynamoDB: stream + table. Partition key = aggregate ID, sort key = version.
В 2026 most common stack: PostgreSQL для writes + Kafka для events + ClickHouse/PG для read models.
2.4 Aggregate replay
Заголовок раздела «2.4 Aggregate replay»type Account struct { ID string Balance int64 version int}
func LoadAccount(ctx context.Context, store EventStore, id string) (*Account, error) { events, err := store.LoadEvents(ctx, id) if err != nil { return nil, err }
acc := &Account{ID: id} for _, e := range events { acc.apply(e) } return acc, nil}
func (a *Account) apply(e Event) { switch ev := e.(type) { case AccountCreated: a.Balance = 0 case Deposit: a.Balance += ev.Amount case Withdraw: a.Balance -= ev.Amount } a.version++}
func (a *Account) Deposit(amount int64) Event { if amount <= 0 { panic("invalid") } e := Deposit{Amount: amount} a.apply(e) return e}Каждая команда:
- Load events для aggregate.
- Replay → in-memory state.
- Apply business logic, produce new event.
- Save event (concurrent write check: version match).
2.5 Snapshots
Заголовок раздела «2.5 Snapshots»Для long-living aggregates (тысячи events) replay медленный. Snapshot — это state at version N.
CREATE TABLE snapshots ( aggregate_id UUID PRIMARY KEY, version INT NOT NULL, state JSONB NOT NULL, taken_at TIMESTAMPTZ DEFAULT now());Load logic:
func LoadAccount(ctx, id) (*Account, error) { snap, err := store.LoadSnapshot(ctx, id) var acc *Account var fromVersion int if snap != nil { acc = unmarshal(snap.State) fromVersion = snap.Version + 1 } else { acc = &Account{ID: id} fromVersion = 0 }
events, _ := store.LoadEventsSince(ctx, id, fromVersion) for _, e := range events { acc.apply(e) } return acc, nil}Когда снимать snapshot:
- Every N events (e.g. 100).
- On schedule (background job).
- After heavy event (e.g.
BigBatchUpdated).
⚠️ Snapshot — не source of truth. Если он corrupted — replay from beginning recovers.
2.6 Projections (read models)
Заголовок раздела «2.6 Projections (read models)»Projection = handler events, который пишет в read model.
type OrderListProjection struct { db *sql.DB}
func (p *OrderListProjection) Handle(ctx, e Event) error { switch ev := e.(type) { case OrderCreated: _, err := p.db.ExecContext(ctx, "INSERT INTO order_list (id, user_id, status, created_at) VALUES ($1, $2, $3, $4)", ev.OrderID, ev.UserID, "pending", ev.OccurredAt) return err case OrderShipped: _, err := p.db.ExecContext(ctx, "UPDATE order_list SET status='shipped', shipped_at=$1 WHERE id=$2", ev.OccurredAt, ev.OrderID) return err } return nil}⚠️ Projection — idempotent (one event может обрабатываться несколько раз при retry).
⚠️ Projection tracks offset (last processed event ID). При restart resume from there.
2.7 Rebuild projections
Заголовок раздела «2.7 Rebuild projections»Сценарий: новая projection или баг в старой → нужен rebuild from scratch.
[Stop projection][Truncate read table][Reset offset to 0][Replay all events][Start serving from new read model]⚠️ Время rebuild — bounded? Если у вас 1 млрд events, rebuild может занять часы или дни. Стратегии:
- Parallel projection: партиционировать по aggregate_id.
- Blue-green projection: новая projection строится рядом, после готовности — switch.
- Bounded retention: не replay полностью, а только последние N дней (для analytics это часто ОК).
2.8 Versioning projections
Заголовок раздела «2.8 Versioning projections»Изменили projection schema → нужен rebuild. Хранить версию:
CREATE TABLE projection_meta ( name VARCHAR(64) PRIMARY KEY, version INT NOT NULL, offset BIGINT NOT NULL);При deploy projection v2:
- Создать новые read tables (e.g.
order_list_v2). - Replay events into v2.
- Switch readers from v1 to v2.
- Drop v1.
2.9 Event versioning (schema evolution)
Заголовок раздела «2.9 Event versioning (schema evolution)»События живут вечно (в идеале). Через год вы хотите изменить event schema.
Approach 1: Upcasting (recommended):
type OrderCreatedV1 struct { OrderID string Total float64}
type OrderCreatedV2 struct { OrderID string Total float64 Currency string // new field}
func upcastOrderCreated(raw []byte, version int) (OrderCreatedV2, error) { switch version { case 1: var v1 OrderCreatedV1 json.Unmarshal(raw, &v1) return OrderCreatedV2{ OrderID: v1.OrderID, Total: v1.Total, Currency: "USD", // default }, nil case 2: var v2 OrderCreatedV2 json.Unmarshal(raw, &v2) return v2, nil }}Старые events хранятся в исходном формате; при чтении upcast в current schema.
Approach 2: Migrate events (rewrite in store):
UPDATE eventsSET payload = jsonb_set(payload, '{currency}', '"USD"', true), event_version = 2WHERE event_type = 'OrderCreated' AND event_version = 1;⚠️ ES purist скажут: never modify history. Реальность: иногда нужно.
Approach 3: Weak schema (Avro/Protobuf with default values):
Protobuf forward/backward compatibility automatically handles added fields.
2.10 GDPR + ES
Заголовок раздела «2.10 GDPR + ES»«Right to be forgotten» противоречит «events are immutable forever».
Crypto-shredding:
Per-user encryption key in separate KMS.All PII в events encrypted with this key.User requests deletion → delete the key.Events still exist, но decryption невозможна → effectively forgotten.type PIIEvent struct { UserID string PIIBlob []byte // encrypted с user's KMS key}
func encryptPII(userID string, pii Personal) []byte { key := kms.GetUserKey(userID) return aesGcmEncrypt(key, marshal(pii))}После удаления key из KMS — все исторические events этого user становятся unreadable.
Tombstone events:
UserDeletedEvent — projections handle: remove user's data.But raw events остаются.⚠️ Юридический аспект: проверяйте с lawyer, считается ли это «forgetting» в смысле GDPR. В EU чаще требуют physical deletion. Crypto-shredding — pragmatic compromise.
2.11 Performance: write throughput
Заголовок раздела «2.11 Performance: write throughput»Bottleneck: aggregate-level concurrency.
Aggregate Order:abc - process command 1 → check version, append event - process command 2 → check version, конфликт если v1 не commitedВ Postgres это SELECT FOR UPDATE или unique constraint на (aggregate_id, version).
Single aggregate ≤ N events/sec where N зависит от DB latency (~100–1000/sec).
Если нужно больше — partition aggregate (e.g., per-product inventory → per-warehouse-product).
2.12 Distributed Event Sourcing — sharding
Заголовок раздела «2.12 Distributed Event Sourcing — sharding»Партиционирование по hash(aggregate_id) % N:
- Каждая partition имеет свой event store / Kafka partition.
- Aggregate всегда читается/пишется в один shard.
- Cross-aggregate transactions impossible → saga.
2.13 Real cases по индустриям
Заголовок раздела «2.13 Real cases по индустриям»Finance:
- Banking: every account event = audit log mandatory.
- Trading: order book is naturally event-sourced (place, modify, cancel, fill).
E-commerce:
- Order lifecycle: created, paid, picked, shipped, delivered.
- Inventory: receive, reserve, release, adjust.
Healthcare:
- Patient records: every observation, prescription, lab result as event.
- HIPAA requires audit log → ES gives this naturally.
Gaming:
- Player actions, achievements, items.
- Replay для game review / cheat detection.
2.14 Go libraries
Заголовок раздела «2.14 Go libraries»Mostly homegrown (each company builds their own ES framework).
Available libraries:
github.com/modernice/goes— generic ES framework для Go.github.com/EventStore/EventStore-Client-Go— official EventStoreDB client.github.com/segmentio/kafka-go— Kafka client, can build ES on top.
⚠️ Не verging «ES framework» в Go нет industry standard. Большинство production projects писали свою тонкую обёртку над PG/Kafka.
2.15 Когда НЕ использовать ES
Заголовок раздела «2.15 Когда НЕ использовать ES»- Простой CRUD без сложной бизнес-логики.
- Стартап в early stage — overengineering замедлит iterate.
- Team не имеет experience — kompleksnost высокая.
- Нет требования audit trail / time travel.
Когда использовать:
- Highly regulated domains (finance, healthcare).
- Сложные бизнес-процессы с долгой историей (insurance claims, supply chain).
- Когда replay events для analytics / ML — value add.
3. Gotchas
Заголовок раздела «3. Gotchas»⚠️ Eventual consistency между write и read. UI может показать «order created», а через 100 мс GET вернёт «not found». Решение: optimistic UI или wait-for-projection pattern.
⚠️ Snapshot becomes wrong после schema change. Need versioning snapshot too или drop all snapshots при schema upgrade.
⚠️ Projection lag в продакшене. Под нагрузкой 100K events/sec, projection processing 50K/sec — lag растёт. Monitor lag, alert если > X minutes.
⚠️ Reordering events. Если processing в параллель — events за aggregate могут прийти в wrong order. Solution: keyed processing (Kafka partition by aggregate_id).
⚠️ Idempotency vs ordering. Идемпотентный handler можно retry; но если processed event 1 после event 2 — состояние неконсистентно. Both нужны.
⚠️ Storage growth. 1 billion events × 1 KB = 1 ТБ. Plan retention или archival.
⚠️ Event size bloat. Don’t put full state in event. Только delta / what changed.
⚠️ Schema evolution breaks replay. Тестировать replay от oldest event каждый release.
⚠️ Distributed transactions desire — иногда tempting запустить «событие в PG + событие в Kafka» атомарно. Это outbox pattern (file 17).
⚠️ CQRS premature application. Маленький CRUD без real read/write divergence не нуждается в CQRS. Можно начать без, добавить когда нужно.
⚠️ Read model rebuild downtime. Не делайте blue-green правильно — простой может быть часами.
⚠️ GDPR crypto-shredding не universally accepted. Legal counsel required.
⚠️ Event versions naming clash. Use semver or explicit v1, v2 suffixes в event types: OrderCreated.v2.
⚠️ Concurrent writes to same aggregate. PG unique constraint or optimistic locking. Retry with backoff.
4. Real cases
Заголовок раздела «4. Real cases»Case 1: Banking transaction log
Заголовок раздела «Case 1: Banking transaction log»Контекст: traditional bank, migration to event-sourced ledger.
Setup:
- Event store: PG с events table, partitioned by month.
- Aggregates: accounts. Events: Credit, Debit, FeeApplied, InterestEarned.
- Snapshots каждые 100 events.
- Read models: balance (Redis), transactions list (PG view), monthly statements (S3).
Outcome:
- Audit complete since day 1.
- Regulator compliance: easy to prove «who changed what when».
- Time travel: «what was balance on 2024-03-15?» — replay до того момента.
Case 2: E-commerce order replay
Заголовок раздела «Case 2: E-commerce order replay»Контекст: support team часто запрашивает «who clicked refund button, when».
Without ES: order_status_log table, partial info.
With ES: events RefundRequested, RefundApproved, RefundProcessed. Full causality. Support открывает customer service tool, видит full timeline.
Case 3: Projection rebuild стал bottleneck
Заголовок раздела «Case 3: Projection rebuild стал bottleneck»Контекст: новая feature — search по orders. Создали ElasticSearch projection.
Проблема: rebuild from 5-year event store — 3 дня в single-thread.
Solution:
- Partition rebuild по hash(order_id) % 10.
- 10 parallel workers.
- Rebuild = 7 hours.
Case 4: Schema migration broke replay
Заголовок раздела «Case 4: Schema migration broke replay»Симптом: new deploy, replay broken at events from 2 years ago.
Анализ: dev добавил поле string в event payload без default. Старые events не имеют поля → panic on unmarshal.
Fix:
- Made поле
*string(pointer = nullable). - Upcaster fills default for nil.
- Added test: replay all events from oldest period в CI.
Case 5: GDPR crypto-shredding incident
Заголовок раздела «Case 5: GDPR crypto-shredding incident»Контекст: user requested deletion. Legal: «всё удалить».
Implementation: KMS key для user removed. Events become unreadable.
Issue (2 года спустя): ML team захотела использовать historical events для training. 5% events un-decryptable. Был кризис: «у нас data loss».
Resolution: aggregated ML features computed BEFORE deletion. После deletion — только anonymized aggregates. Legal был ОК.
5. Вопросы (20)
Заголовок раздела «5. Вопросы (20)»- CQRS: что разделяется и почему?
- Event Sourcing: чем отличается от хранения state?
- CQRS + ES синергия — какие преимущества?
- CQRS без ES возможен? Привести пример.
- Mediator pattern в CQRS — зачем?
- Какие read storage tech выбрать для full-text search vs analytics vs точечный поиск?
- Сравнить event store: PG, EventStoreDB, Kafka.
- Aggregate replay: алгоритм.
- Snapshot: когда снимать, как версионировать.
- Projections rebuild стратегии (parallel, blue-green).
- Event versioning через upcasting — пример.
- Crypto-shredding для GDPR: как работает.
- Tombstone events vs deletion: что выбрать?
- Write throughput limit: что bottleneck в aggregate?
- Distributed ES: sharding по aggregate_id.
- Reordering events в kafka — как избежать?
- Идемпотентность projection handler — почему критично?
- Когда CQRS overengineering?
- Когда ES не подходит?
- Опишите кейс, где ES спас аудит или ML team.
6. Practice
Заголовок раздела «6. Practice»Задача 1: Реализовать простой ES store на Postgres: events table, append, load by aggregate_id.
Задача 2: Aggregate Account с methods Deposit, Withdraw. Persist events, replay.
Задача 3: Добавить snapshot mechanism — каждые 10 events snapshot.
Задача 4: Реализовать projection BalanceView в Redis. Subscribe to events.
Задача 5: Schema evolution: add field currency в event. Реализовать upcaster v1 → v2.
Задача 6: Rebuild projection from scratch — script для production-like load.
Задача 7: Реализовать crypto-shredding: user-specific encryption key, encrypted PII.
Задача 8 (advanced): Distributed ES с partitioning по hash(aggregate_id). Kafka partitions, projection per partition.
Задача 9: Поднять EventStoreDB через docker-compose, написать basic Go client.
7. Источники
Заголовок раздела «7. Источники»- Greg Young, “CQRS Documents”, архив, https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf
- Greg Young, “Event Sourcing” talks (multiple), 2010–2018.
- Martin Fowler, “Event Sourcing”, https://martinfowler.com/eaaDev/EventSourcing.html
- Vaughn Vernon, “Implementing Domain-Driven Design”, Addison-Wesley, 2013.
- Eric Evans, “Domain-Driven Design”, Addison-Wesley, 2003.
- Chris Richardson, “Microservices Patterns”, 2018.
- EventStoreDB Documentation, https://developers.eventstore.com/
- Adam Dymitruk, “Event Sourcing for the masses”, talks.
- Confluent Blog, “Event Sourcing with Kafka”, 2018.
- ModernAuth/goes — github.com/modernice/goes
- Bartosz Sypytkowski, “GDPR and Event Sourcing”, 2018.
- PostgreSQL documentation: jsonb, advisory locks for aggregate concurrency.
- ClickHouse для projections, materialized views docs.
- Pat Helland, “Immutability Changes Everything”, CIDR 2015.
- Mike Amundsen, “RESTful Web APIs”, chapters on hypermedia + state.