Перейти к содержимому

Apache Kafka в production: глубокое погружение для Go

Зачем знать. Kafka — стандарт де-факто для event streaming и асинхронной интеграции сервисов с гарантированной доставкой. Middle 2 Go-инженер обязан понимать внутренности (KRaft, ISR, log compaction), уметь настроить idempotent/transactional producer, грамотно работать с consumer groups, реализовать exactly-once semantics и DLQ-схему, выбрать клиентскую библиотеку (franz-go vs kafka-go vs sarama) и диагностировать lag в проде.

  1. Концепция и архитектура (кратко)
  2. Production-практики: producer, consumer, EoS, schema registry, мониторинг, tuning
  3. Gotchas — реальные грабли
  4. Real cases: order pipeline, log aggregation, CDC
  5. 30 вопросов и ответов
  6. Practice — задания
  7. Источники

KAFKA CLUSTER
┌──────────────────────────────────────────────────┐
│ │
│ Broker-1 Broker-2 Broker-3 │
│ (Controller) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Topic A │ │ Topic A │ │ Topic A │ │
│ │ P0 (L) │◀────▶│ P0 (F) │◀────▶│ P0 (F) │ │
│ │ P1 (F) │◀────▶│ P1 (L) │◀────▶│ P1 (F) │ │
│ │ P2 (F) │◀────▶│ P2 (F) │◀────▶│ P2 (L) │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ │
└──────────────────────────────────────────────────┘
▲ ▲
│ │
Producer-1 Consumer Group "g1"
C1, C2, C3 (по partition)
L = leader, F = follower (replica)
ISR (in-sync replicas) = leader + актуальные followers
  • Broker — серверный узел Kafka. Хранит partitions, обслуживает producer/consumer.
  • Controller — один из брокеров, отвечает за метаданные кластера (партиции, leader election). С Kafka 3.3+ через KRaft (Kafka Raft) controller встроен прямо в брокеры (или в отдельные controller-node), Zookeeper больше не нужен.
  • Topic — логический канал, делится на partitions.
  • Partition — упорядоченная append-only последовательность сообщений; единица параллелизма. Внутри partition ordering строгий, между partitions — нет.
  • Replica — копия partition на другом broker. У каждой partition есть leader (читает/пишет клиенты) и followers (репликация).
  • ISR (In-Sync Replicas) — список реплик, которые «догнали» leader в пределах replica.lag.time.max.ms. Когда producer указывает acks=all, сообщение считается записанным только если все ISR подтвердили.
  • Offset — номер сообщения внутри partition (long, монотонно растущий).
До 3.3 (Zookeeper):
┌──────────┐ ┌──────────────┐
│Zookeeper │◀──▶│ Kafka Broker │
│ Quorum │ │ + Controller│
└──────────┘ └──────────────┘
После 3.3+ (KRaft):
┌──────────────────────────────┐
│ Controller-quorum (Raft) │
│ встроен в брокеры или │
│ отдельный controller-node │
└──────────────────────────────┘

С 3.3 KRaft GA, с 4.0 (2025) Zookeeper полностью удалён. Преимущества: одна система вместо двух, меньше операционных артефактов, быстрее failover, метаданные хранятся в специальном __cluster_metadata topic.

Каждая partition физически — это набор сегментов (файлов) на диске:

/var/lib/kafka/data/orders-0/
├── 00000000000000000000.log ← сегмент данных
├── 00000000000000000000.index ← индекс offset → file position
├── 00000000000000000000.timeindex ← индекс timestamp → offset
├── 00000000000045623891.log
├── 00000000000045623891.index
└── leader-epoch-checkpoint

Retention:

  • retention.ms (по времени, default 7 дней)
  • retention.bytes (по размеру)
  • Применяется к сегментам, целиком: сегмент удаляется когда самое старое сообщение в нём старше retention.

