Перейти к содержимому

Outbox Pattern, Event Sourcing и CQRS

Зачем знать: Outbox — must-have для надёжной публикации событий из микросервисов. Без него ты гарантированно потеряешь события или продублируешь их при сбоях. Event Sourcing и CQRS — продвинутые паттерны для систем с богатой бизнес-логикой, аудитом и высокой нагрузкой на чтение. На уровне Middle 2 ты должен уметь объяснить, зачем нужен Outbox, отличать ES от CQRS, и понимать, где они уместны, а где — overengineering.

  1. Концепция: Outbox, Inbox, Event Sourcing, CQRS
  2. Под капотом: реализация и схемы
  3. Gotchas
  4. Production-практики
  5. Вопросы для собеседования
  6. Practice
  7. Источники

Типичный сервис записывает в БД и публикует event в очередь:

func (s *OrderService) Create(ctx context.Context, order Order) error {
// 1. Save to DB
if err := s.repo.Save(ctx, order); err != nil {
return err
}
// 2. Publish event
if err := s.kafka.Publish(ctx, "order.created", order); err != nil {
return err // ⚠️ DB записала, event не опубликован → inconsistency
}
return nil
}

4 возможных сценария:

DB SaveKafka PublishРезультат
OK
OK (rollback)
❌ Inconsistency: order создан, никто не знает
❌ Inconsistency: event есть, order нет

Это dual-write problem — две независимые транзакции, нет atomicity.

Идея: записывать event в ту же БД-транзакцию в специальную таблицу outbox. Отдельный процесс читает outbox и публикует в Kafka.

┌─────────────────┐
[OrderService] ──BEGIN──► [DB] │ Outbox Worker │
│ │ │ (background) │
│ ▼ │ │
│ orders │ ┌──────────┐ │
│ + │ │ outbox │──┼──► [Kafka]
│ outbox │ └──────────┘ │
│ │ └─────────────────┘
│ COMMIT
ACID гарантия
BEGIN;
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox (aggregate_id, event_type, payload, created_at)
VALUES (1, 'OrderCreated', '{"id":1,"total":100}', now());
COMMIT;

Worker:

func (w *OutboxWorker) Process(ctx context.Context) error {
rows, _ := db.Query(ctx, `
SELECT id, event_type, payload FROM outbox
WHERE published_at IS NULL
ORDER BY id LIMIT 100
FOR UPDATE SKIP LOCKED
`)
for rows.Next() {
var ev Event
rows.Scan(&ev.ID, &ev.Type, &ev.Payload)
if err := w.publisher.Publish(ctx, ev.Type, ev.Payload); err != nil {
continue // retry next iteration
}
db.Exec(ctx, "UPDATE outbox SET published_at = now() WHERE id = $1", ev.ID)
}
return nil
}

Counterpart outbox — на consumer-стороне. Дедуплицировать входящие события.

Kafka → Consumer → Check inbox table
Already processed? → skip
No? → process + record in inbox (in same tx)
BEGIN;
-- Check
SELECT 1 FROM inbox WHERE event_id = $1;
-- Process
INSERT INTO orders ...;
-- Record
INSERT INTO inbox (event_id, processed_at) VALUES ($1, now());
COMMIT;

event_id — unique key события (генерируется producer’ом).

Kafka и большинство брокеров гарантируют at-least-once. Это значит:

  • Сообщение точно дойдёт, но может дублироваться.
  • Consumer должен быть идемпотентным (через inbox или idempotency key).

Exactly-once в Kafka — есть (transactional producer + read_committed consumer), но накладывает performance overhead и требует особой настройки. В реальности at-least-once + idempotent consumer — более популярный подход.

Идея: хранить не текущее состояние, а последовательность событий, которые к нему привели.

Традиционный подход (state-based):

account:
id: 1
balance: 50

Event Sourcing:

events:
1. AccountOpened(id=1, balance=0)
2. Deposited(id=1, amount=100)
3. Withdrawn(id=1, amount=30)
4. Withdrawn(id=1, amount=20)
Current state = apply все events = balance: 50

Преимущества:

  • Audit log бесплатно — каждое изменение записано.
  • Time-travel — состояние на любую дату.
  • Multiple projections — разные read models из одних events.
  • Replay — восстановить state из events (например, после bug).
  • Temporal queries — “сколько было денег вчера?”.

