Перейти к содержимому

Outbox, Inbox и CDC (Change Data Capture)

Зачем знать на Middle 3: Dual-write problem — это классическая трагедия микросервисов: записал в DB → отправил в Kafka. Что если первое успешно, а второе упало? Outbox pattern решает это атомарно через одну транзакцию. Inbox — защита потребителя. CDC через Debezium — современная замена ручного polling. На уровне Senior: реализуешь outbox с polling worker или CDC, понимаешь Debezium internals, выбираешь между pgoutput и trigger-based, обрабатываешь schema evolution в потоке events.

  1. Концепция (Outbox, Inbox, CDC)
  2. Глубже / production-практики
  3. Gotchas
  4. Real cases
  5. Вопросы (25)
  6. Practice
  7. Источники

func CreateOrder(ctx, order) error {
if err := db.SaveOrder(ctx, order); err != nil { return err }
if err := kafka.Publish(ctx, "OrderCreated", order); err != nil { return err }
return nil
}

Что может пойти не так:

  1. DB save OK → publish OK → ✓ (happy path).
  2. DB save fail → publish skipped → ✓ (consistent).
  3. DB save OK → publish fail → ✗ (DB has order, but no event).
  4. DB save OK → publish OK, но crash до return → возможен retry, duplicate publish.

Случай 3 — самый коварный. У вас в БД появился order, downstream services не знают. Например, inventory не уменьшился, payment не запустился.

Невозможно атомарно записать в две разные системы без distributed transactions (которых хотим избегать).

Записать в одну DB и факт изменения, и факт «нужно отправить event». Атомарность гарантирована единой транзакцией.

BEGIN;
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox (event_type, payload) VALUES ('OrderCreated', {...});
COMMIT;

Отдельный процесс (polling worker или CDC) читает outbox и публикует в Kafka.

┌──────────────┐
│ App │
│ ┌────────┐ │
│ │ TX: │ │
│ │ orders │ │
│ │ outbox │ │
│ └────────┘ │
└───┬──────────┘
┌──────────────┐ ┌──────────────┐
│ Postgres │ │ Kafka │
│ - orders │ │ │
│ - outbox │ ──worker│ │
└──────────────┘ poll └──────────────┘
OR CDC

На потребительской стороне: deduplication через идempотентный store.

[Receive event from Kafka]
[Check inbox table: have I processed this event_id?]
- Yes → ack, skip
- No → process + INSERT INTO inbox in same transaction
[ack to Kafka]

Inbox table:

CREATE TABLE inbox (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ DEFAULT now()
);

Concept: вместо polling outbox table, читать DB transaction log (WAL для Postgres, binlog для MySQL) и автоматически публиковать changes.

Преимущества:

  • Real-time (≤ 1 sec latency).
  • No polling overhead.
  • All table changes captured автоматически.
  • Не требует outbox table — можно публиковать changes в orders table напрямую.

Недостатки:

  • Tight coupling между internal schema и downstream events.
  • Сложнее operate (CDC connectors, schema registry).

CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_id UUID NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ,
INDEX idx_outbox_unpublished (id) WHERE published_at IS NULL
);

Partial index WHERE published_at IS NULL — только unpublished для эффективного polling.

func runOutboxPoller(db *sql.DB, kafka KafkaProducer) {
ticker := time.NewTicker(100 * time.Millisecond)
for range ticker.C {
rows, err := db.Query(`
SELECT id, aggregate_id, event_type, payload
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED
`)
if err != nil { continue }
for rows.Next() {
var id int64
var aggID, evtType string
var payload []byte
rows.Scan(&id, &aggID, &evtType, &payload)
err := kafka.Publish(evtType, aggID, payload)
if err != nil { continue }
db.Exec(`UPDATE outbox SET published_at=now() WHERE id=$1`, id)
}
rows.Close()
}
}

⚠️ FOR UPDATE SKIP LOCKED — критично. Без него, если worker зацепил row и медленно обрабатывает, другой worker будет блокирован.

⚠️ LIMIT 100 — batch size. Слишком маленький — много round-trips. Слишком большой — long transaction, lock contention.

Outbox дает at-least-once delivery:

  • Worker может опубликовать в Kafka и упасть до UPDATE published_at.
  • На restart — re-publish. Duplicate event.

Поэтому consumers должны быть idempotent (через inbox или idempotent business logic).

Published rows растут. Cleanup:

DELETE FROM outbox WHERE published_at < now() - INTERVAL '7 days';