Compaction (cleanup.policy=compact) — для key-based topics: Kafka гарантирует, что для каждого ключа в topic остаётся как минимум последнее значение. Используется для:

  • changelog (CDC),
  • compacted state (current snapshot),
  • tombstones (null payload = удаление ключа).
Before compaction:
[k1=v1, k2=v2, k1=v3, k3=v4, k1=v5, k2=v6]
After compaction:
[k3=v4, k1=v5, k2=v6] ← последние значения по ключам
  • Producer определяет partition (по hash(key) или round-robin), отправляет batch на leader, ждёт ack согласно acks.
  • Consumer group делит partitions топика между членами. Каждая partition — ровно один consumer внутри группы. Если consumers больше чем partitions — лишние idle.

ПараметрЗначениеКогда нужно
acks0 / 1 / allall для durability
enable.idempotencetrue (default с 3.0)Защита от дублей при retry
max.in.flight.requests.per.connection1–5=1 для strict ordering без idempotence
retriesInteger.MAX_VALUEБесконечные retry, ограничены delivery.timeout.ms
delivery.timeout.ms120000Полный таймаут публикации
compression.typelz4 / snappy / zstd / gzip / noneДля throughput выбирай lz4 или zstd
linger.ms5–50 msBatching, latency vs throughput
batch.size16–256 KBРазмер batch на partition
transactional.idstringДля exactly-once
import (
"context"
"github.com/twmb/franz-go/pkg/kgo"
)
cl, err := kgo.NewClient(
kgo.SeedBrokers("kafka1:9092", "kafka2:9092", "kafka3:9092"),
kgo.DefaultProduceTopic("orders"),
// idempotence включается автоматически при retries > 0 и acks=all
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.ProducerBatchCompression(kgo.Lz4Compression()),
kgo.ProducerLinger(10 * time.Millisecond),
kgo.MaxBufferedRecords(1000),
)
if err != nil { /* ... */ }
defer cl.Close()
r := &kgo.Record{
Key: []byte(orderID), // hash(key) → partition (стабильно)
Value: payload,
Headers: []kgo.RecordHeader{
{Key: "trace-id", Value: []byte(traceID)},
},
}
cl.Produce(ctx, r, func(rec *kgo.Record, err error) {
if err != nil { log.Error(err) }
})
cl.Flush(ctx)

Transactional producer (атомарно несколько partitions/topics)

Заголовок раздела «Transactional producer (атомарно несколько partitions/topics)»
cl, _ := kgo.NewClient(
kgo.SeedBrokers("kafka1:9092"),
kgo.TransactionalID("orders-producer-1"),
kgo.RequiredAcks(kgo.AllISRAcks()),
)
if err := cl.BeginTransaction(); err != nil { /* ... */ }
cl.Produce(ctx, &kgo.Record{Topic: "orders", Value: ...}, nil)
cl.Produce(ctx, &kgo.Record{Topic: "audit", Value: ...}, nil)
switch err := cl.EndTransaction(ctx, kgo.TryCommit); {
case errors.Is(err, kerr.UnknownProducerID):
// нужен новый producer instance
default:
// ok or abort
}
ПараметрЗначениеКомментарий
group.idstringИдентификатор consumer group
auto.offset.resetearliest / latest / noneКогда нет committed offset
enable.auto.commitfalseВ проде почти всегда manual commit
partition.assignment.strategycooperative-stickyРекомендация с 2.4+
session.timeout.ms30 000Через сколько consumer считается мёртвым
heartbeat.interval.ms1/3 от session.timeout.msЧастота heartbeat
max.poll.interval.ms300 000Между poll() — иначе rebalance
max.poll.records500Записей за один poll
fetch.min.bytes1–1MBНакопление перед ответом
fetch.max.wait.ms500Максимум ожидания
isolation.levelread_committedДля exactly-once
group.instance.iduuid/имяStatic membership

До Kafka 2.4 при ребалансе все consumers останавливались — «stop-the-world». С cooperative-sticky ребаланс инкрементальный: только пересдаваемые partitions останавливаются.