Недостатки:

  • Сложность — не CRUD, требует другого mindset.
  • Eventual consistency — projections отстают.
  • Schema evolution — старые events нужно поддерживать вечно.
  • Performance — replay миллионов events медленный → нужны snapshots.
  • Learning curve — команда должна понимать.

Command Query Responsibility Segregation — раздельные модели для чтения и записи.

┌─────────────┐ ┌─────────────────┐ ┌──────────────┐
│ Client │─Command─► Write side │─Events─►│ Read side │
│ │ │ (commands, │ │ (projections,│
│ │◄─Query──┤ validation) │ │ views) │
└─────────────┘ └─────────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Write │ │ Read │
│ DB │ │ DBs │
└──────────┘ └──────────┘

Write side:

  • Принимает commands (CreateOrder, UpdateUser).
  • Validation, business logic.
  • Записывает в write DB (или event store).
  • Публикует events.

Read side:

  • Слушает events.
  • Обновляет read models (projections).
  • Read models — denormalized, оптимизированы для конкретного query.

CQRS без Event Sourcing: ок. Просто 2 модели — write (нормализованная) и read (denormalized).

Event Sourcing без CQRS: трудно. Если хранишь events, нужно как-то их читать → projections → де-факто CQRS.

Каноническая комбинация: ES + CQRS.

Outbox/Inbox: почти всегда, если есть микросервисы с messaging.

Event Sourcing:

  • ✓ Финансовые системы (audit требуется).
  • ✓ Bookings, reservations (history важна).
  • ✓ Сложный домен (DDD aggregate с rich behavior).
  • ✗ Простой CRUD (overkill).
  • ✗ Команда не знает ES (learning curve > польза).

CQRS:

  • ✓ Сильно отличающиеся paths чтения/записи.
  • ✓ Высокая нагрузка на чтение (read replicas, materialized views).
  • ✓ Сложные query (full-text search, аналитика).
  • ✗ Маленькое приложение (overhead не окупается).

CREATE TABLE outbox (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
aggregate_type TEXT NOT NULL, -- 'Order', 'User'
aggregate_id TEXT NOT NULL, -- ID агрегата (FK на orders/users — опционально)
event_id UUID NOT NULL UNIQUE, -- для идемпотентности
event_type TEXT NOT NULL, -- 'OrderCreated', 'OrderShipped'
payload JSONB NOT NULL,
headers JSONB, -- trace_id, correlation_id
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ
);
-- Индекс для worker'а
CREATE INDEX idx_outbox_unpublished ON outbox(id) WHERE published_at IS NULL;

Опционально: TTL на старые записи (удалять через 30 дней).

type OrderRepository struct {
pool *pgxpool.Pool
}
func (r *OrderRepository) CreateOrder(ctx context.Context, order Order) error {
tx, err := r.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// 1. Insert order
if _, err := tx.Exec(ctx, `
INSERT INTO orders (id, user_id, total, status) VALUES ($1, $2, $3, $4)
`, order.ID, order.UserID, order.Total, order.Status); err != nil {
return err
}
// 2. Insert outbox event
payload, _ := json.Marshal(OrderCreatedEvent{
OrderID: order.ID,
UserID: order.UserID,
Total: order.Total,
})
if _, err := tx.Exec(ctx, `
INSERT INTO outbox (aggregate_type, aggregate_id, event_id, event_type, payload)
VALUES ($1, $2, $3, $4, $5)
`, "Order", order.ID, uuid.New(), "OrderCreated", payload); err != nil {
return err
}
return tx.Commit(ctx)
}
type OutboxWorker struct {
pool *pgxpool.Pool
publisher EventPublisher
interval time.Duration
}
func (w *OutboxWorker) Run(ctx context.Context) error {
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := w.processBatch(ctx); err != nil {
log.Printf("outbox process failed: %v", err)
}
}
}
}
func (w *OutboxWorker) processBatch(ctx context.Context) error {
tx, err := w.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// Берём батч с SKIP LOCKED — параллельные worker'ы не конкурируют
rows, err := tx.Query(ctx, `
SELECT id, event_id, event_type, payload, headers
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED
`)
if err != nil {
return err
}
var events []outboxEvent
for rows.Next() {
var e outboxEvent
if err := rows.Scan(&e.ID, &e.EventID, &e.Type, &e.Payload, &e.Headers); err != nil {
return err
}
events = append(events, e)
}
rows.Close()
// Публикуем
for _, e := range events {
if err := w.publisher.Publish(ctx, e.Type, e.EventID, e.Payload); err != nil {
return err // не commit'им — попробуем в следующий тик
}
}
// Mark as published
if len(events) > 0 {
ids := make([]int64, len(events))
for i, e := range events {
ids[i] = e.ID
}
if _, err := tx.Exec(ctx, `
UPDATE outbox SET published_at = now() WHERE id = ANY($1)
`, ids); err != nil {
return err
}
}
return tx.Commit(ctx)
}

