Outbox Pattern, Event Sourcing и CQRS
Зачем знать: Outbox — must-have для надёжной публикации событий из микросервисов. Без него ты гарантированно потеряешь события или продублируешь их при сбоях. Event Sourcing и CQRS — продвинутые паттерны для систем с богатой бизнес-логикой, аудитом и высокой нагрузкой на чтение. На уровне Middle 2 ты должен уметь объяснить, зачем нужен Outbox, отличать ES от CQRS, и понимать, где они уместны, а где — overengineering.
Содержание
Заголовок раздела «Содержание»- Концепция: Outbox, Inbox, Event Sourcing, CQRS
- Под капотом: реализация и схемы
- Gotchas
- Production-практики
- Вопросы для собеседования
- Practice
- Источники
1. Концепция
Заголовок раздела «1. Концепция»1.1 Проблема dual-write
Заголовок раздела «1.1 Проблема dual-write»Типичный сервис записывает в БД и публикует event в очередь:
func (s *OrderService) Create(ctx context.Context, order Order) error { // 1. Save to DB if err := s.repo.Save(ctx, order); err != nil { return err }
// 2. Publish event if err := s.kafka.Publish(ctx, "order.created", order); err != nil { return err // ⚠️ DB записала, event не опубликован → inconsistency }
return nil}4 возможных сценария:
| DB Save | Kafka Publish | Результат |
|---|---|---|
| ✓ | ✓ | OK |
| ✗ | ─ | OK (rollback) |
| ✓ | ✗ | ❌ Inconsistency: order создан, никто не знает |
| ✗ | ✓ | ❌ Inconsistency: event есть, order нет |
Это dual-write problem — две независимые транзакции, нет atomicity.
1.2 Outbox Pattern
Заголовок раздела «1.2 Outbox Pattern»Идея: записывать event в ту же БД-транзакцию в специальную таблицу outbox. Отдельный процесс читает outbox и публикует в Kafka.
┌─────────────────┐[OrderService] ──BEGIN──► [DB] │ Outbox Worker │ │ │ │ (background) │ │ ▼ │ │ │ orders │ ┌──────────┐ │ │ + │ │ outbox │──┼──► [Kafka] │ outbox │ └──────────┘ │ │ │ └─────────────────┘ │ COMMIT ▼ ACID гарантияBEGIN;INSERT INTO orders (...) VALUES (...);INSERT INTO outbox (aggregate_id, event_type, payload, created_at)VALUES (1, 'OrderCreated', '{"id":1,"total":100}', now());COMMIT;Worker:
func (w *OutboxWorker) Process(ctx context.Context) error { rows, _ := db.Query(ctx, ` SELECT id, event_type, payload FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED `)
for rows.Next() { var ev Event rows.Scan(&ev.ID, &ev.Type, &ev.Payload)
if err := w.publisher.Publish(ctx, ev.Type, ev.Payload); err != nil { continue // retry next iteration }
db.Exec(ctx, "UPDATE outbox SET published_at = now() WHERE id = $1", ev.ID) } return nil}1.3 Inbox Pattern
Заголовок раздела «1.3 Inbox Pattern»Counterpart outbox — на consumer-стороне. Дедуплицировать входящие события.
Kafka → Consumer → Check inbox table │ Already processed? → skip No? → process + record in inbox (in same tx)BEGIN;-- CheckSELECT 1 FROM inbox WHERE event_id = $1;-- ProcessINSERT INTO orders ...;-- RecordINSERT INTO inbox (event_id, processed_at) VALUES ($1, now());COMMIT;event_id — unique key события (генерируется producer’ом).
1.4 At-least-once Delivery
Заголовок раздела «1.4 At-least-once Delivery»Kafka и большинство брокеров гарантируют at-least-once. Это значит:
- Сообщение точно дойдёт, но может дублироваться.
- Consumer должен быть идемпотентным (через inbox или idempotency key).
Exactly-once в Kafka — есть (transactional producer + read_committed consumer), но накладывает performance overhead и требует особой настройки. В реальности at-least-once + idempotent consumer — более популярный подход.
1.5 Event Sourcing
Заголовок раздела «1.5 Event Sourcing»Идея: хранить не текущее состояние, а последовательность событий, которые к нему привели.
Традиционный подход (state-based):
account: id: 1 balance: 50Event Sourcing:
events: 1. AccountOpened(id=1, balance=0) 2. Deposited(id=1, amount=100) 3. Withdrawn(id=1, amount=30) 4. Withdrawn(id=1, amount=20)
Current state = apply все events = balance: 50Преимущества:
- Audit log бесплатно — каждое изменение записано.
- Time-travel — состояние на любую дату.
- Multiple projections — разные read models из одних events.
- Replay — восстановить state из events (например, после bug).
- Temporal queries — “сколько было денег вчера?”.
Недостатки:
- Сложность — не CRUD, требует другого mindset.
- Eventual consistency — projections отстают.
- Schema evolution — старые events нужно поддерживать вечно.
- Performance — replay миллионов events медленный → нужны snapshots.
- Learning curve — команда должна понимать.
1.6 CQRS
Заголовок раздела «1.6 CQRS»Command Query Responsibility Segregation — раздельные модели для чтения и записи.
┌─────────────┐ ┌─────────────────┐ ┌──────────────┐│ Client │─Command─► Write side │─Events─►│ Read side ││ │ │ (commands, │ │ (projections,││ │◄─Query──┤ validation) │ │ views) │└─────────────┘ └─────────────────┘ └──────────────┘ │ │ ▼ ▼ ┌──────────┐ ┌──────────┐ │ Write │ │ Read │ │ DB │ │ DBs │ └──────────┘ └──────────┘Write side:
- Принимает commands (CreateOrder, UpdateUser).
- Validation, business logic.
- Записывает в write DB (или event store).
- Публикует events.
Read side:
- Слушает events.
- Обновляет read models (projections).
- Read models — denormalized, оптимизированы для конкретного query.
CQRS без Event Sourcing: ок. Просто 2 модели — write (нормализованная) и read (denormalized).
Event Sourcing без CQRS: трудно. Если хранишь events, нужно как-то их читать → projections → де-факто CQRS.
Каноническая комбинация: ES + CQRS.
1.7 Когда использовать
Заголовок раздела «1.7 Когда использовать»Outbox/Inbox: почти всегда, если есть микросервисы с messaging.
Event Sourcing:
- ✓ Финансовые системы (audit требуется).
- ✓ Bookings, reservations (history важна).
- ✓ Сложный домен (DDD aggregate с rich behavior).
- ✗ Простой CRUD (overkill).
- ✗ Команда не знает ES (learning curve > польза).
CQRS:
- ✓ Сильно отличающиеся paths чтения/записи.
- ✓ Высокая нагрузка на чтение (read replicas, materialized views).
- ✓ Сложные query (full-text search, аналитика).
- ✗ Маленькое приложение (overhead не окупается).
2. Под капотом / Архитектура
Заголовок раздела «2. Под капотом / Архитектура»2.1 Outbox: схема таблицы
Заголовок раздела «2.1 Outbox: схема таблицы»CREATE TABLE outbox ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, aggregate_type TEXT NOT NULL, -- 'Order', 'User' aggregate_id TEXT NOT NULL, -- ID агрегата (FK на orders/users — опционально) event_id UUID NOT NULL UNIQUE, -- для идемпотентности event_type TEXT NOT NULL, -- 'OrderCreated', 'OrderShipped' payload JSONB NOT NULL, headers JSONB, -- trace_id, correlation_id created_at TIMESTAMPTZ NOT NULL DEFAULT now(), published_at TIMESTAMPTZ);
-- Индекс для worker'аCREATE INDEX idx_outbox_unpublished ON outbox(id) WHERE published_at IS NULL;Опционально: TTL на старые записи (удалять через 30 дней).
2.2 Outbox: write side
Заголовок раздела «2.2 Outbox: write side»type OrderRepository struct { pool *pgxpool.Pool}
func (r *OrderRepository) CreateOrder(ctx context.Context, order Order) error { tx, err := r.pool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx)
// 1. Insert order if _, err := tx.Exec(ctx, ` INSERT INTO orders (id, user_id, total, status) VALUES ($1, $2, $3, $4) `, order.ID, order.UserID, order.Total, order.Status); err != nil { return err }
// 2. Insert outbox event payload, _ := json.Marshal(OrderCreatedEvent{ OrderID: order.ID, UserID: order.UserID, Total: order.Total, }) if _, err := tx.Exec(ctx, ` INSERT INTO outbox (aggregate_type, aggregate_id, event_id, event_type, payload) VALUES ($1, $2, $3, $4, $5) `, "Order", order.ID, uuid.New(), "OrderCreated", payload); err != nil { return err }
return tx.Commit(ctx)}2.3 Outbox: polling worker
Заголовок раздела «2.3 Outbox: polling worker»type OutboxWorker struct { pool *pgxpool.Pool publisher EventPublisher interval time.Duration}
func (w *OutboxWorker) Run(ctx context.Context) error { ticker := time.NewTicker(w.interval) defer ticker.Stop()
for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: if err := w.processBatch(ctx); err != nil { log.Printf("outbox process failed: %v", err) } } }}
func (w *OutboxWorker) processBatch(ctx context.Context) error { tx, err := w.pool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx)
// Берём батч с SKIP LOCKED — параллельные worker'ы не конкурируют rows, err := tx.Query(ctx, ` SELECT id, event_id, event_type, payload, headers FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED `) if err != nil { return err }
var events []outboxEvent for rows.Next() { var e outboxEvent if err := rows.Scan(&e.ID, &e.EventID, &e.Type, &e.Payload, &e.Headers); err != nil { return err } events = append(events, e) } rows.Close()
// Публикуем for _, e := range events { if err := w.publisher.Publish(ctx, e.Type, e.EventID, e.Payload); err != nil { return err // не commit'им — попробуем в следующий тик } }
// Mark as published if len(events) > 0 { ids := make([]int64, len(events)) for i, e := range events { ids[i] = e.ID } if _, err := tx.Exec(ctx, ` UPDATE outbox SET published_at = now() WHERE id = ANY($1) `, ids); err != nil { return err } }
return tx.Commit(ctx)}Pitfalls:
FOR UPDATE SKIP LOCKED— несколько worker’ов могут работать параллельно.- Если publisher.Publish() кидает, но Kafka реально получил — на следующем тике мы опубликуем повторно → consumer должен быть идемпотентным (через event_id).
2.4 Outbox через CDC (Change Data Capture)
Заголовок раздела «2.4 Outbox через CDC (Change Data Capture)»Альтернатива polling — Debezium читает WAL Postgres и эмитит change events в Kafka.
[PG]──WAL──►[Debezium]──change events──►[Kafka]Преимущества:
- Меньше latency (читает WAL непрерывно).
- Не нагружает основную БД polling-запросами.
- Не нужно писать worker.
Недостатки:
- Сложнее инфра.
- События не имеют семантической формы — нужно фильтровать (insert в outbox vs другие таблицы).
- Debezium читает физическую replication slot — нужно мониторить.
Pattern: писать в outbox таблицу, Debezium ловит изменения только этой таблицы.
2.5 Inbox: схема и логика
Заголовок раздела «2.5 Inbox: схема и логика»CREATE TABLE inbox ( event_id UUID PRIMARY KEY, event_type TEXT NOT NULL, processed_at TIMESTAMPTZ NOT NULL DEFAULT now());Consumer:
func (c *OrderConsumer) Handle(ctx context.Context, event Event) error { tx, err := c.pool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx)
// Check inbox var exists bool if err := tx.QueryRow(ctx, ` SELECT EXISTS(SELECT 1 FROM inbox WHERE event_id = $1) `, event.ID).Scan(&exists); err != nil { return err } if exists { return tx.Commit(ctx) // already processed }
// Process event switch event.Type { case "OrderCreated": if err := c.processOrderCreated(ctx, tx, event); err != nil { return err } }
// Record in inbox if _, err := tx.Exec(ctx, ` INSERT INTO inbox (event_id, event_type) VALUES ($1, $2) `, event.ID, event.Type); err != nil { return err }
return tx.Commit(ctx)}2.6 Event Sourcing: store
Заголовок раздела «2.6 Event Sourcing: store»CREATE TABLE events ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type TEXT NOT NULL, event_type TEXT NOT NULL, event_version INT NOT NULL, -- порядковый в aggregate payload JSONB NOT NULL, metadata JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (aggregate_id, event_version) -- optimistic concurrency control);
CREATE INDEX idx_events_aggregate ON events(aggregate_id, event_version);Concurrency control:
- При append’е events для aggregate —
INSERTсevent_version = current + 1. - Если параллельно — UNIQUE constraint failure → retry с актуальным version.
2.7 Event Sourcing: aggregate
Заголовок раздела «2.7 Event Sourcing: aggregate»type Account struct { ID uuid.UUID Balance int64 Version int
pending []Event // not yet persisted}
func (a *Account) Apply(e Event) { switch ev := e.(type) { case AccountOpened: a.ID = ev.ID a.Balance = 0 case Deposited: a.Balance += ev.Amount case Withdrawn: a.Balance -= ev.Amount } a.Version++}
func (a *Account) Deposit(amount int64) error { if amount <= 0 { return errors.New("amount must be positive") } e := Deposited{ID: a.ID, Amount: amount} a.Apply(e) a.pending = append(a.pending, e) return nil}
func (a *Account) Withdraw(amount int64) error { if a.Balance < amount { return errors.New("insufficient funds") } e := Withdrawn{ID: a.ID, Amount: amount} a.Apply(e) a.pending = append(a.pending, e) return nil}Repository:
func (r *EventStore) Save(ctx context.Context, a *Account) error { tx, _ := r.pool.Begin(ctx) defer tx.Rollback(ctx)
for i, e := range a.pending { payload, _ := json.Marshal(e) if _, err := tx.Exec(ctx, ` INSERT INTO events (aggregate_id, aggregate_type, event_type, event_version, payload) VALUES ($1, 'Account', $2, $3, $4) `, a.ID, eventTypeName(e), a.Version-len(a.pending)+i+1, payload); err != nil { return err } }
a.pending = nil return tx.Commit(ctx)}
func (r *EventStore) Load(ctx context.Context, id uuid.UUID) (*Account, error) { rows, _ := r.pool.Query(ctx, ` SELECT event_type, payload FROM events WHERE aggregate_id = $1 ORDER BY event_version `, id)
a := &Account{ID: id} for rows.Next() { var etype string var payload []byte rows.Scan(&etype, &payload)
e := decodeEvent(etype, payload) a.Apply(e) } return a, nil}2.8 Snapshots
Заголовок раздела «2.8 Snapshots»При длинной истории (>10K events) load становится медленным. Snapshot — сохранённое состояние на момент N-го event:
CREATE TABLE snapshots ( aggregate_id UUID PRIMARY KEY, aggregate_type TEXT NOT NULL, version INT NOT NULL, state JSONB NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT now());Load:
func (r *EventStore) LoadFromSnapshot(ctx context.Context, id uuid.UUID) (*Account, error) { var version int var state []byte err := r.pool.QueryRow(ctx, ` SELECT version, state FROM snapshots WHERE aggregate_id = $1 `, id).Scan(&version, &state)
a := &Account{} if err == nil { json.Unmarshal(state, a) }
rows, _ := r.pool.Query(ctx, ` SELECT event_type, payload FROM events WHERE aggregate_id = $1 AND event_version > $2 ORDER BY event_version `, id, version)
for rows.Next() { var etype string; var payload []byte rows.Scan(&etype, &payload) a.Apply(decodeEvent(etype, payload)) } return a, nil}Snapshot создаётся каждые N events (например, каждые 100).
2.9 CQRS: projections
Заголовок раздела «2.9 CQRS: projections»Projections — read models. Слушают events, обновляют read tables.
type OrderListProjection struct { pool *pgxpool.Pool}
func (p *OrderListProjection) Handle(ctx context.Context, e Event) error { switch ev := e.(type) { case OrderCreated: _, err := p.pool.Exec(ctx, ` INSERT INTO orders_list_view (order_id, user_id, total, status, created_at) VALUES ($1, $2, $3, 'pending', $4) `, ev.OrderID, ev.UserID, ev.Total, ev.CreatedAt) return err case OrderShipped: _, err := p.pool.Exec(ctx, ` UPDATE orders_list_view SET status = 'shipped' WHERE order_id = $1 `, ev.OrderID) return err } return nil}Read query идёт напрямую к orders_list_view — быстро, без JOIN’ов.
2.10 CQRS: разные хранилища
Заголовок раздела «2.10 CQRS: разные хранилища»Write side → PostgreSQL (events, normalized) ↓ events ↓ ┌───────┴────────┬──────────┐ ▼ ▼ ▼[orders_view] [analytics] [search](PostgreSQL) (ClickHouse) (Elasticsearch)Каждый read model оптимизирован под свои запросы.
2.11 Полная схема в Go (CQRS + Outbox + EventSourcing)
Заголовок раздела «2.11 Полная схема в Go (CQRS + Outbox + EventSourcing)»[Client] ──HTTP──► [API Layer] │ ├─Command──► [Command Handler] │ │ │ ├─► Load aggregate (Event Store) │ ├─► Apply business logic │ ├─► Save events (+ outbox in same tx) │ └─► Return result │ └─Query────► [Query Handler] ──► Read DB (view)
[Outbox Worker] ──► [Kafka] ──► [Projection Workers] ──► Read DBs3. Gotchas
Заголовок раздела «3. Gotchas»3.1 ⚠️ Не пиши в Kafka напрямую из транзакции
Заголовок раздела «3.1 ⚠️ Не пиши в Kafka напрямую из транзакции»// ПЛОХОtx.Exec("INSERT INTO orders ...")kafka.Publish("order.created", order) // вне txtx.Commit()Если Publish уже опубликовал, а Commit упал → event есть, заказа нет.
Используй outbox.
3.2 ⚠️ Outbox без TTL раздувается
Заголовок раздела «3.2 ⚠️ Outbox без TTL раздувается»Миллионы published записей → bloat. Удалять старые:
DELETE FROM outbox WHERE published_at < now() - INTERVAL '7 days';Или partition by month + drop старых partitions.
3.3 ⚠️ At-least-once → idempotent consumers обязательны
Заголовок раздела «3.3 ⚠️ At-least-once → idempotent consumers обязательны»Outbox + Kafka = at-least-once. Consumer обязан дедуплицировать через event_id.
Inbox table или unique constraint на natural key.
3.4 ⚠️ Polling overhead
Заголовок раздела «3.4 ⚠️ Polling overhead»Worker запрашивает БД каждые N секунд. Если outbox пустой 99% времени — лишняя нагрузка.
Решения:
- LISTEN/NOTIFY в Postgres: producer делает
NOTIFY outbox_new, worker слушает. - Debezium (CDC) — реагирует на WAL немедленно.
- Adaptive polling: если ничего нет — увеличить interval.
3.5 ⚠️ Outbox + transactional outbox latency
Заголовок раздела «3.5 ⚠️ Outbox + transactional outbox latency»Polling каждые 5 sec → +5 sec latency для events. Если бизнес требует real-time → Debezium.
3.6 ⚠️ Schema evolution events
Заголовок раздела «3.6 ⚠️ Schema evolution events»Event “OrderCreated v1” в БД 2 года назад. Сейчас v3 с дополнительными полями.
Подходы:
- Upcasting: при load — transform v1 → v3.
- Versioned events:
OrderCreated_v1,OrderCreated_v2. - Backward-compatible: новые поля только optional.
⚠️ Никогда не меняй semantics existing event. Только добавляй новые fields.
3.7 ⚠️ Event Sourcing для CRUD = overkill
Заголовок раздела «3.7 ⚠️ Event Sourcing для CRUD = overkill»Если у тебя users (id, email, name) и единственная операция — UPDATE name → ES бесполезен. Лишний код, лишняя сложность.
ES оправдан, когда бизнес-процесс сложный и история важна.
3.8 ⚠️ Projections могут отставать
Заголовок раздела «3.8 ⚠️ Projections могут отставать»Read replica latency. Обычно ms, но при больших batch’ах — secs.
UI: “Order created” но в list не появился сразу. Решение:
- Optimistic UI update (показать сразу, потом перезагрузить).
- Read-your-writes: после command — query напрямую к write store.
3.9 ⚠️ Rebuild projections больно
Заголовок раздела «3.9 ⚠️ Rebuild projections больно»Изменился projection схема? Нужно re-build из всех events. Если их 10M — это часы.
Решения:
- Параллельная rebuild (несколько workers).
- Постепенный rollover (blue-green projections).
3.10 ⚠️ Snapshots устаревают
Заголовок раздела «3.10 ⚠️ Snapshots устаревают»Сделал snapshot v=100. Потом изменилось aggregate code. Старый snapshot — incompatible.
Версионирование snapshots. При load — проверять compatibility, если не подходит — rebuild from events.
3.11 ⚠️ Distributed deadlock с FOR UPDATE
Заголовок раздела «3.11 ⚠️ Distributed deadlock с FOR UPDATE»Outbox worker’ы конкурируют за rows. SKIP LOCKED спасает.
SELECT ... FROM outbox WHERE published_at IS NULLFOR UPDATE SKIP LOCKED LIMIT 100;Без SKIP LOCKED — worker’ы будут блокировать друг друга.
3.12 ⚠️ Order events критичен
Заголовок раздела «3.12 ⚠️ Order events критичен»Kafka гарантирует order внутри partition. Если все события aggregate идут в разные partitions — order ломается.
Partition key = aggregate_id — все события одного aggregate в одной partition.
3.13 ⚠️ Outbox events после reboot
Заголовок раздела «3.13 ⚠️ Outbox events после reboot»Worker упал во время обработки batch’а. Не успел UPDATE outbox SET published_at. После reboot повторно опубликует → duplicates (но дедупликация на consumer).
3.14 ⚠️ CQRS without need
Заголовок раздела «3.14 ⚠️ CQRS without need»Двойная инфраструктура (write + read DBs), 2 модели в коде. Если read и write paths не сильно отличаются — добавляешь complexity без пользы.
3.15 ⚠️ Event store не для query
Заголовок раздела «3.15 ⚠️ Event store не для query»Event store оптимизирован для append + load by aggregate_id. Не для “найди все orders, где total > 1000”. Для query — projections.
3.16 ⚠️ GDPR и Event Sourcing
Заголовок раздела «3.16 ⚠️ GDPR и Event Sourcing»Право на удаление (GDPR Article 17) конфликтует с immutable events.
Подходы:
- Crypto-shredding: события зашифрованы per-user. Удаляешь key → event нечитаем.
- Tombstone events: маркируется “user deleted”, projections удаляют PII.
- Erasure: явно удалить events (нарушает immutability, но иногда нужно).
4. Production-практики
Заголовок раздела «4. Production-практики»4.1 Outbox: best practices
Заголовок раздела «4.1 Outbox: best practices»- Partition outbox by month для эффективного cleanup.
- Index on
WHERE published_at IS NULL— partial index. - Batch publish (100-1000 events за раз).
- Retry с backoff для publisher failures.
- Metrics: outbox size, lag (oldest unpublished age).
- Alert если lag > 1 min или size > 10K.
- TTL на published events (cleanup).
4.2 Debezium setup
Заголовок раздела «4.2 Debezium setup»{ "name": "outbox-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "pg-master", "database.dbname": "orders", "database.user": "debezium", "table.include.list": "public.outbox", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.fields.additional.placement": "headers:header" }}Debezium SMT (Single Message Transform) для outbox — routes по event_type в правильный topic.
4.3 Outbox metrics
Заголовок раздела «4.3 Outbox metrics»var ( outboxSize = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "outbox_unpublished_count", }) outboxLag = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "outbox_oldest_unpublished_seconds", }) outboxPublishDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "outbox_publish_duration_seconds", }))
// Periodic updatefunc updateMetrics(ctx context.Context, pool *pgxpool.Pool) { var size int64 var lag *float64 pool.QueryRow(ctx, ` SELECT COUNT(*), EXTRACT(EPOCH FROM (now() - MIN(created_at))) FROM outbox WHERE published_at IS NULL `).Scan(&size, &lag) outboxSize.Set(float64(size)) if lag != nil { outboxLag.Set(*lag) }}4.4 Event Store: партиционирование
Заголовок раздела «4.4 Event Store: партиционирование»Для миллиардов events:
CREATE TABLE events ( ...) PARTITION BY HASH (aggregate_id);
CREATE TABLE events_p0 PARTITION OF events FOR VALUES WITH (MODULUS 16, REMAINDER 0);-- ... p1...p15Aggregate всегда в одной partition — load остаётся быстрым.
4.5 EventStoreDB
Заголовок раздела «4.5 EventStoreDB»EventStoreDB — purpose-built event store (от Greg Young). Альтернатива PostgreSQL для ES.
- Native append-only log.
- Subscriptions (catchup, persistent).
- Projections (запрашиваемые проекции через JS).
- Cluster mode.
В Go: EventStore/EventStore-Client-Go.
client, _ := esdb.NewClient(esdbConfig)events := []esdb.EventData{{ EventType: "OrderCreated", Data: payload,}}client.AppendToStream(ctx, "Order-"+orderID, esdb.AppendToStreamOptions{}, events...)4.6 Реальный пример: e-commerce заказ
Заголовок раздела «4.6 Реальный пример: e-commerce заказ»1. POST /orders ↓2. OrderService.CreateOrder ├─ Validate ├─ Begin tx ├─ INSERT INTO orders ... ├─ INSERT INTO outbox (OrderCreated event) └─ Commit ↓3. Outbox Worker → Kafka topic "order.events" ↓4. Subscribers: ├─ InventoryService → reserve items → InventoryReserved event ├─ NotificationService → email "Order received" └─ AnalyticsProjection → INSERT INTO orders_analytics4.7 Реальный пример: banking (Event Sourcing)
Заголовок раздела «4.7 Реальный пример: banking (Event Sourcing)»Aggregate: AccountEvents: AccountOpened, Deposited, Withdrawn, Frozen, Unfrozen
CommandHandler.Withdraw: ├─ Load Account (replay events) ├─ Apply business rule: balance >= amount ├─ Generate Withdrawn event ├─ Append to events table (in tx with outbox) └─ Publish event
Projections: - account_balances (current balance per account) - transaction_history (denormalized list) - monthly_statements (aggregated) - fraud_detection (rule-based)4.8 CQRS отдельные сервисы
Заголовок раздела «4.8 CQRS отдельные сервисы»Write API (commands) и Read API (queries) можно деплоить как разные сервисы:
- Write service: low traffic, complex logic.
- Read service: high traffic, simple logic, scale horizontally.
Write: 2 pods (HA)Read: 20 pods (load)4.9 Test strategy
Заголовок раздела «4.9 Test strategy»Outbox tests:
- Integration test: tx COMMIT → event в outbox.
- Worker test: outbox row → publish called.
- Failure test: publish error → row остаётся unpublished.
Event Sourcing tests:
- Aggregate unit tests: command → expected events.
- Replay tests: events → expected state.
- Projection tests: events → expected read model.
func TestAccount_Withdraw(t *testing.T) { a := &Account{} a.Apply(AccountOpened{ID: "1"}) a.Apply(Deposited{Amount: 100})
err := a.Withdraw(50) require.NoError(t, err) require.Equal(t, int64(50), a.Balance) require.Len(t, a.pending, 1)
err = a.Withdraw(100) // insufficient require.Error(t, err)}4.10 Cleanup и archiving
Заголовок раздела «4.10 Cleanup и archiving»Outbox: published_at > 30 days → archive to S3 / delete. Events: usually keep forever (audit). Если really много — archive cold events.
5. Вопросы для собеседования
Заголовок раздела «5. Вопросы для собеседования»-
Что такое dual-write problem? Запись в 2 системы (DB + queue) без atomicity → возможна inconsistency: одна успешна, другая нет.
-
Как Outbox pattern решает dual-write? Event пишется в outbox table в той же транзакции с бизнес-данными. Отдельный процесс читает outbox и публикует event. Atomicity гарантирована.
-
Какие есть способы реализации Outbox worker? Polling (запрос к outbox раз в N сек) или CDC (Debezium читает WAL).
-
Чем CDC лучше polling? Меньше latency (читает изменения немедленно), меньше нагрузки на основную БД. Сложнее в setup.
-
Что такое Inbox pattern? Counterpart Outbox на consumer стороне. Дедупликация входящих events через таблицу processed event IDs.
-
Что такое at-least-once delivery? Гарантия, что message доставлен минимум 1 раз. Может быть несколько раз — consumer должен быть идемпотентным.
-
Можно ли exactly-once? Kafka поддерживает (transactional producer + read_committed consumer), но с overhead. Обычно проще at-least-once + idempotent consumer.
-
Что такое Event Sourcing? Хранение последовательности events вместо текущего state. State получается replay’ем events.
-
Преимущества Event Sourcing? Полный audit log, time-travel, multiple projections из тех же events, replay для bug-fix.
-
Недостатки Event Sourcing? Сложность, eventual consistency, schema evolution, learning curve, performance (replay long histories).
-
Что такое snapshot в Event Sourcing? Сохранённое state на момент N-го event. Load: snapshot + events after N. Ускоряет load.
-
Что такое CQRS? Command Query Responsibility Segregation. Раздельные модели для write (commands) и read (queries).
-
Можно ли CQRS без Event Sourcing? Да. Просто 2 модели — write (normalized) и read (denormalized). Часто используется в legacy migration.
-
Можно ли Event Sourcing без CQRS? Технически да, но непрактично. Query по events напрямую — медленно. ES → projections → де-факто CQRS.
-
Когда использовать Event Sourcing? Финансы, bookings, complex domain с DDD. Когда audit нужен. Когда нужна история состояний.
-
Когда НЕ использовать Event Sourcing? Простой CRUD. Команда не знает ES. Нет требования к audit.
-
Что такое projection? Read model в CQRS. Обновляется по events. Оптимизирован для конкретного query.
-
Как обновлять projection при изменении схемы? Rebuild из events (если есть event log). Параллельно — blue-green deployment projections.
-
Какие гарантии порядка дают events? В Kafka — внутри partition. Partition key = aggregate_id → события одного aggregate в порядке.
-
Что такое concurrency control в Event Store? Optimistic: UNIQUE constraint на (aggregate_id, event_version). Если параллельный append — UNIQUE violation → retry.
-
Как решать problem удаления данных (GDPR) в Event Store? Crypto-shredding (зашифрованные events per-user, удаление key). Или tombstone + erase из projections.
-
Что такое event versioning? Events evolve со временем. Подходы: upcasting (transform old → new при load), versioned events (Event_v1, Event_v2), backward-compatible (только optional fields).
-
Чем outbox размер worry? Bloat (раздувание таблицы). TTL на published events. Партиционирование по дате.
-
Как обеспечить exactly-once с Outbox? Невозможно строго. Outbox + Kafka = at-least-once. Consumer должен быть идемпотентным через event_id + inbox.
-
Что такое eventual consistency в контексте CQRS? Между write и read models — задержка (ms-secs). UI должен обрабатывать “только что записал, ещё не виден в list”.
6. Practice
Заголовок раздела «6. Practice»Задача 1: Реализовать Outbox с polling
Заголовок раздела «Задача 1: Реализовать Outbox с polling»Таблица outbox + worker (Go). Тест: создать order → проверить, что event опубликован в Kafka (или log).
Задача 2: Outbox + Inbox flow
Заголовок раздела «Задача 2: Outbox + Inbox flow»2 сервиса. Service A пишет в outbox → Worker → Kafka → Service B Consumer → Inbox check → process. Симулировать duplicate event — проверить дедупликацию.
Задача 3: Event Sourcing для bank account
Заголовок раздела «Задача 3: Event Sourcing для bank account»Aggregate Account с командами Open, Deposit, Withdraw. Events: AccountOpened, Deposited, Withdrawn. Repository: append events, replay events. Тесты бизнес-правил.
Задача 4: Snapshot для аккаунта
Заголовок раздела «Задача 4: Snapshot для аккаунта»Расширить задачу 3. После каждых 50 events — snapshot. Load использует snapshot + events after.
Задача 5: CQRS с разными хранилищами
Заголовок раздела «Задача 5: CQRS с разными хранилищами»Write side: PostgreSQL (orders). Read side: ElasticSearch (orders search index). Projection updater слушает events из Kafka.
Задача 6: Outbox через Debezium
Заголовок раздела «Задача 6: Outbox через Debezium»Запустить Postgres + Debezium + Kafka в Docker. Сконфигурировать outbox connector. Проверить, что INSERT в outbox → message в Kafka.
Задача 7: Schema evolution
Заголовок раздела «Задача 7: Schema evolution»Реализовать upcasting: OrderCreated v1 (без currency field) → v2 (с currency, default USD). При load старых events — добавлять default.
Задача 8: Crypto-shredding
Заголовок раздела «Задача 8: Crypto-shredding»Каждый user имеет AES key. Сохранённые events для user — encrypted by his key. Delete user = delete key. Проверить, что после delete события расшифровать нельзя.
7. Источники
Заголовок раздела «7. Источники»- Microservices Patterns by Chris Richardson — Outbox pattern, Saga, CQRS.
- Designing Data-Intensive Applications by Martin Kleppmann — Chapter 11 (Stream Processing) и connections с Event Sourcing.
- Implementing Domain-Driven Design by Vaughn Vernon — DDD + Event Sourcing.
- Event Sourcing by Martin Fowler — https://martinfowler.com/eaaDev/EventSourcing.html
- CQRS by Martin Fowler — https://martinfowler.com/bliki/CQRS.html
- Debezium Documentation — https://debezium.io/documentation/
- EventStoreDB Documentation — https://developers.eventstore.com/
- Greg Young Talks на YouTube — отец Event Sourcing.
- Confluent Blog: Outbox Pattern — https://www.confluent.io/blog/transactional-outbox-pattern/
- microservices.io: Transactional Outbox — https://microservices.io/patterns/data/transactional-outbox.html