Без него: рестарт пода → consumer считается новым → rebalance. С static ID: тот же ID после рестарта → Kafka не делает rebalance в течение session.timeout.ms. Снижает downtime при rolling restart на порядок.

cl, _ := kgo.NewClient(
kgo.SeedBrokers("kafka1:9092"),
kgo.ConsumerGroup("orders-processor"),
kgo.ConsumeTopics("orders"),
kgo.DisableAutoCommit(),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.SessionTimeout(30 * time.Second),
kgo.RebalanceTimeout(60 * time.Second),
kgo.InstanceID("consumer-pod-1"), // static membership
kgo.FetchMaxWait(500 * time.Millisecond),
kgo.FetchMaxBytes(50 << 20),
)
for {
fetches := cl.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
log.Error(errs)
continue
}
fetches.EachRecord(func(r *kgo.Record) {
if err := process(r); err != nil {
// в DLQ или retry
sendToDLQ(r, err)
return
}
})
// commit ТОЛЬКО когда batch обработан
if err := cl.CommitUncommittedOffsets(ctx); err != nil {
log.Error("commit failed:", err)
}
}
acks=0: producer ──message──▶ broker (не ждём ответа)
⚠️ может потерять данные, fire-and-forget
acks=1: producer ──message──▶ leader ──OK
leader ──╳──▶ follower (не дождались)
⚠️ потеря если leader упадёт до replication
acks=all: producer ──message──▶ leader ──▶ followers (все ISR)
◀──OK от всех ISR──
✅ durability, при min.insync.replicas=2 переживёт падение 1 broker

Стандарт для prod: acks=all + min.insync.replicas=2 + replication.factor=3.

Producer получает уникальный producer_id (PID) + sequence number на partition. Broker отслеживает последний sequence от данного PID на каждой partition — дубль с тем же sequence отбрасывается.

Включается:

enable.idempotence=true
acks=all
max.in.flight.requests.per.connection ≤ 5
retries > 0

Защищает только от retry на сетевом уровне внутри одного producer-session. Если приложение само отправит message дважды — это два разных запроса, оба будут приняты.

Два независимых уровня:

  1. At-least-once + idempotence = no duplicates from network retries.
  2. Transactional producer + consumer с isolation.level=read_committed = атомарное обновление нескольких topics/partitions + commit offsets в одной транзакции.

Pattern «read-process-write» (Kafka Streams под капотом):

beginTransaction()
read from topic-in
process()
write to topic-out
sendOffsetsToTransaction(consumed_offsets, group)
commitTransaction()

Если что-то падает в середине — abort, всё откатится; consumer с read_committed не увидит aborted данных.

⚠️ EoS работает только внутри Kafka. Если процесс пишет в Kafka и в Postgres — это two-phase commit, нужен outbox pattern (см. ниже).

АлгоритмCPUСжатиеКогда
none01.0Очень малые сообщения
gziphighbestАрхив, low traffic
snappylowmediumУниверсально
lz4lowmediumЧаще всего default
zstdmediumbest ratioСовременный default, Kafka 2.1+

Сжатие в проде обязательно — экономия диска × 5, сети × 5.

Application ──Avro/Protobuf──▶ Schema Registry
[schema_id + payload]
Application ──read──▶ Schema Registry ──schema──▶ Decode

Subject naming strategies:

  • TopicNameStrategy (по умолчанию) — subject = topic-key / topic-value. Один тип на topic.
  • RecordNameStrategy — subject по FQN. Мульти-тип в одном topic.
  • TopicRecordNameStrategytopic-FQN. Мульти-тип, но изолированы по topic.

Compatibility:

  • BACKWARD (default) — новая схема должна читать данные старой схемы (consumer обновляется первым).
  • FORWARD — старая схема может читать данные новой (producer обновляется первым).
  • FULL — оба направления.
  • NONE — без проверки (опасно).

Реализации: Confluent Schema Registry, Karapace (open-source, Apache 2.0), AWS Glue Schema Registry, Redpanda Schema Registry.