Pitfalls:

  • FOR UPDATE SKIP LOCKED — несколько worker’ов могут работать параллельно.
  • Если publisher.Publish() кидает, но Kafka реально получил — на следующем тике мы опубликуем повторно → consumer должен быть идемпотентным (через event_id).

Альтернатива polling — Debezium читает WAL Postgres и эмитит change events в Kafka.

[PG]──WAL──►[Debezium]──change events──►[Kafka]

Преимущества:

  • Меньше latency (читает WAL непрерывно).
  • Не нагружает основную БД polling-запросами.
  • Не нужно писать worker.

Недостатки:

  • Сложнее инфра.
  • События не имеют семантической формы — нужно фильтровать (insert в outbox vs другие таблицы).
  • Debezium читает физическую replication slot — нужно мониторить.

Pattern: писать в outbox таблицу, Debezium ловит изменения только этой таблицы.

CREATE TABLE inbox (
event_id UUID PRIMARY KEY,
event_type TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Consumer:

func (c *OrderConsumer) Handle(ctx context.Context, event Event) error {
tx, err := c.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
// Check inbox
var exists bool
if err := tx.QueryRow(ctx, `
SELECT EXISTS(SELECT 1 FROM inbox WHERE event_id = $1)
`, event.ID).Scan(&exists); err != nil {
return err
}
if exists {
return tx.Commit(ctx) // already processed
}
// Process event
switch event.Type {
case "OrderCreated":
if err := c.processOrderCreated(ctx, tx, event); err != nil {
return err
}
}
// Record in inbox
if _, err := tx.Exec(ctx, `
INSERT INTO inbox (event_id, event_type) VALUES ($1, $2)
`, event.ID, event.Type); err != nil {
return err
}
return tx.Commit(ctx)
}
CREATE TABLE events (
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type TEXT NOT NULL,
event_type TEXT NOT NULL,
event_version INT NOT NULL, -- порядковый в aggregate
payload JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (aggregate_id, event_version) -- optimistic concurrency control
);
CREATE INDEX idx_events_aggregate ON events(aggregate_id, event_version);

Concurrency control:

  • При append’е events для aggregate — INSERT с event_version = current + 1.
  • Если параллельно — UNIQUE constraint failure → retry с актуальным version.
type Account struct {
ID uuid.UUID
Balance int64
Version int
pending []Event // not yet persisted
}
func (a *Account) Apply(e Event) {
switch ev := e.(type) {
case AccountOpened:
a.ID = ev.ID
a.Balance = 0
case Deposited:
a.Balance += ev.Amount
case Withdrawn:
a.Balance -= ev.Amount
}
a.Version++
}
func (a *Account) Deposit(amount int64) error {
if amount <= 0 {
return errors.New("amount must be positive")
}
e := Deposited{ID: a.ID, Amount: amount}
a.Apply(e)
a.pending = append(a.pending, e)
return nil
}
func (a *Account) Withdraw(amount int64) error {
if a.Balance < amount {
return errors.New("insufficient funds")
}
e := Withdrawn{ID: a.ID, Amount: amount}
a.Apply(e)
a.pending = append(a.pending, e)
return nil
}

Repository:

func (r *EventStore) Save(ctx context.Context, a *Account) error {
tx, _ := r.pool.Begin(ctx)
defer tx.Rollback(ctx)
for i, e := range a.pending {
payload, _ := json.Marshal(e)
if _, err := tx.Exec(ctx, `
INSERT INTO events (aggregate_id, aggregate_type, event_type, event_version, payload)
VALUES ($1, 'Account', $2, $3, $4)
`, a.ID, eventTypeName(e), a.Version-len(a.pending)+i+1, payload); err != nil {
return err
}
}
a.pending = nil
return tx.Commit(ctx)
}
func (r *EventStore) Load(ctx context.Context, id uuid.UUID) (*Account, error) {
rows, _ := r.pool.Query(ctx, `
SELECT event_type, payload FROM events
WHERE aggregate_id = $1 ORDER BY event_version
`, id)
a := &Account{ID: id}
for rows.Next() {
var etype string
var payload []byte
rows.Scan(&etype, &payload)
e := decodeEvent(etype, payload)
a.Apply(e)
}
return a, nil
}

При длинной истории (>10K events) load становится медленным. Snapshot — сохранённое состояние на момент N-го event:

CREATE TABLE snapshots (
aggregate_id UUID PRIMARY KEY,
aggregate_type TEXT NOT NULL,
version INT NOT NULL,
state JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

Load:

func (r *EventStore) LoadFromSnapshot(ctx context.Context, id uuid.UUID) (*Account, error) {
var version int
var state []byte
err := r.pool.QueryRow(ctx, `
SELECT version, state FROM snapshots WHERE aggregate_id = $1
`, id).Scan(&version, &state)
a := &Account{}
if err == nil {
json.Unmarshal(state, a)
}
rows, _ := r.pool.Query(ctx, `
SELECT event_type, payload FROM events
WHERE aggregate_id = $1 AND event_version > $2 ORDER BY event_version
`, id, version)
for rows.Next() {
var etype string; var payload []byte
rows.Scan(&etype, &payload)
a.Apply(decodeEvent(etype, payload))
}
return a, nil
}

Snapshot создаётся каждые N events (например, каждые 100).

Projections — read models. Слушают events, обновляют read tables.

type OrderListProjection struct {
pool *pgxpool.Pool
}
func (p *OrderListProjection) Handle(ctx context.Context, e Event) error {
switch ev := e.(type) {
case OrderCreated:
_, err := p.pool.Exec(ctx, `
INSERT INTO orders_list_view (order_id, user_id, total, status, created_at)
VALUES ($1, $2, $3, 'pending', $4)
`, ev.OrderID, ev.UserID, ev.Total, ev.CreatedAt)
return err
case OrderShipped:
_, err := p.pool.Exec(ctx, `
UPDATE orders_list_view SET status = 'shipped' WHERE order_id = $1
`, ev.OrderID)
return err
}
return nil
}

Read query идёт напрямую к orders_list_view — быстро, без JOIN’ов.

Write side → PostgreSQL (events, normalized)
events
┌───────┴────────┬──────────┐
▼ ▼ ▼
[orders_view] [analytics] [search]
(PostgreSQL) (ClickHouse) (Elasticsearch)

Каждый read model оптимизирован под свои запросы.

[Client] ──HTTP──► [API Layer]
├─Command──► [Command Handler]
│ │
│ ├─► Load aggregate (Event Store)
│ ├─► Apply business logic
│ ├─► Save events (+ outbox in same tx)
│ └─► Return result
└─Query────► [Query Handler] ──► Read DB (view)
[Outbox Worker] ──► [Kafka] ──► [Projection Workers] ──► Read DBs

3.1 ⚠️ Не пиши в Kafka напрямую из транзакции

Заголовок раздела «3.1 ⚠️ Не пиши в Kafka напрямую из транзакции»
// ПЛОХО
tx.Exec("INSERT INTO orders ...")
kafka.Publish("order.created", order) // вне tx
tx.Commit()

Если Publish уже опубликовал, а Commit упал → event есть, заказа нет.

Используй outbox.

Миллионы published записей → bloat. Удалять старые:

DELETE FROM outbox WHERE published_at < now() - INTERVAL '7 days';

Или partition by month + drop старых partitions.

Outbox + Kafka = at-least-once. Consumer обязан дедуплицировать через event_id.

Inbox table или unique constraint на natural key.

Worker запрашивает БД каждые N секунд. Если outbox пустой 99% времени — лишняя нагрузка.

Решения:

  • LISTEN/NOTIFY в Postgres: producer делает NOTIFY outbox_new, worker слушает.
  • Debezium (CDC) — реагирует на WAL немедленно.
  • Adaptive polling: если ничего нет — увеличить interval.

Polling каждые 5 sec → +5 sec latency для events. Если бизнес требует real-time → Debezium.

Event “OrderCreated v1” в БД 2 года назад. Сейчас v3 с дополнительными полями.

Подходы:

  • Upcasting: при load — transform v1 → v3.
  • Versioned events: OrderCreated_v1, OrderCreated_v2.
  • Backward-compatible: новые поля только optional.

⚠️ Никогда не меняй semantics existing event. Только добавляй новые fields.

Если у тебя users (id, email, name) и единственная операция — UPDATE name → ES бесполезен. Лишний код, лишняя сложность.

ES оправдан, когда бизнес-процесс сложный и история важна.

Read replica latency. Обычно ms, но при больших batch’ах — secs.

UI: “Order created” но в list не появился сразу. Решение:

  • Optimistic UI update (показать сразу, потом перезагрузить).
  • Read-your-writes: после command — query напрямую к write store.

Изменился projection схема? Нужно re-build из всех events. Если их 10M — это часы.

Решения:

  • Параллельная rebuild (несколько workers).
  • Постепенный rollover (blue-green projections).

Сделал snapshot v=100. Потом изменилось aggregate code. Старый snapshot — incompatible.

Версионирование snapshots. При load — проверять compatibility, если не подходит — rebuild from events.

Outbox worker’ы конкурируют за rows. SKIP LOCKED спасает.

SELECT ... FROM outbox WHERE published_at IS NULL
FOR UPDATE SKIP LOCKED LIMIT 100;

Без SKIP LOCKED — worker’ы будут блокировать друг друга.

Kafka гарантирует order внутри partition. Если все события aggregate идут в разные partitions — order ломается.

Partition key = aggregate_id — все события одного aggregate в одной partition.

Worker упал во время обработки batch’а. Не успел UPDATE outbox SET published_at. После reboot повторно опубликует → duplicates (но дедупликация на consumer).

Двойная инфраструктура (write + read DBs), 2 модели в коде. Если read и write paths не сильно отличаются — добавляешь complexity без пользы.

Event store оптимизирован для append + load by aggregate_id. Не для “найди все orders, где total > 1000”. Для query — projections.

Право на удаление (GDPR Article 17) конфликтует с immutable events.

Подходы:

  • Crypto-shredding: события зашифрованы per-user. Удаляешь key → event нечитаем.
  • Tombstone events: маркируется “user deleted”, projections удаляют PII.
  • Erasure: явно удалить events (нарушает immutability, но иногда нужно).

  1. Partition outbox by month для эффективного cleanup.
  2. Index on WHERE published_at IS NULL — partial index.
  3. Batch publish (100-1000 events за раз).
  4. Retry с backoff для publisher failures.
  5. Metrics: outbox size, lag (oldest unpublished age).
  6. Alert если lag > 1 min или size > 10K.
  7. TTL на published events (cleanup).
debezium.conf.json
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "pg-master",
"database.dbname": "orders",
"database.user": "debezium",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.fields.additional.placement": "headers:header"
}
}