Или партиционирование по дате с DROP старых.

⚠️ Слишком быстрый cleanup может попасть в race: только что published, но downstream хочет re-read для replay. Compromise: 7–30 дней retention.

config := &kafka.ConfigMap{
"bootstrap.servers": "kafka:9092",
"enable.idempotence": true, // ← critical
"acks": "all",
"retries": "10",
"max.in.flight.requests.per.connection": "5",
}

enable.idempotence=true:

  • Kafka assigns producer ID + sequence number per partition.
  • Дубликаты с тем же ID/sequence не записываются.
  • Гарантия exactly-once внутри Kafka (но не end-to-end).
func consumeEvent(ctx, msg KafkaMessage) error {
tx, _ := db.BeginTx(ctx, nil)
defer tx.Rollback()
var exists bool
err := tx.QueryRow(`SELECT EXISTS(SELECT 1 FROM inbox WHERE event_id=$1)`, msg.Key).Scan(&exists)
if err != nil { return err }
if exists {
return tx.Commit() // already processed, ack без processing
}
// Process: update business state
if err := processBusiness(ctx, tx, msg); err != nil { return err }
// Mark as processed
_, err = tx.Exec(`INSERT INTO inbox (event_id) VALUES ($1)`, msg.Key)
if err != nil { return err }
return tx.Commit()
}

⚠️ Inbox + business logic в одной транзакции. Это даёт «process exactly once или fully rolled back».

⚠️ Cleanup inbox: для bounded TTL после которого re-delivery невозможен. Обычно 7–30 дней.

Debezium — open-source CDC platform (Red Hat, теперь IBM). Java-based (но downstream consumers могут быть на любом языке).

Architecture:

┌──────────────────┐ reads transaction log ┌──────────────────┐
│ Postgres │ ────────────────────→ │ Debezium │
│ - WAL │ │ Connector │
│ - logical │ │ (Kafka Connect) │
│ replication │ └────────┬─────────┘
└──────────────────┘ │ produces
┌──────────────────┐
│ Kafka │
│ - topic │
│ `dbserver1. │
│ public.orders`│
└──────────────────┘

Supports:

  • PostgreSQL (через logical replication, plugin: pgoutput или wal2json).
  • MySQL (через binlog).
  • MongoDB (change streams).
  • SQL Server (CDC feature).
  • Oracle, Db2, Cassandra (более experimental).

Setup:

-- enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_wal_senders = 10;
ALTER SYSTEM SET max_replication_slots = 10;
-- restart Postgres
CREATE PUBLICATION debezium_pub FOR ALL TABLES;
CREATE USER debezium WITH REPLICATION PASSWORD '...';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;

Connector config (JSON):

{
"name": "orders-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "...",
"database.dbname": "shop",
"database.server.name": "dbserver1",
"table.include.list": "public.orders,public.payments",
"plugin.name": "pgoutput",
"publication.name": "debezium_pub"
}
}

Result: каждое изменение в orders → Kafka topic dbserver1.public.orders с CDC event:

{
"op": "c", // c=create, u=update, d=delete, r=read (snapshot)
"before": null,
"after": {"id": 123, "status": "pending", "total": 99.99},
"ts_ms": 1716000000000,
"source": {"lsn": "0/12345", "txId": 999, ...}
}
  • Default: JSON, Avro, или Protobuf (через converter setting).
  • Schema Registry integration: Avro/Proto + schema versioning.

Структура events: op, before, after, source, ts_ms. Schema registry хранит схему таблицы.

⚠️ Schema changes (ALTER TABLE) — Debezium emits schema change event в отдельный topic. Consumers должны handle.

При первом старте Debezium делает snapshot всех target tables → publishes в Kafka. Потом switches на live WAL streaming.

Snapshot может быть:

  • initial (default): full snapshot then streaming.
  • never: только streaming (миссит historical data).
  • when_needed: snapshot if offset lost.

⚠️ Initial snapshot для large table может занять часы и блокирует table writes (зависит от Postgres version & isolation level).

Debezium tracks LSN (Log Sequence Number) в Kafka Connect offsets topic. На restart resume from last commit.

⚠️ Если Debezium offsets потеряны (e.g., Connect cluster wipe), нужен новый snapshot или manual offset rewind.

Если не хочется Java/Kafka Connect, можно read PG logical replication directly в Go:

import "github.com/jackc/pglogrepl"
func main() {
conn, _ := pgconn.Connect(ctx, "postgres://...?replication=database")
pglogrepl.CreateReplicationSlot(ctx, conn, "my_slot", "pgoutput", ...)
pglogrepl.StartReplication(ctx, conn, "my_slot", pglogrepl.LSN(0), ...)
for {
msg, _ := conn.ReceiveMessage(ctx)
if msg, ok := msg.(*pgproto3.CopyData); ok {
xld, _ := pglogrepl.ParseXLogData(msg.Data[1:])
// parse xld.WALData — это logical decoding output
}
}
}

Pros: pure Go, no Kafka Connect. Cons: reimplement Debezium (schema handling, snapshot, retry).

Production use case: targeted CDC для one specific use (e.g., trigger cache invalidation), не enterprise-wide.

  1. Microservice integration: order service writes в свою DB → Debezium publishes → other services subscribe.
  2. Materialized views in data lake: real-time data warehouse (S3 + Parquet + Athena).
  3. Cache invalidation: PG → Debezium → invalidate Redis keys.
  4. Search index sync: PG → Debezium → ElasticSearch.
  5. Audit log: capture all changes for compliance.
  6. Data replication between regions.

Подход: писать в outbox table, использовать Debezium для capture changes.

[App] → INSERT orders, INSERT outbox (one TX) → Postgres
[Postgres WAL] → Debezium → Kafka topic `outbox`

Преимущества:

  • Атомарность через transaction.
  • Real-time delivery через Debezium (без polling overhead).
  • Schema независимо от business tables.

Это EventBridge pattern или transactional outbox via CDC.

⚠️ Debezium routing single message transformer (SMT) может выпрямить outbox → правильный topic per event_type.

  • Outbox + CDC: для domain events, чёткий контракт с downstream.
  • Direct CDC (no outbox): для data sync (warehouse, cache). Tight coupling acceptable.

В одной системе обычно используется оба паттерна для разных целей.

В production payload event лучше дополнить метаданными для трассировки и обработки:

INSERT INTO outbox (aggregate_id, event_type, payload, metadata)
VALUES (
$1, $2, $3,
jsonb_build_object(
'trace_id', $4,
'user_id', $5,
'correlation_id', $6,
'causation_id', $7,
'occurred_at', now(),
'source_service', 'order-service',
'version', 'v2'
)
);

При publish в Kafka — metadata едет в headers, payload — в value. Это позволяет downstream filtering без deserialization full payload.

err := kafka.Publish(&kafka.Message{
Key: []byte(aggregateID),
Value: payload,
Headers: []kafka.Header{
{Key: "trace_id", Value: []byte(traceID)},
{Key: "event_type", Value: []byte(evtType)},
{Key: "version", Value: []byte("v2")},
},
})

Kafka гарантирует order per partition. Если ordering важен per aggregate (типично):

// Используем aggregate_id как partition key
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(aggregateID), // hash(key) → partition
Value: payload,
}

⚠️ Если aggregate содержит другие aggregate IDs (Order содержит UserID), и downstream нужен ordering per User — это конфликт. Выбираем dominant ordering, добавляем sequence number в payload для resolution.

⚠️ При rebalance Kafka consumer группы — повторная доставка возможна (consumer не commit-нул offset). Поэтому inbox или idempotent business logic обязательны.

Single worker reads outbox table. Если throughput не хватает:

  1. Sharding: добавить колонку shard в outbox, например aggregate_id % 10. Каждый worker обрабатывает свои shard-ы. Distributes load.
  2. Polling parallelism через SKIP LOCKED: несколько workers одновременно — каждый берёт rows которые другие не lock-нули. Простейшее scaling.
  3. CDC вместо polling: убирает polling overhead, real-time.
// Sharded polling
const shardCount = 10
func workerForShard(shard int) {
for {
rows, _ := db.Query(`
SELECT id, payload FROM outbox
WHERE published_at IS NULL AND shard = $1
ORDER BY id LIMIT 100
FOR UPDATE SKIP LOCKED
`, shard)
// process...
}
}
// Start one worker per shard
for i := 0; i < shardCount; i++ {
go workerForShard(i)
}

Ключевые метрики:

  • Outbox lag: count WHERE published_at IS NULL. Растёт → worker не успевает.
  • Outbox age: max(now() - created_at) WHERE published_at IS NULL. Старая unpublished строка → stuck.
  • Publish errors: counter per error type.
  • CDC lag: max(now() - source.ts_ms) в downstream consumer.
  • Replication slot lag: pg_replication_slots.confirmed_flush_lsn vs current WAL LSN.