Transaction:
INSERT INTO orders ...
INSERT INTO outbox (event_type, payload, status='pending')
COMMIT
───────────────────────────
Outbox poller / Debezium CDC:
read pending events ──▶ publish to Kafka ──▶ mark as sent
  • Polling outbox: select из outbox + publish + update status. Простой, но нагружает БД.
  • Transactional outbox через CDC (Debezium): Debezium читает WAL Postgres / binlog MySQL и публикует в Kafka. БД не знает о Kafka.

Pattern:

┌──────────┐ ok ┌──────────┐
│ Consumer │──────▶ │ Process │
└──────────┘ └──────────┘
│ err
┌──────────┐ N attempts
│ Retry │─────────────▶ DLQ topic (orders.DLQ)
│ topic │ + headers: error, stack, attempt
└──────────┘

В Go custom-логика: после N retry → publish в orders.DLQ + commit offset исходного topic. Отдельный consumer / human-process работает с DLQ.

  • Partition count: 10× от числа consumer-инстансов потенциальных. Слишком много (>4000 на broker) — деградация.
  • Replication factor: 3 — стандарт.
  • min.insync.replicas: 2 (при RF=3). Если ISR падает до 1 — producer с acks=all получит NotEnoughReplicas.
  • Heap: 6–8 GB JVM, остаток ОЗУ — для page cache.
  • Disks: NVMe / SSD, RAID-10 или JBOD, XFS.
  • Consumer lag (high → consumer не успевает): kafka-lag-exporter, Burrow (LinkedIn), Cruise Control.
  • Under-replicated partitions (UR > 0 → проблема): JMX metric.
  • Request latency: produce p99 < 100ms норма.
  • Broker disk usage, network IO, active controller count == 1.
  • Redpanda (Kafka-API compatible, написан на C++, без JVM, single binary): идеально для CI и laptop.
  • docker-compose с confluentinc/cp-kafka или bitnami/kafka (KRaft режим, single broker).
# docker-compose.yml (Redpanda + Console)
services:
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:v24.2.7
command:
- redpanda start
- --smp 1
- --memory 1G
- --reserve-memory 0M
- --node-id 0
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
- --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
- --mode dev-container
ports: ["19092:19092", "9644:9644"]
console:
image: docker.redpanda.com/redpandadata/console:v2.7.2
environment:
KAFKA_BROKERS: redpanda:9092
ports: ["8080:8080"]
  • Confluent Cloud (родители Kafka, есть Schema Registry, ksqlDB, Connect).
  • AWS MSK + MSK Serverless.
  • Aiven, Upstash Kafka (per-message billing).
  • Redpanda Cloud.

⚠️ Acks=1 ≠ durability. Leader может умереть до replication → потеря. Для prod всегда acks=all.

⚠️ min.insync.replicas=1 + RF=3 = иллюзия безопасности. При acks=all достаточно одной ISR — leader может быть единственной. Ставь min.insync.replicas=2.

⚠️ Idempotence ≠ Exactly-once. Idempotence — защита от network retry внутри одной сессии producer. Если приложение само отправило дубль — будет дубль.

⚠️ enable.auto.commit=true опасен. Commit идёт по таймеру, после неудачной обработки offset уже закоммичен → message lost. В проде — manual commit.

⚠️ Большое max.poll.interval.ms маскирует проблему. Поднимать таймаут «чтобы не было rebalance» — антипаттерн; либо ускоряй обработку, либо выноси в отдельный worker pool.

⚠️ Static membership ≠ no rebalance. При scale-up/scale-down ребаланс будет всё равно.

⚠️ max.in.flight.requests > 1 без idempotence ломает ordering. При retry батчей порядок меняется.

⚠️ Compaction не освобождает место мгновенно. Сегменты compactится только когда они «closed» и tombstone живёт delete.retention.ms (default 24h).