Debezium SMT (Single Message Transform) для outbox — routes по event_type в правильный topic.

var (
outboxSize = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outbox_unpublished_count",
})
outboxLag = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outbox_oldest_unpublished_seconds",
})
outboxPublishDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "outbox_publish_duration_seconds",
})
)
// Periodic update
func updateMetrics(ctx context.Context, pool *pgxpool.Pool) {
var size int64
var lag *float64
pool.QueryRow(ctx, `
SELECT COUNT(*), EXTRACT(EPOCH FROM (now() - MIN(created_at)))
FROM outbox WHERE published_at IS NULL
`).Scan(&size, &lag)
outboxSize.Set(float64(size))
if lag != nil {
outboxLag.Set(*lag)
}
}

Для миллиардов events:

CREATE TABLE events (
...
) PARTITION BY HASH (aggregate_id);
CREATE TABLE events_p0 PARTITION OF events FOR VALUES WITH (MODULUS 16, REMAINDER 0);
-- ... p1...p15

Aggregate всегда в одной partition — load остаётся быстрым.

EventStoreDB — purpose-built event store (от Greg Young). Альтернатива PostgreSQL для ES.

  • Native append-only log.
  • Subscriptions (catchup, persistent).
  • Projections (запрашиваемые проекции через JS).
  • Cluster mode.