-- Postgres query для outbox lag monitoring
SELECT
COUNT(*) AS unpublished_total,
MAX(EXTRACT(EPOCH FROM (now() - created_at))) AS oldest_seconds
FROM outbox
WHERE published_at IS NULL;

Export через pg_exporter или custom Prometheus metric.

Alerts:

  • unpublished_total > 10000 for 5 min → page on-call.
  • oldest_seconds > 300 → critical, означает worker dead.
  • replication_slot_lag > 1GB → CDC consumer behind.

Integration test для outbox:

func TestOutboxAtomicity(t *testing.T) {
// Setup PG container
db := setupTestDB(t)
// Simulate transaction
tx, _ := db.Begin()
tx.Exec("INSERT INTO orders (id, total) VALUES ($1, $2)", "order-1", 100)
tx.Exec("INSERT INTO outbox (...) VALUES (...)")
// Crash before commit — both rolled back
tx.Rollback()
// Verify nothing committed
var count int
db.QueryRow("SELECT COUNT(*) FROM orders").Scan(&count)
assert.Equal(t, 0, count)
db.QueryRow("SELECT COUNT(*) FROM outbox").Scan(&count)
assert.Equal(t, 0, count)
}

Для Debezium: testcontainers-go с Kafka + Debezium image. End-to-end test от INSERT до Kafka message receipt.


⚠️ FOR UPDATE SKIP LOCKED обязательно для multi-worker polling. Без — workers взаимно блокируются.

⚠️ At-least-once → нужен inbox на consumer. Без — duplicates ломают бизнес-логику.

⚠️ Kafka idempotent producer — only intra-Kafka. Не помогает если ваш polling worker re-publishes after crash.

⚠️ Outbox cleanup race с downstream replay. Compromise on retention.

⚠️ Debezium initial snapshot blocks writes depending on Postgres version. Plan downtime window.

⚠️ WAL retention. Postgres max_wal_size + replication slot may holdить старый WAL пока Debezium не consume. Disk full risk!

⚠️ Replication slot abandoned. Если Debezium умер и slot остался — WAL накапливается. Monitor pg_replication_slots.active = false.

⚠️ Schema changes (ALTER TABLE) могут поломать Debezium. ADD COLUMN obычно ОК; DROP COLUMN, type change — careful.

⚠️ Big rows (>1 МБ): Kafka default message size 1 МБ. Need max.message.bytes increase.

⚠️ Debezium reorders events? No — preserves transaction order per partition.

⚠️ TOAST values in Postgres (large columns stored separately) — Debezium может миссить changes если column not in REPLICA IDENTITY. Use REPLICA IDENTITY FULL for affected tables.

⚠️ Inbox table growth. Bounded TTL essential. After TTL — if duplicate arrives, double-process.

⚠️ Ordering across aggregates. Kafka guarantees order within partition. If 2 entities related and you partition by entity_id, related entity events можно reorder.

⚠️ Outbox table contention. High write throughput → outbox table is hot. Index design matters; consider partitioning.

⚠️ JSON payload size. Don’t embed full state, only what changed.


Симптом: Order created, customer charged, но inventory не уменьшился. Кейс случается раз в неделю на peak hours.

Root cause: app записывает в orders DB, потом publishes Kafka event. Под нагрузкой Kafka periodically times out → event lost, но DB write committed.

Fix: outbox table + polling worker. Атомарность гарантирована, retry-able.

Outcome: incidents → zero за 6 месяцев.

Контекст: Debezium connector умер ночью, никто не заметил weekend.

Symptom: Postgres disk 95%, alerts fired.

Anal: pg_replication_slots показал slot debezium_slot неактивен 48h. WAL накопился 200 ГБ.

Fix:

  1. Restart Debezium → resume consuming.
  2. Set up alert pg_replication_slots.active = false для > 5 минут.
  3. Set max_slot_wal_keep_size = 50GB (PG 13+) — limit WAL retention per slot.

Контекст: Renamed column customer_iduser_id in production DB.

Result: Debezium emit-нул schema change event. Downstream consumers expecting customer_id failed.

Fix:

  • Process: never rename — always add new + dual-write + drop old.
  • Migration: ADD COLUMN user_id, dual-write 2 weeks, DROP customer_id.

Контекст: High-throughput service, 5000 events/sec. Outbox table становится bottleneck.