⚠️ Изменение числа partitions ломает hash(key)→partition mapping. Старые ключи могут поехать на другие partitions → нарушение ordering by key.

⚠️ auto.offset.reset=latest теряет данные при первом запуске consumer group (всё до текущего момента не прочитано).

⚠️ EoS не работает кросс-кластерно. MirrorMaker 2 не сохраняет transactional semantics между кластерами.

⚠️ Heap > 32 GB у broker — отключает compressed oops в JVM → больше памяти на pointers, медленнее GC. Лучше 6–8 GB heap + 64–256 GB page cache.

⚠️ Schema Registry — single point of failure. Producer не может опубликовать без регистра. Нужен HA mode.

⚠️ replica.lag.time.max.ms слишком маленький → followers выкидываются из ISR на любом GC pause → шум. Default 30s — обычно ок.

⚠️ compression.type на topic ≠ producer. Можно настроить разные; рекомендуется одинаково (broker re-compress иначе).

⚠️ CGo (confluent-kafka-go) — нужен librdkafka на хосте, проблемы с Alpine (musl). Используй pure-Go (franz-go) если нет жёстких perf-требований.


API ──order_created──▶ [orders]
[validate-consumer] ──order_validated──▶ [orders.validated]
[payment-consumer]
┌──────────────────┴──────────────────┐
▼ ▼
[payment_ok] [payment_failed]
│ │
▼ ▼
[ship-consumer] [refund-consumer]
▶ orders.DLQ (if dispute)

Каждый consumer — отдельная Go-сервис, scaling по числу partitions; ordering по order_id как ключу гарантирует strict order для одного заказа.

N сервисов ──fluentbit/filebeat──▶ Kafka (topic per service or per env)
Logstash / Vector ──▶ Elastic / ClickHouse

Преимущество: backpressure absorb (если ES не успевает, Kafka буферизует), retention 1–7 дней.

Postgres WAL ──Debezium──▶ Kafka (db.public.orders) ──▶ consumers
(compacted)

Полная репликация состояния таблицы через compacted topic + tombstones (DELETE → message with null value).


1. Что такое partition в Kafka? Упорядоченная append-only последовательность сообщений внутри topic; единица параллелизма и репликации.

2. Зачем KRaft вместо Zookeeper? Убирает внешнюю зависимость, упрощает ops, ускоряет failover, метаданные хранятся как обычный topic в самом Kafka.

3. Что такое ISR? In-Sync Replicas — реплики partition, которые «догнали» leader в пределах replica.lag.time.max.ms. Сообщение с acks=all подтверждается только когда все ISR записали.

4. Что такое consumer group? Группа consumer-инстансов, разделяющих partitions topic. Внутри группы каждая partition обрабатывается ровно одним consumer.

5. Чем acks=1 отличается от acks=all? acks=1 — ждём подтверждения только от leader; durability слабее. acks=all — ждём подтверждения от всех ISR.

6. Что такое idempotent producer? Producer с уникальным PID + sequence number на partition; broker отбрасывает дубли с тем же sequence. Защищает от retry-дублей в рамках одной producer session.

7. Что включает enable.idempotence=true? Автоматически: acks=all, retries>0, max.in.flight.requests.per.connection ≤ 5.

8. Что такое transactional producer? Producer с transactional.id, поддерживает атомарные write в несколько partitions/topics + sendOffsetsToTransaction для read-process-write pattern.

9. Какие алгоритмы compression поддерживает Kafka? none, gzip, snappy, lz4, zstd (с 2.1+). zstd рекомендуется для prod.

10. Что такое linger.ms / batch.size? Producer накапливает сообщения в batch на partition; отправляет когда batch заполнен (batch.size) или прошло linger.ms.

11. Что такое cooperative-sticky rebalance? Инкрементальный rebalance: вместо stop-the-world (eager) перераспределяются только меняющиеся partitions. С Kafka 2.4+, рекомендация для prod.

12. Зачем static membership (group.instance.id)? Чтобы при rolling restart consumer не считался новым → нет rebalance в течение session.timeout.ms.