В Go: EventStore/EventStore-Client-Go.

client, _ := esdb.NewClient(esdbConfig)
events := []esdb.EventData{{
EventType: "OrderCreated",
Data: payload,
}}
client.AppendToStream(ctx, "Order-"+orderID, esdb.AppendToStreamOptions{}, events...)
1. POST /orders
2. OrderService.CreateOrder
├─ Validate
├─ Begin tx
├─ INSERT INTO orders ...
├─ INSERT INTO outbox (OrderCreated event)
└─ Commit
3. Outbox Worker → Kafka topic "order.events"
4. Subscribers:
├─ InventoryService → reserve items → InventoryReserved event
├─ NotificationService → email "Order received"
└─ AnalyticsProjection → INSERT INTO orders_analytics
Aggregate: Account
Events: AccountOpened, Deposited, Withdrawn, Frozen, Unfrozen
CommandHandler.Withdraw:
├─ Load Account (replay events)
├─ Apply business rule: balance >= amount
├─ Generate Withdrawn event
├─ Append to events table (in tx with outbox)
└─ Publish event
Projections:
- account_balances (current balance per account)
- transaction_history (denormalized list)
- monthly_statements (aggregated)
- fraud_detection (rule-based)

Write API (commands) и Read API (queries) можно деплоить как разные сервисы:

  • Write service: low traffic, complex logic.
  • Read service: high traffic, simple logic, scale horizontally.