Anal: vacuum lagging behind, table size growing.

Fix:

  • Partition outbox by hour: outbox_2026_05_21_14.
  • DROP partitions older than 7 days.
  • Per-partition VACUUM fast.

Симптом: Order processed дважды. Customer charged twice.

Anal:

  • Event A came at 10:00, processed, written to inbox.
  • Inbox TTL = 24h. Cleaned at 10:00 next day.
  • Same event A came at 10:01 next day (retry from broker). Inbox doesn’t know.
  • Double-processed.

Fix: increased TTL to 7 days. Plus moved deduplication к idempotent business logic level (e.g. unique constraint on (order_id, payment_id)).


  1. Что такое dual-write problem? Дайте пример.
  2. Опишите Outbox pattern: что хранится, как читается.
  3. Что такое Inbox pattern и где он на consumer side?
  4. CDC: что capture-ит и как?
  5. Структура outbox table (SQL).
  6. Зачем FOR UPDATE SKIP LOCKED в polling worker?
  7. At-least-once vs exactly-once в outbox: что вы получаете?
  8. Kafka enable.idempotence=true — что гарантирует?
  9. Inbox + business logic в одной транзакции — зачем?
  10. Опишите архитектуру Debezium для Postgres.
  11. Что такое pgoutput vs wal2json?
  12. Initial snapshot в Debezium — что делает, какие проблемы?
  13. Что такое LSN в Postgres?
  14. Replication slot — что это, чем опасен заброшенный?
  15. REPLICA IDENTITY FULL — зачем?
  16. pglogrepl в Go — когда оправдано вместо Debezium?
  17. Outbox + Debezium hybrid: как работает.
  18. Outbox cleanup retention — какой выбрать и почему?
  19. Inbox TTL — какие риски короткого vs длинного?
  20. Schema change в Debezium: как handle.
  21. Reordering events: гарантия Kafka per partition.
  22. PG WAL bloat при abandoned slot — мониторинг.
  23. Big row > 1 МБ в Kafka — что настроить?
  24. Кейс «dual-write incident» — как outbox решает.
  25. Outbox vs direct CDC: когда какой подход?

Задача 1: Реализовать outbox table + polling worker в Go. Publish в Kafka.

Задача 2: Симулировать crash worker mid-publish (kill -9). Restart, проверить, что нет lost events, но возможны duplicates.

Задача 3: Реализовать inbox на consumer side. Тест: подать дублированный event 5 раз, проверить, что бизнес-логика выполнилась 1 раз.

Задача 4: Поднять Postgres + Debezium + Kafka через docker-compose. Tablе orders → Kafka topic.

Задача 5: Написать Go consumer для CDC events. Обработать op=‘c’ (create) и op=‘u’ (update).

Задача 6: Сделать schema change (ADD COLUMN) и проверить, как Debezium это handle.

Задача 7: Реализовать pglogrepl-based CDC в Go (без Debezium). Compare complexity.

Задача 8: Outbox cleanup background job, retention 7 дней.

Задача 9: Партиционировать outbox table по часам, автоматизировать DROP старых partitions.

Задача 10 (advanced): Outbox + Debezium hybrid. Routing SMT в Debezium для маршрутизации события в правильный topic.


  1. Chris Richardson, “Microservices Patterns”, Manning, 2018 — глава об outbox.
  2. Microservices.io: Transactional Outbox, https://microservices.io/patterns/data/transactional-outbox.html
  3. Debezium Documentation, https://debezium.io/documentation/
  4. Gunnar Morling, “Reliable Microservices Data Exchange With the Outbox Pattern”, 2019.
  5. Confluent Blog, “Implementing the Outbox Pattern”, 2020.
  6. PostgreSQL Logical Replication documentation.
  7. Jacks Bramos, “Architecting Data-Intensive Applications with PostgreSQL and Debezium”.
  8. pglogrepl: github.com/jackc/pglogrepl
  9. Kafka producer idempotence docs.
  10. Martin Kleppmann, “Designing Data-Intensive Applications”, chapters 11–12.
  11. Andy Pavlo, “Database Logging” lecture, CMU CS445.
  12. Pat Helland, “Life Beyond Distributed Transactions”, 2007.
  13. AWS DMS (Database Migration Service) — alternative CDC.
  14. ScyllaDB CDC docs (NoSQL CDC example).
  15. The Twelve-Factor App — III. Config (related для config of outbox workers).