13. max.poll.interval.ms vs session.timeout.ms? session.timeout.ms — между heartbeats (broker считает consumer мёртвым); max.poll.interval.ms — между вызовами poll() (consumer считается «застрявшим»). Heartbeats идут в отдельном thread.

14. commitSync vs commitAsync? commitSync блокирующий, надёжный, медленнее. commitAsync неблокирующий, без retry — для регулярного коммита + commitSync на shutdown.

15. Что произойдёт при auto.offset.reset=earliest и старом topic? Consumer прочитает все сохранённые сообщения с самого начала.

16. Что такое EoS в Kafka? Гарантия, что сообщение обработано ровно один раз даже при failures. Достигается transactional producer + isolation.level=read_committed + commit offsets в транзакции.

17. Когда EoS не работает? Кросс-системно (Kafka + DB), кросс-кластерно (MirrorMaker 2 не сохраняет транзакции).

18. Что такое read_committed? Consumer читает только commit-завершённые транзакции; aborted/in-progress сообщения скрыты.

19. Что такое compacted topic? Topic с cleanup.policy=compact: Kafka гарантирует, что для каждого ключа сохраняется как минимум последнее значение. Используется для changelog / state.

20. Что такое tombstone? Сообщение с value=null в compacted topic — сигнал «удалить ключ».

21. Что такое controller? Broker, ответственный за метаданные кластера: leader election, partition reassignment. С KRaft встроен в брокеры (Raft quorum).

22. Сколько partitions выбрать для topic? Зависит от throughput и числа consumer-инстансов. Грубо: max_throughput / per_partition_throughput. На broker не более ~4000 partitions (с KRaft планка выше).

23. Replication factor 3 + min.insync.replicas 2 — что это даёт? Переживёт падение 1 broker без потери данных и без блокировки producer.

24. franz-go vs sarama vs kafka-go vs confluent-kafka-go?

  • franz-go (twmb): pure Go, современный, поддерживает все фичи (KIP), быстрый — рекомендация 2025+.
  • kafka-go (segmentio): pure Go, простой API, меньше фич (нет полноценных транзакций).
  • confluent-kafka-go: CGo wrapper над librdkafka, лучший throughput, но CGo.
  • IBM/sarama (бывш Shopify): pure Go, классика, активная поддержка.

25. Почему franz-go стал рекомендацией? Активная разработка, полная поддержка KIP (включая KIP-848 next-gen consumer groups), стабильное API, отличная документация.

26. Что такое DLQ pattern? После N неуспешных попыток обработки message отправляется в отдельный DLQ-topic с metadata об ошибке. Отдельный consumer/operator работает с DLQ.

27. Что такое outbox pattern? Запись «событий» в outbox-таблицу в той же DB-транзакции что и основная запись; отдельный процесс / CDC публикует их в Kafka. Гарантирует атомарность DB + Kafka.

28. Как мониторить consumer lag? kafka-lag-exporter (Prometheus), Burrow (LinkedIn), JMX-метрики; алерт при росте lag.

29. Что делать если consumer не успевает?

  1. увеличить partitions + consumers, 2) ускорить процессинг (batch, async DB), 3) вынести IO в worker pool, 4) DLQ для poison message.