Write: 2 pods (HA)
Read: 20 pods (load)

Outbox tests:

  • Integration test: tx COMMIT → event в outbox.
  • Worker test: outbox row → publish called.
  • Failure test: publish error → row остаётся unpublished.

Event Sourcing tests:

  • Aggregate unit tests: command → expected events.
  • Replay tests: events → expected state.
  • Projection tests: events → expected read model.
func TestAccount_Withdraw(t *testing.T) {
a := &Account{}
a.Apply(AccountOpened{ID: "1"})
a.Apply(Deposited{Amount: 100})
err := a.Withdraw(50)
require.NoError(t, err)
require.Equal(t, int64(50), a.Balance)
require.Len(t, a.pending, 1)
err = a.Withdraw(100) // insufficient
require.Error(t, err)
}

Outbox: published_at > 30 days → archive to S3 / delete. Events: usually keep forever (audit). Если really много — archive cold events.


  1. Что такое dual-write problem? Запись в 2 системы (DB + queue) без atomicity → возможна inconsistency: одна успешна, другая нет.

  2. Как Outbox pattern решает dual-write? Event пишется в outbox table в той же транзакции с бизнес-данными. Отдельный процесс читает outbox и публикует event. Atomicity гарантирована.

  3. Какие есть способы реализации Outbox worker? Polling (запрос к outbox раз в N сек) или CDC (Debezium читает WAL).

  4. Чем CDC лучше polling? Меньше latency (читает изменения немедленно), меньше нагрузки на основную БД. Сложнее в setup.

  5. Что такое Inbox pattern? Counterpart Outbox на consumer стороне. Дедупликация входящих events через таблицу processed event IDs.

  6. Что такое at-least-once delivery? Гарантия, что message доставлен минимум 1 раз. Может быть несколько раз — consumer должен быть идемпотентным.

  7. Можно ли exactly-once? Kafka поддерживает (transactional producer + read_committed consumer), но с overhead. Обычно проще at-least-once + idempotent consumer.

  8. Что такое Event Sourcing? Хранение последовательности events вместо текущего state. State получается replay’ем events.

  9. Преимущества Event Sourcing? Полный audit log, time-travel, multiple projections из тех же events, replay для bug-fix.

  10. Недостатки Event Sourcing? Сложность, eventual consistency, schema evolution, learning curve, performance (replay long histories).

  11. Что такое snapshot в Event Sourcing? Сохранённое state на момент N-го event. Load: snapshot + events after N. Ускоряет load.

  12. Что такое CQRS? Command Query Responsibility Segregation. Раздельные модели для write (commands) и read (queries).

  13. Можно ли CQRS без Event Sourcing? Да. Просто 2 модели — write (normalized) и read (denormalized). Часто используется в legacy migration.

  14. Можно ли Event Sourcing без CQRS? Технически да, но непрактично. Query по events напрямую — медленно. ES → projections → де-факто CQRS.

  15. Когда использовать Event Sourcing? Финансы, bookings, complex domain с DDD. Когда audit нужен. Когда нужна история состояний.

  16. Когда НЕ использовать Event Sourcing? Простой CRUD. Команда не знает ES. Нет требования к audit.

  17. Что такое projection? Read model в CQRS. Обновляется по events. Оптимизирован для конкретного query.

  18. Как обновлять projection при изменении схемы? Rebuild из events (если есть event log). Параллельно — blue-green deployment projections.

  19. Какие гарантии порядка дают events? В Kafka — внутри partition. Partition key = aggregate_id → события одного aggregate в порядке.

  20. Что такое concurrency control в Event Store? Optimistic: UNIQUE constraint на (aggregate_id, event_version). Если параллельный append — UNIQUE violation → retry.

  21. Как решать problem удаления данных (GDPR) в Event Store? Crypto-shredding (зашифрованные events per-user, удаление key). Или tombstone + erase из projections.

  22. Что такое event versioning? Events evolve со временем. Подходы: upcasting (transform old → new при load), versioned events (Event_v1, Event_v2), backward-compatible (только optional fields).

  23. Чем outbox размер worry? Bloat (раздувание таблицы). TTL на published events. Партиционирование по дате.

  24. Как обеспечить exactly-once с Outbox? Невозможно строго. Outbox + Kafka = at-least-once. Consumer должен быть идемпотентным через event_id + inbox.

  25. Что такое eventual consistency в контексте CQRS? Между write и read models — задержка (ms-secs). UI должен обрабатывать “только что записал, ещё не виден в list”.


