Outbox, Inbox и CDC (Change Data Capture)
Зачем знать на Middle 3: Dual-write problem — это классическая трагедия микросервисов: записал в DB → отправил в Kafka. Что если первое успешно, а второе упало? Outbox pattern решает это атомарно через одну транзакцию. Inbox — защита потребителя. CDC через Debezium — современная замена ручного polling. На уровне Senior: реализуешь outbox с polling worker или CDC, понимаешь Debezium internals, выбираешь между pgoutput и trigger-based, обрабатываешь schema evolution в потоке events.
Содержание
Заголовок раздела «Содержание»- Концепция (Outbox, Inbox, CDC)
- Глубже / production-практики
- Gotchas
- Real cases
- Вопросы (25)
- Practice
- Источники
1. Концепция
Заголовок раздела «1. Концепция»1.1 Dual-write problem
Заголовок раздела «1.1 Dual-write problem»func CreateOrder(ctx, order) error { if err := db.SaveOrder(ctx, order); err != nil { return err } if err := kafka.Publish(ctx, "OrderCreated", order); err != nil { return err } return nil}Что может пойти не так:
- DB save OK → publish OK → ✓ (happy path).
- DB save fail → publish skipped → ✓ (consistent).
- DB save OK → publish fail → ✗ (DB has order, but no event).
- DB save OK → publish OK, но crash до return → возможен retry, duplicate publish.
Случай 3 — самый коварный. У вас в БД появился order, downstream services не знают. Например, inventory не уменьшился, payment не запустился.
Невозможно атомарно записать в две разные системы без distributed transactions (которых хотим избегать).
1.2 Outbox pattern: основная идея
Заголовок раздела «1.2 Outbox pattern: основная идея»Записать в одну DB и факт изменения, и факт «нужно отправить event». Атомарность гарантирована единой транзакцией.
BEGIN; INSERT INTO orders (...) VALUES (...); INSERT INTO outbox (event_type, payload) VALUES ('OrderCreated', {...});COMMIT;Отдельный процесс (polling worker или CDC) читает outbox и публикует в Kafka.
┌──────────────┐│ App ││ ┌────────┐ ││ │ TX: │ ││ │ orders │ ││ │ outbox │ ││ └────────┘ │└───┬──────────┘ │ ▼┌──────────────┐ ┌──────────────┐│ Postgres │ │ Kafka ││ - orders │ │ ││ - outbox │ ──worker│ │└──────────────┘ poll └──────────────┘ OR CDC1.3 Inbox pattern
Заголовок раздела «1.3 Inbox pattern»На потребительской стороне: deduplication через идempотентный store.
[Receive event from Kafka][Check inbox table: have I processed this event_id?] - Yes → ack, skip - No → process + INSERT INTO inbox in same transaction[ack to Kafka]Inbox table:
CREATE TABLE inbox ( event_id UUID PRIMARY KEY, processed_at TIMESTAMPTZ DEFAULT now());1.4 CDC (Change Data Capture)
Заголовок раздела «1.4 CDC (Change Data Capture)»Concept: вместо polling outbox table, читать DB transaction log (WAL для Postgres, binlog для MySQL) и автоматически публиковать changes.
Преимущества:
- Real-time (≤ 1 sec latency).
- No polling overhead.
- All table changes captured автоматически.
- Не требует outbox table — можно публиковать changes в
orderstable напрямую.
Недостатки:
- Tight coupling между internal schema и downstream events.
- Сложнее operate (CDC connectors, schema registry).
2. Глубже / production-практики
Заголовок раздела «2. Глубже / production-практики»2.1 Outbox table structure
Заголовок раздела «2.1 Outbox table structure»CREATE TABLE outbox ( id BIGSERIAL PRIMARY KEY, aggregate_id UUID NOT NULL, aggregate_type VARCHAR(64) NOT NULL, event_type VARCHAR(128) NOT NULL, payload JSONB NOT NULL, metadata JSONB, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), published_at TIMESTAMPTZ,
INDEX idx_outbox_unpublished (id) WHERE published_at IS NULL);Partial index WHERE published_at IS NULL — только unpublished для эффективного polling.
2.2 Polling worker
Заголовок раздела «2.2 Polling worker»func runOutboxPoller(db *sql.DB, kafka KafkaProducer) { ticker := time.NewTicker(100 * time.Millisecond) for range ticker.C { rows, err := db.Query(` SELECT id, aggregate_id, event_type, payload FROM outbox WHERE published_at IS NULL ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED `) if err != nil { continue }
for rows.Next() { var id int64 var aggID, evtType string var payload []byte rows.Scan(&id, &aggID, &evtType, &payload)
err := kafka.Publish(evtType, aggID, payload) if err != nil { continue }
db.Exec(`UPDATE outbox SET published_at=now() WHERE id=$1`, id) } rows.Close() }}⚠️ FOR UPDATE SKIP LOCKED — критично. Без него, если worker зацепил row и медленно обрабатывает, другой worker будет блокирован.
⚠️ LIMIT 100 — batch size. Слишком маленький — много round-trips. Слишком большой — long transaction, lock contention.
2.3 At-least-once semantics
Заголовок раздела «2.3 At-least-once semantics»Outbox дает at-least-once delivery:
- Worker может опубликовать в Kafka и упасть до UPDATE published_at.
- На restart — re-publish. Duplicate event.
Поэтому consumers должны быть idempotent (через inbox или idempotent business logic).
2.4 Cleanup outbox
Заголовок раздела «2.4 Cleanup outbox»Published rows растут. Cleanup:
DELETE FROM outbox WHERE published_at < now() - INTERVAL '7 days';Или партиционирование по дате с DROP старых.
⚠️ Слишком быстрый cleanup может попасть в race: только что published, но downstream хочет re-read для replay. Compromise: 7–30 дней retention.
2.5 Idempotent producer (Kafka)
Заголовок раздела «2.5 Idempotent producer (Kafka)»config := &kafka.ConfigMap{ "bootstrap.servers": "kafka:9092", "enable.idempotence": true, // ← critical "acks": "all", "retries": "10", "max.in.flight.requests.per.connection": "5",}enable.idempotence=true:
- Kafka assigns producer ID + sequence number per partition.
- Дубликаты с тем же ID/sequence не записываются.
- Гарантия exactly-once внутри Kafka (но не end-to-end).
2.6 Inbox table в действии
Заголовок раздела «2.6 Inbox table в действии»func consumeEvent(ctx, msg KafkaMessage) error { tx, _ := db.BeginTx(ctx, nil) defer tx.Rollback()
var exists bool err := tx.QueryRow(`SELECT EXISTS(SELECT 1 FROM inbox WHERE event_id=$1)`, msg.Key).Scan(&exists) if err != nil { return err } if exists { return tx.Commit() // already processed, ack без processing }
// Process: update business state if err := processBusiness(ctx, tx, msg); err != nil { return err }
// Mark as processed _, err = tx.Exec(`INSERT INTO inbox (event_id) VALUES ($1)`, msg.Key) if err != nil { return err }
return tx.Commit()}⚠️ Inbox + business logic в одной транзакции. Это даёт «process exactly once или fully rolled back».
⚠️ Cleanup inbox: для bounded TTL после которого re-delivery невозможен. Обычно 7–30 дней.
2.7 Debezium: что это
Заголовок раздела «2.7 Debezium: что это»Debezium — open-source CDC platform (Red Hat, теперь IBM). Java-based (но downstream consumers могут быть на любом языке).
Architecture:
┌──────────────────┐ reads transaction log ┌──────────────────┐│ Postgres │ ────────────────────→ │ Debezium ││ - WAL │ │ Connector ││ - logical │ │ (Kafka Connect) ││ replication │ └────────┬─────────┘└──────────────────┘ │ produces ▼ ┌──────────────────┐ │ Kafka │ │ - topic │ │ `dbserver1. │ │ public.orders`│ └──────────────────┘Supports:
- PostgreSQL (через logical replication, plugin: pgoutput или wal2json).
- MySQL (через binlog).
- MongoDB (change streams).
- SQL Server (CDC feature).
- Oracle, Db2, Cassandra (более experimental).
2.8 Debezium для PostgreSQL
Заголовок раздела «2.8 Debezium для PostgreSQL»Setup:
-- enable logical replicationALTER SYSTEM SET wal_level = logical;ALTER SYSTEM SET max_wal_senders = 10;ALTER SYSTEM SET max_replication_slots = 10;-- restart Postgres
CREATE PUBLICATION debezium_pub FOR ALL TABLES;
CREATE USER debezium WITH REPLICATION PASSWORD '...';GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;Connector config (JSON):
{ "name": "orders-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "debezium", "database.password": "...", "database.dbname": "shop", "database.server.name": "dbserver1", "table.include.list": "public.orders,public.payments", "plugin.name": "pgoutput", "publication.name": "debezium_pub" }}Result: каждое изменение в orders → Kafka topic dbserver1.public.orders с CDC event:
{ "op": "c", // c=create, u=update, d=delete, r=read (snapshot) "before": null, "after": {"id": 123, "status": "pending", "total": 99.99}, "ts_ms": 1716000000000, "source": {"lsn": "0/12345", "txId": 999, ...}}2.9 Debezium events format
Заголовок раздела «2.9 Debezium events format»- Default: JSON, Avro, или Protobuf (через converter setting).
- Schema Registry integration: Avro/Proto + schema versioning.
Структура events: op, before, after, source, ts_ms. Schema registry хранит схему таблицы.
⚠️ Schema changes (ALTER TABLE) — Debezium emits schema change event в отдельный topic. Consumers должны handle.
2.10 Initial snapshot
Заголовок раздела «2.10 Initial snapshot»При первом старте Debezium делает snapshot всех target tables → publishes в Kafka. Потом switches на live WAL streaming.
Snapshot может быть:
initial(default): full snapshot then streaming.never: только streaming (миссит historical data).when_needed: snapshot if offset lost.
⚠️ Initial snapshot для large table может занять часы и блокирует table writes (зависит от Postgres version & isolation level).
2.11 Failure handling, offset management
Заголовок раздела «2.11 Failure handling, offset management»Debezium tracks LSN (Log Sequence Number) в Kafka Connect offsets topic. На restart resume from last commit.
⚠️ Если Debezium offsets потеряны (e.g., Connect cluster wipe), нужен новый snapshot или manual offset rewind.
2.12 Alternative CDC в Go: pglogrepl
Заголовок раздела «2.12 Alternative CDC в Go: pglogrepl»Если не хочется Java/Kafka Connect, можно read PG logical replication directly в Go:
import "github.com/jackc/pglogrepl"
func main() { conn, _ := pgconn.Connect(ctx, "postgres://...?replication=database")
pglogrepl.CreateReplicationSlot(ctx, conn, "my_slot", "pgoutput", ...) pglogrepl.StartReplication(ctx, conn, "my_slot", pglogrepl.LSN(0), ...)
for { msg, _ := conn.ReceiveMessage(ctx) if msg, ok := msg.(*pgproto3.CopyData); ok { xld, _ := pglogrepl.ParseXLogData(msg.Data[1:]) // parse xld.WALData — это logical decoding output } }}Pros: pure Go, no Kafka Connect. Cons: reimplement Debezium (schema handling, snapshot, retry).
Production use case: targeted CDC для one specific use (e.g., trigger cache invalidation), не enterprise-wide.
2.13 Use cases CDC
Заголовок раздела «2.13 Use cases CDC»- Microservice integration: order service writes в свою DB → Debezium publishes → other services subscribe.
- Materialized views in data lake: real-time data warehouse (S3 + Parquet + Athena).
- Cache invalidation: PG → Debezium → invalidate Redis keys.
- Search index sync: PG → Debezium → ElasticSearch.
- Audit log: capture all changes for compliance.
- Data replication between regions.
2.14 Outbox + Debezium hybrid
Заголовок раздела «2.14 Outbox + Debezium hybrid»Подход: писать в outbox table, использовать Debezium для capture changes.
[App] → INSERT orders, INSERT outbox (one TX) → Postgres[Postgres WAL] → Debezium → Kafka topic `outbox`Преимущества:
- Атомарность через transaction.
- Real-time delivery через Debezium (без polling overhead).
- Schema независимо от business tables.
Это EventBridge pattern или transactional outbox via CDC.
⚠️ Debezium routing single message transformer (SMT) может выпрямить outbox → правильный topic per event_type.
2.15 Outbox vs direct CDC choice
Заголовок раздела «2.15 Outbox vs direct CDC choice»- Outbox + CDC: для domain events, чёткий контракт с downstream.
- Direct CDC (no outbox): для data sync (warehouse, cache). Tight coupling acceptable.
В одной системе обычно используется оба паттерна для разных целей.
2.16 Headers и metadata в outbox
Заголовок раздела «2.16 Headers и metadata в outbox»В production payload event лучше дополнить метаданными для трассировки и обработки:
INSERT INTO outbox (aggregate_id, event_type, payload, metadata)VALUES ( $1, $2, $3, jsonb_build_object( 'trace_id', $4, 'user_id', $5, 'correlation_id', $6, 'causation_id', $7, 'occurred_at', now(), 'source_service', 'order-service', 'version', 'v2' ));При publish в Kafka — metadata едет в headers, payload — в value. Это позволяет downstream filtering без deserialization full payload.
err := kafka.Publish(&kafka.Message{ Key: []byte(aggregateID), Value: payload, Headers: []kafka.Header{ {Key: "trace_id", Value: []byte(traceID)}, {Key: "event_type", Value: []byte(evtType)}, {Key: "version", Value: []byte("v2")}, },})2.17 Ordering guarantees
Заголовок раздела «2.17 Ordering guarantees»Kafka гарантирует order per partition. Если ordering важен per aggregate (типично):
// Используем aggregate_id как partition keymsg := &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Key: []byte(aggregateID), // hash(key) → partition Value: payload,}⚠️ Если aggregate содержит другие aggregate IDs (Order содержит UserID), и downstream нужен ordering per User — это конфликт. Выбираем dominant ordering, добавляем sequence number в payload для resolution.
⚠️ При rebalance Kafka consumer группы — повторная доставка возможна (consumer не commit-нул offset). Поэтому inbox или idempotent business logic обязательны.
2.18 Outbox worker scaling
Заголовок раздела «2.18 Outbox worker scaling»Single worker reads outbox table. Если throughput не хватает:
- Sharding: добавить колонку
shardв outbox, напримерaggregate_id % 10. Каждый worker обрабатывает свои shard-ы. Distributes load. - Polling parallelism через SKIP LOCKED: несколько workers одновременно — каждый берёт rows которые другие не lock-нули. Простейшее scaling.
- CDC вместо polling: убирает polling overhead, real-time.
// Sharded pollingconst shardCount = 10
func workerForShard(shard int) { for { rows, _ := db.Query(` SELECT id, payload FROM outbox WHERE published_at IS NULL AND shard = $1 ORDER BY id LIMIT 100 FOR UPDATE SKIP LOCKED `, shard) // process... }}
// Start one worker per shardfor i := 0; i < shardCount; i++ { go workerForShard(i)}2.19 Monitoring и алерты
Заголовок раздела «2.19 Monitoring и алерты»Ключевые метрики:
- Outbox lag:
count WHERE published_at IS NULL. Растёт → worker не успевает. - Outbox age:
max(now() - created_at) WHERE published_at IS NULL. Старая unpublished строка → stuck. - Publish errors: counter per error type.
- CDC lag:
max(now() - source.ts_ms)в downstream consumer. - Replication slot lag:
pg_replication_slots.confirmed_flush_lsnvs current WAL LSN.
-- Postgres query для outbox lag monitoringSELECT COUNT(*) AS unpublished_total, MAX(EXTRACT(EPOCH FROM (now() - created_at))) AS oldest_secondsFROM outboxWHERE published_at IS NULL;Export через pg_exporter или custom Prometheus metric.
Alerts:
unpublished_total > 10000for 5 min → page on-call.oldest_seconds > 300→ critical, означает worker dead.replication_slot_lag > 1GB→ CDC consumer behind.
2.20 Тестирование outbox + CDC
Заголовок раздела «2.20 Тестирование outbox + CDC»Integration test для outbox:
func TestOutboxAtomicity(t *testing.T) { // Setup PG container db := setupTestDB(t)
// Simulate transaction tx, _ := db.Begin() tx.Exec("INSERT INTO orders (id, total) VALUES ($1, $2)", "order-1", 100) tx.Exec("INSERT INTO outbox (...) VALUES (...)")
// Crash before commit — both rolled back tx.Rollback()
// Verify nothing committed var count int db.QueryRow("SELECT COUNT(*) FROM orders").Scan(&count) assert.Equal(t, 0, count) db.QueryRow("SELECT COUNT(*) FROM outbox").Scan(&count) assert.Equal(t, 0, count)}Для Debezium: testcontainers-go с Kafka + Debezium image. End-to-end test от INSERT до Kafka message receipt.
3. Gotchas
Заголовок раздела «3. Gotchas»⚠️ FOR UPDATE SKIP LOCKED обязательно для multi-worker polling. Без — workers взаимно блокируются.
⚠️ At-least-once → нужен inbox на consumer. Без — duplicates ломают бизнес-логику.
⚠️ Kafka idempotent producer — only intra-Kafka. Не помогает если ваш polling worker re-publishes after crash.
⚠️ Outbox cleanup race с downstream replay. Compromise on retention.
⚠️ Debezium initial snapshot blocks writes depending on Postgres version. Plan downtime window.
⚠️ WAL retention. Postgres max_wal_size + replication slot may holdить старый WAL пока Debezium не consume. Disk full risk!
⚠️ Replication slot abandoned. Если Debezium умер и slot остался — WAL накапливается. Monitor pg_replication_slots.active = false.
⚠️ Schema changes (ALTER TABLE) могут поломать Debezium. ADD COLUMN obычно ОК; DROP COLUMN, type change — careful.
⚠️ Big rows (>1 МБ): Kafka default message size 1 МБ. Need max.message.bytes increase.
⚠️ Debezium reorders events? No — preserves transaction order per partition.
⚠️ TOAST values in Postgres (large columns stored separately) — Debezium может миссить changes если column not in REPLICA IDENTITY. Use REPLICA IDENTITY FULL for affected tables.
⚠️ Inbox table growth. Bounded TTL essential. After TTL — if duplicate arrives, double-process.
⚠️ Ordering across aggregates. Kafka guarantees order within partition. If 2 entities related and you partition by entity_id, related entity events можно reorder.
⚠️ Outbox table contention. High write throughput → outbox table is hot. Index design matters; consider partitioning.
⚠️ JSON payload size. Don’t embed full state, only what changed.
4. Real cases
Заголовок раздела «4. Real cases»Case 1: E-commerce dual-write incident
Заголовок раздела «Case 1: E-commerce dual-write incident»Симптом: Order created, customer charged, но inventory не уменьшился. Кейс случается раз в неделю на peak hours.
Root cause: app записывает в orders DB, потом publishes Kafka event. Под нагрузкой Kafka periodically times out → event lost, но DB write committed.
Fix: outbox table + polling worker. Атомарность гарантирована, retry-able.
Outcome: incidents → zero за 6 месяцев.
Case 2: PG WAL bloat
Заголовок раздела «Case 2: PG WAL bloat»Контекст: Debezium connector умер ночью, никто не заметил weekend.
Symptom: Postgres disk 95%, alerts fired.
Anal: pg_replication_slots показал slot debezium_slot неактивен 48h. WAL накопился 200 ГБ.
Fix:
- Restart Debezium → resume consuming.
- Set up alert
pg_replication_slots.active = falseдля > 5 минут. - Set
max_slot_wal_keep_size = 50GB(PG 13+) — limit WAL retention per slot.
Case 3: Schema migration broke CDC
Заголовок раздела «Case 3: Schema migration broke CDC»Контекст: Renamed column customer_id → user_id in production DB.
Result: Debezium emit-нул schema change event. Downstream consumers expecting customer_id failed.
Fix:
- Process: never rename — always add new + dual-write + drop old.
- Migration: ADD COLUMN user_id, dual-write 2 weeks, DROP customer_id.
Case 4: Outbox table hotness
Заголовок раздела «Case 4: Outbox table hotness»Контекст: High-throughput service, 5000 events/sec. Outbox table становится bottleneck.
Anal: vacuum lagging behind, table size growing.
Fix:
- Partition outbox by hour:
outbox_2026_05_21_14. - DROP partitions older than 7 days.
- Per-partition VACUUM fast.
Case 5: Inbox missed deduplication
Заголовок раздела «Case 5: Inbox missed deduplication»Симптом: Order processed дважды. Customer charged twice.
Anal:
- Event A came at 10:00, processed, written to inbox.
- Inbox TTL = 24h. Cleaned at 10:00 next day.
- Same event A came at 10:01 next day (retry from broker). Inbox doesn’t know.
- Double-processed.
Fix: increased TTL to 7 days. Plus moved deduplication к idempotent business logic level (e.g. unique constraint on (order_id, payment_id)).
5. Вопросы (25)
Заголовок раздела «5. Вопросы (25)»- Что такое dual-write problem? Дайте пример.
- Опишите Outbox pattern: что хранится, как читается.
- Что такое Inbox pattern и где он на consumer side?
- CDC: что capture-ит и как?
- Структура outbox table (SQL).
- Зачем
FOR UPDATE SKIP LOCKEDв polling worker? - At-least-once vs exactly-once в outbox: что вы получаете?
- Kafka
enable.idempotence=true— что гарантирует? - Inbox + business logic в одной транзакции — зачем?
- Опишите архитектуру Debezium для Postgres.
- Что такое pgoutput vs wal2json?
- Initial snapshot в Debezium — что делает, какие проблемы?
- Что такое LSN в Postgres?
- Replication slot — что это, чем опасен заброшенный?
REPLICA IDENTITY FULL— зачем?- pglogrepl в Go — когда оправдано вместо Debezium?
- Outbox + Debezium hybrid: как работает.
- Outbox cleanup retention — какой выбрать и почему?
- Inbox TTL — какие риски короткого vs длинного?
- Schema change в Debezium: как handle.
- Reordering events: гарантия Kafka per partition.
- PG WAL bloat при abandoned slot — мониторинг.
- Big row > 1 МБ в Kafka — что настроить?
- Кейс «dual-write incident» — как outbox решает.
- Outbox vs direct CDC: когда какой подход?
6. Practice
Заголовок раздела «6. Practice»Задача 1: Реализовать outbox table + polling worker в Go. Publish в Kafka.
Задача 2: Симулировать crash worker mid-publish (kill -9). Restart, проверить, что нет lost events, но возможны duplicates.
Задача 3: Реализовать inbox на consumer side. Тест: подать дублированный event 5 раз, проверить, что бизнес-логика выполнилась 1 раз.
Задача 4: Поднять Postgres + Debezium + Kafka через docker-compose. Tablе orders → Kafka topic.
Задача 5: Написать Go consumer для CDC events. Обработать op=‘c’ (create) и op=‘u’ (update).
Задача 6: Сделать schema change (ADD COLUMN) и проверить, как Debezium это handle.
Задача 7: Реализовать pglogrepl-based CDC в Go (без Debezium). Compare complexity.
Задача 8: Outbox cleanup background job, retention 7 дней.
Задача 9: Партиционировать outbox table по часам, автоматизировать DROP старых partitions.
Задача 10 (advanced): Outbox + Debezium hybrid. Routing SMT в Debezium для маршрутизации события в правильный topic.
7. Источники
Заголовок раздела «7. Источники»- Chris Richardson, “Microservices Patterns”, Manning, 2018 — глава об outbox.
- Microservices.io: Transactional Outbox, https://microservices.io/patterns/data/transactional-outbox.html
- Debezium Documentation, https://debezium.io/documentation/
- Gunnar Morling, “Reliable Microservices Data Exchange With the Outbox Pattern”, 2019.
- Confluent Blog, “Implementing the Outbox Pattern”, 2020.
- PostgreSQL Logical Replication documentation.
- Jacks Bramos, “Architecting Data-Intensive Applications with PostgreSQL and Debezium”.
- pglogrepl: github.com/jackc/pglogrepl
- Kafka producer idempotence docs.
- Martin Kleppmann, “Designing Data-Intensive Applications”, chapters 11–12.
- Andy Pavlo, “Database Logging” lecture, CMU CS445.
- Pat Helland, “Life Beyond Distributed Transactions”, 2007.
- AWS DMS (Database Migration Service) — alternative CDC.
- ScyllaDB CDC docs (NoSQL CDC example).
- The Twelve-Factor App — III. Config (related для config of outbox workers).