30. Что такое Redpanda? Kafka-API совместимая система на C++ без JVM и Zookeeper; быстрее на mixed workload, проще ops. Используется как drop-in replacement и для CI/dev.


  1. Идемпотентный publisher. Напиши Go-функцию, которая публикует event с event_id как ключом; idempotent producer + проверка через consumer что дубли при retry не появляются.

  2. DLQ. Реализуй consumer для topic orders, при ошибке обработки → publish в orders.DLQ с headers x-error, x-retry-count, x-original-topic; offset исходного topic коммитим только после успешной публикации DLQ.

  3. Outbox pattern. PostgreSQL-таблица outbox, Go-poller выбирает WHERE status='pending' ORDER BY id LIMIT 100, публикует в Kafka, обновляет status='sent'. Покажи что транзакция атомарна.

  4. Transactional producer. Реализуй read-process-write: читай из orders, обогащай из БД, пиши в orders.enriched + commit offsets в транзакции. Verifу что abort откатывает обе write.

  5. Consumer lag monitor. Подними kafka-lag-exporter + Prometheus + Grafana. Сделай алерт lag > 1000.

  6. Compaction. Создай compacted topic, отправь N сообщений с ключами повторяющимися; запусти log compaction, проверь что в файлах остались только последние.

  7. Static membership benchmark. Сравни rolling restart consumer group с и без group.instance.id; измерь сколько времени партиции «не обрабатывались».

  8. Schema Registry. Подними Karapace, зарегистрируй Avro-схему, опубликуй с эволюцией (добавление optional поля); проверь backward compatibility.

  9. Cooperative-sticky vs Range. Scale consumer group с 1 до 5; сравни сколько partitions перераспределилось.

  10. Redpanda для CI. Замени Kafka на Redpanda в docker-compose, прогони integration тесты, измерь startup time.


type OutboxEvent struct {
ID int64
Topic string
Key []byte
Payload []byte
Headers map[string]string
Status string // pending | sent | failed
Attempts int
CreatedAt time.Time
}
type Poller struct {
db *pgxpool.Pool
kafka *kgo.Client
batch int
backoff time.Duration
}
func (p *Poller) RunOnce(ctx context.Context) error {
tx, err := p.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted})
if err != nil { return err }
defer tx.Rollback(ctx)
rows, err := tx.Query(ctx, `
SELECT id, topic, key, payload, headers, attempts
FROM outbox
WHERE status = 'pending' AND attempts < 5
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT $1`, p.batch)
if err != nil { return err }
var events []OutboxEvent
for rows.Next() {
var e OutboxEvent
var hraw []byte
rows.Scan(&e.ID, &e.Topic, &e.Key, &e.Payload, &hraw, &e.Attempts)
json.Unmarshal(hraw, &e.Headers)
events = append(events, e)
}
rows.Close()
if len(events) == 0 { return tx.Commit(ctx) }
// публикация batch
results := make([]error, len(events))
var wg sync.WaitGroup
for i, ev := range events {
wg.Add(1)
i, ev := i, ev
rec := &kgo.Record{
Topic: ev.Topic,
Key: ev.Key,
Value: ev.Payload,
}
for k, v := range ev.Headers {
rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)})
}
p.kafka.Produce(ctx, rec, func(r *kgo.Record, err error) {
results[i] = err; wg.Done()
})
}
wg.Wait()
// обновление статуса
for i, ev := range events {
if results[i] == nil {
tx.Exec(ctx, `UPDATE outbox SET status='sent', sent_at=NOW() WHERE id=$1`, ev.ID)
} else {
tx.Exec(ctx, `UPDATE outbox SET attempts=attempts+1, last_error=$2 WHERE id=$1`,
ev.ID, results[i].Error())
}
}
return tx.Commit(ctx)
}

Особенности:

  • FOR UPDATE SKIP LOCKED — несколько poller-инстансов работают параллельно без блокировок.
  • batch + parallel publish.
  • attempts counter — после 5 неудач сообщение помечается failed, требует ручного вмешательства.

Эмпирические цифры (3-broker prod-cluster, 1KB JSON-сообщения, 1 producer, m5.4xlarge):

compressionMB/sec writeCPU on producerCPU on broker
none2808%9%
snappy110019%11%
lz4140022%12%
zstd175038%14%
gzip32075%22%

zstd — лучший ratio, но дороже CPU. Для среднего workload — lz4.

KIP-848 (next-gen consumer groups, Kafka 3.7+ early access) меняет модель ребаланса:

  • Server-side assignment вместо client-side.
  • Heartbeats заменяют long-poll.
  • Incremental rebalance стало стандартом.
  • Меньше моментов «всё остановилось».