Таблица outbox + worker (Go). Тест: создать order → проверить, что event опубликован в Kafka (или log).

2 сервиса. Service A пишет в outbox → Worker → Kafka → Service B Consumer → Inbox check → process. Симулировать duplicate event — проверить дедупликацию.

Aggregate Account с командами Open, Deposit, Withdraw. Events: AccountOpened, Deposited, Withdrawn. Repository: append events, replay events. Тесты бизнес-правил.

Расширить задачу 3. После каждых 50 events — snapshot. Load использует snapshot + events after.

Write side: PostgreSQL (orders). Read side: ElasticSearch (orders search index). Projection updater слушает events из Kafka.

Запустить Postgres + Debezium + Kafka в Docker. Сконфигурировать outbox connector. Проверить, что INSERT в outbox → message в Kafka.

Реализовать upcasting: OrderCreated v1 (без currency field) → v2 (с currency, default USD). При load старых events — добавлять default.

Каждый user имеет AES key. Сохранённые events для user — encrypted by his key. Delete user = delete key. Проверить, что после delete события расшифровать нельзя.


  1. Microservices Patterns by Chris Richardson — Outbox pattern, Saga, CQRS.
  2. Designing Data-Intensive Applications by Martin Kleppmann — Chapter 11 (Stream Processing) и connections с Event Sourcing.
  3. Implementing Domain-Driven Design by Vaughn Vernon — DDD + Event Sourcing.
  4. Event Sourcing by Martin Fowler — https://martinfowler.com/eaaDev/EventSourcing.html
  5. CQRS by Martin Fowler — https://martinfowler.com/bliki/CQRS.html
  6. Debezium Documentationhttps://debezium.io/documentation/
  7. EventStoreDB Documentationhttps://developers.eventstore.com/
  8. Greg Young Talks на YouTube — отец Event Sourcing.
  9. Confluent Blog: Outbox Patternhttps://www.confluent.io/blog/transactional-outbox-pattern/
  10. microservices.io: Transactional Outboxhttps://microservices.io/patterns/data/transactional-outbox.html