С точки зрения Go-разработчика franz-go уже поддерживает KIP-848 (по флагу) — будущее consumer groups.

┌──────────────────────────────────────────────────────────┐
│ Stream processor (read-process-write): │
│ │
│ txProd.BeginTransaction() │
│ ↓ │
│ records := consumer.PollRecords() │
│ ↓ │
│ for each record: │
│ out := process(record) │
│ txProd.Produce(out, "topic-out") │
│ ↓ │
│ txProd.SendOffsetsToTransaction( │
│ consumed_offsets, consumer.Group()) │
│ ↓ │
│ if all_ok: txProd.CommitTransaction() │
│ else: txProd.AbortTransaction() │
└──────────────────────────────────────────────────────────┘

Consumer с isolation.level=read_committed не видит aborted данных. Это и есть EoS внутри Kafka.

Версия 1:

{"type": "record", "name": "Order", "fields": [
{"name": "id", "type": "string"},
{"name": "total", "type": "int"}
]}

Версия 2 — добавляем optional поле (BACKWARD-совместимо):

{"type": "record", "name": "Order", "fields": [
{"name": "id", "type": "string"},
{"name": "total", "type": "int"},
{"name": "currency", "type": ["null", "string"], "default": null}
]}

Старый consumer прочитает данные новой версии — поле просто проигнорирует.

⚠️ Удаление поля = breaking change. Сначала задефолти поле в null, потом deprecate, потом удалять (через несколько релизов).

7.6 Производительность брокера: настройки JVM и OS

Заголовок раздела «7.6 Производительность брокера: настройки JVM и OS»

Прод-настройки broker:

  • JVM heap: 6–8 GB, G1GC, -XX:MaxGCPauseMillis=20.
  • swappiness: 1 (избегать swap).
  • file descriptors: 100k+ (ulimit -n).
  • net.core.rmem_max / wmem_max: 16 MB.
  • vm.dirty_ratio: 80, vm.dirty_background_ratio: 5.
  • FS: XFS, mount с noatime, nobarrier (если есть battery-backed RAID).
  • Малые messages с очень низким throughput (10 msg/sec) — overkill, NATS/RabbitMQ проще.
  • Очень большие messages (>1MB) — Kafka неэффективен, нужно chunking или Object Store.
  • RPC / request-reply — Kafka это log, не bus; для RPC берите gRPC/NATS.
  • Real-time с sub-ms latency — Kafka даёт ms-десятки ms; для high-frequency trading недостаточно.
  • Малая команда без DevOps — Kafka сложен в ops; Redpanda проще, NATS ещё проще.
  • RF=3, min.insync.replicas=2.
  • Producer acks=all, idempotent enabled.
  • Consumer manual commit + cooperative-sticky.
  • Static membership для критичных consumers.
  • DLQ для каждого критичного topic.
  • Schema Registry + BACKWARD compatibility.
  • Monitoring: lag, UR partitions, request latency.
  • Backup / DR: MirrorMaker 2 в DR-cluster.
  • Alerts: UR > 0, lag > threshold, free disk < 30%.
  • Runbook для broker failure, partition leader election, partition reassign.

  1. Apache Kafka Documentation 3.7+ — официально, базовый референс.
  2. Confluent: KRaft (KIP-500) — глубокий разбор KRaft.
  3. twmb/franz-go — клиент и его документация (содержит лучшие практики).
  4. Effective Kafka, 2nd ed. — Emil Koutanov, 2023.
  5. Kafka: The Definitive Guide, 2nd ed. — Narkhede et al., O’Reilly.
  6. Designing Event-Driven Systems — Ben Stopford.
  7. Redpanda Docs — Kafka-compatible альтернатива.
  8. Karapace (Aiven Schema Registry) — open-source Schema Registry.
  9. KIP-848 (Next-gen consumer groups) — будущее consumer groups.
  10. Microservices.io: Outbox Pattern — Chris Richardson.