Перейти к содержимому

29. Kafka на уровне протокола

Зачем знать на Middle 3: Senior, отвечающий за messaging-системы, должен понимать, что Kafka — это persistent commit log с binary wire-protocol, и как работают её главные элементы под капотом: ISR, leader epoch, consumer groups protocol, идемпотентный producer, транзакции, exactly-once. Без этого «retention», «replication.factor», «acks=all», «transactional.id» — пустые слова, и любая баг в проде превращается в guesswork. На уровне Middle 3 ты должен уметь объяснить, как Kafka переживает сбой контроллера, как cooperative rebalance отличается от eager, и в каких случаях acks=all всё ещё может потерять данные.


  1. Концепция
  2. Production-deep dive
  3. Gotchas
  4. Real cases
  5. Вопросы
  6. Practice
  7. Источники

Apache Kafka — distributed commit log. Не «очередь» в классическом смысле (RabbitMQ), а append-only лог сообщений, разбитый на topic’и → partitions → segments → records, реплицированный по нескольким brokers. Consumer’ы сами держат offset и решают, с какого места читать.

Producers ─────► ┌────────────────────────────────────────────┐
│ Kafka Cluster │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Broker1 │ │ Broker2 │ │ Broker3 │ │
│ │ │ │ │ │ │ │
│ │ P0(L) │ │ P0(F) │ │ P0(F) │ │
│ │ P1(F) │ │ P1(L) │ │ P1(F) │ │
│ │ P2(F) │ │ P2(F) │ │ P2(L) │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌─┴────────────┴────────────┴─┐ │
│ │ Controller (1 of brokers) │ │
│ │ (KRaft quorum or ZK) │ │
│ └─────────────────────────────┘ │
└────────────────────────────────────────────┘
Consumers (groups)

Каждая partition имеет:

  • Leader — обрабатывает все reads/writes.
  • Followers — реплицируют от лидера, могут стать лидерами при failover.
  • ISR (In-Sync Replicas) — followers, синхронные с лидером.

Kafka — binary protocol поверх TCP, не HTTP, не gRPC. Все клиенты (Java, librdkafka, kafka-go, sarama, franz-go) реализуют его сами.

Сообщение протокола: Request / Response с size prefix, версионированием API keys, схемой.

[size: int32]
[api_key: int16] # Produce=0, Fetch=1, Metadata=3, OffsetCommit=8 ...
[api_version: int16]
[correlation_id: int32]
[client_id: nullable_string]
[payload: ...]

API ключи (выбрано):

  • Produce (0) — отправка
  • Fetch (1) — получение
  • Metadata (3) — топология
  • OffsetCommit (8), OffsetFetch (9)
  • JoinGroup (11), Heartbeat (12), SyncGroup (14), LeaveGroup (13)
  • ApiVersions (18) — список поддерживаемых версий
  • InitProducerId (22), AddPartitionsToTxn (24), EndTxn (26)

Клиент обязан сначала вызвать ApiVersions и договориться о версиях.


Controller — один из брокеров, отвечающий за:

  • Метаданные кластера (топики, партиции, leadership).
  • Leader election при отказе брокера.
  • Partition reassignment.

Раньше: метаданные в ZooKeeper, controller — broker с локом в ZK.

С Kafka 3.3+ — KRaft (Kafka Raft) GA:

  • ZK не нужен.
  • Метаданные хранятся как встроенный log на quorum’е controller’ов (3 или 5 ноды).
  • Controllers могут быть совмещены с brokers (process.roles=broker,controller) или раздельно.
  • Snapshots для bootstrap новых controller’ов.

Преимущества KRaft:

  • Меньше moving parts (без ZK).
  • Быстрее controller failover (~секунды vs минуты).
  • Поддержка миллионов партиций (раньше упирались в ZK).

Конфиг (broker + controller mode):

process.roles=broker,controller
node.id=1
controller.quorum.voters=1@n1:9093,2@n2:9093,3@n3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER

⚠️ KRaft и ZK режимы несовместимы; миграция — отдельный процесс (kafka-storage.sh, KIP-866).

Каждая partition на диске — это директория с segments:

/var/lib/kafka/data/my-topic-0/
00000000000000000000.log # сегмент 0, log file
00000000000000000000.index # offset index (sparse)
00000000000000000000.timeindex # timestamp index
00000000000005678901.log # сегмент со startOffset=5678901
00000000000005678901.index
00000000000005678901.timeindex
leader-epoch-checkpoint
partition.metadata
  • Segment — максимум 1GB или N дней (log.segment.bytes, log.roll.hours).
  • .log — собственно сообщения, записываются последовательно.
  • .index — sparse index (offset → byte position), шаг ~4KB.
  • .timeindex(timestamp → offset), для time-based lookup.
  • leader-epoch-checkpoint — каждый раз при смене лидера эпоха инкрементируется и фиксируется.

Запись — append к active segment’у, fsync контролируется log.flush.interval.messages / log.flush.interval.ms (по умолчанию полагается на OS pagecache + replication для durability).

Чтение — sendfile() syscall, zero-copy: данные летят из page cache в socket без копирования в user-space. Это и есть «секрет» throughput Kafka.

┌───────────────┐
│ Leader │
│ Log: 0..100 │
└────┬───┬──────┘
│ │
fetch │ │ fetch
┌─────────┘ └────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│ Follower 1 │ │ Follower 2 │
│ Log: 0..98 │ │ Log: 0..100 │
│ (slow) │ │ │
└───────────────┘ └───────────────┘
↑ "lagging" ↑ "in sync"

ISR — followers, у которых:

  • replica.lag.time.max.ms (default 30s) — отставание по времени не больше порога.
  • Heartbeat не потерян.

High Watermark (HW) — наибольший offset, который реплицирован на ВСЕ ISR. Только сообщения до HW видны consumer’ам.

0 1 2 3 4 5 6 7 8 9 10
Leader log: ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■
HW = 7 ─────────────────────────────►│
видны
consumer'ам
LEO (Log End Offset) = 10 ▲
невидимы

LEO (Log End Offset) — конец лога. На leader’е LEO > HW, если ISR не догнали.

min.insync.replicas (topic-level config) — минимальное число ISR для acks=all. Если в ISR ниже — acks=all Produce отвергается:

NotEnoughReplicasException

Без epoch’а возможен сценарий «zombie leader». Sequence:

  1. Broker A — leader, оffset 100, ISR = {A, B}.
  2. A падает / изолируется.
  3. B становится leader, начинает принимать запись от 100. Offset 101, 102.
  4. A приходит назад, думает «я leader», пишет в свой log offset 101, 102 (разные данные).

Leader epoch решает это. У каждой смены лидера — увеличивается epoch number. Followers при подключении к leader’у запрашивают OffsetForLeaderEpoch. Если их данные противоречат истинной истории — они усекают (truncate) лог до точки расхождения.

Epoch 1 (A leader): 0..100
Epoch 2 (B leader): 100..102 ← B видел epoch 1 до 100
A приходит, думает он leader epoch 1. Запросит у текущего leader B:
"мой epoch 1 кончился на 100" → truncate offset 101 на A, fetch с 100 от B.

Это вшито в Kafka 0.11+. Гарантирует, что нет «двух правд».

unclean.leader.election.enable=true — если ВСЕ ISR упали, выбрать кого-то ИЗ NOT-ISR как нового leader.

  • Плюс: availability.
  • Минус: новый leader не имеет последних сообщений → потеря данных.

⚠️ В production ВСЕГДА ставь unclean.leader.election.enable=false для критичных топиков. Лучше downtime, чем потеря.

Producer группирует записи в batch (один Produce request содержит много records):

ProduceRequest:
transactional_id (optional)
acks: int16 # 0, 1, -1 (=all)
timeout_ms
topic_data: [
{ topic, partition, records: RecordBatch }
]

acks:

acksПоведениеDurability
0Fire-and-forget. Producer не ждёт ACK.Низкая. Можно терять пачками.
1Ждёт ACK от leader.Средняя. Leader может упасть до replication.
-1 / allЖдёт ACK от leader + всех ISR (min.insync.replicas).Высокая. Zero data loss при min.insync.replicas ≥ 2.

Idempotent producer (enable.idempotence=true, default с 3.0):

  • Producer получает producer_id через InitProducerId.
  • Каждый record получает (producer_id, producer_epoch, sequence_number).
  • Leader дедуплицирует по (producer_id, partition, sequence). Дубликаты молча игнорируются.
  • Гарантия: exactly-once delivery в одну партицию.

Идемпотентность не работает между:

  • Партициями (один батч на разные партиции).
  • Сессиями producer’а (после рестарта новый producer_id).

Для межпартиционной atomicity нужен transactional producer.

Transactional producer:

producer.InitTransactions() // получаем producer_id (persistent через txnId)
producer.BeginTransaction()
producer.Produce(topic1, ...)
producer.Produce(topic2, ...)
// можем закоммитить consumer offsets как часть транзакции:
producer.SendOffsetsToTransaction(offsets, groupId)
producer.CommitTransaction() // или AbortTransaction()
  • Уникальный transactional.id (persistent через рестарты!) — нужен для recovery.
  • Один из broker’ов — transaction coordinator для txn-id.
  • Транзакция атомарна по нескольким партициям и offset commit.
  • Consumer должен читать с isolation.level=read_committed — иначе увидит «грязные» сообщения из неcommitted транзакций.

⚠️ Транзакции имеют overhead: лишние round trip’ы (Begin/End), markers в логах (txn-commit / txn-abort), latency растёт в 2-5 раз. Используй только там, где нужно EoS.

Consumer’ы объединяются в consumer group. Один из brokers — group coordinator (выбирается hash’ем group_id).

Pipeline:

client → FindCoordinator → coordinator
client → JoinGroup → coordinator
(coordinator выбирает leader группы — первого consumer'а)
leader-consumer → SyncGroup (с планом назначения partitions) → coordinator → всем
all consumers ← assignment ← coordinator
loop: Fetch + Heartbeat

Heartbeats:

  • heartbeat.interval.ms (3s default) — фоновый Heartbeat.
  • session.timeout.ms (45s) — если без heartbeat — consumer считается dead → rebalance.
  • max.poll.interval.ms (5min) — если poll() не вызывается → consumer считается «обработка зависла» → rebalance.

Rebalance protocols (assignors):

  1. Range (default до 3.0) — партиции одного топика делятся на консьюмеров «диапазонами». Может неравномерно распределить при разном числе топиков.
  2. RoundRobin — все партиции всех топиков равномерно по консьюмерам.
  3. Sticky — пытается сохранить максимум прежних назначений (минимизирует «переезд» state).
  4. CooperativeSticky (recommended с Kafka 2.4+) — incremental rebalance, не стопает всех consumer’ов разом, переносит партиции по одной.

Eager vs Cooperative:

Eager rebalance: stop-the-world
→ все consumers revoke all
→ JoinGroup
→ новые assignments
→ start consuming again
(downtime секунды-минуты)
Cooperative rebalance:
→ объявляется, какие партиции надо передать
→ только эти консьюмеры revoke их
→ новые консьюмеры подхватывают
(downtime микро-секунды для затронутых партиций)

Static membership (group.instance.id):

  • Consumer регистрируется со стабильным ID (через k8s StatefulSet pod name, например).
  • При рестарте в течение session.timeout.ms группа не делает rebalance — coordinator ждёт, что тот же group.instance.id вернётся.
  • Идеально для stateful consumers (Kafka Streams). Минус — если pod не вернулся, ждёт до timeout впустую.

Полная EoS — это read-process-write loop:

input topic → Consumer → process → Producer → output topic
атомарно:
commit producer txn +
commit consumer offsets

Conditions:

  • Producer: enable.idempotence=true, transactional.id=<persistent>.
  • Consumer: isolation.level=read_committed, enable.auto.commit=false.
  • Offsets коммитятся через SendOffsetsToTransaction().

Это даёт «exactly-once в Kafka». Но side effects вне Kafka (HTTP-call, DB-write) — снаружи EoS, и тебе нужны свои idempotency keys.

Topics с cleanup.policy=compact сохраняют последнее значение по ключу. Это lookup-store (Kafka Streams KTable, KSQL tables, метаданные).

  • Background log cleaner идёт по segments и удаляет старые версии одного ключа.
  • null-value = tombstone = удаление ключа.
  • min.cleanable.dirty.ratio (0.5) — порог запуска cleanup.

⚠️ Compaction не освобождает «текущий» (active) segment. Tombstones удерживаются delete.retention.ms (24h по умолчанию) — чтобы все consumers успели увидеть удаление.

Producer:

acks=all
enable.idempotence=true
linger.ms=10 # ждать 10ms для батча
batch.size=131072 # 128KB
compression.type=lz4 # или zstd
max.in.flight.requests.per.connection=5
buffer.memory=67108864 # 64MB

Увеличение linger.ms + batch.size — главный рычаг throughput.

Consumer:

fetch.min.bytes=1048576 # 1MB
fetch.max.wait.ms=500
max.partition.fetch.bytes=10485760
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
isolation.level=read_committed
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Broker:

num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
num.replica.fetchers=4
replica.fetch.max.bytes=10485760
log.segment.bytes=1073741824

Must-have JMX/Prometheus метрики:

  • UnderReplicatedPartitions — > 0 = брокер отстаёт.
  • OfflinePartitionsCount — > 0 = катастрофа (нет лидера).
  • ActiveControllerCount — должен быть ровно 1 (на всём кластере).
  • ISR shrinks / expands rate — частая трясучка = плохо.
  • MessagesInPerSec, BytesInPerSec, BytesOutPerSec.
  • RequestQueueTimeMs, RequestQueueSize — насыщение брокера.
  • LeaderElectionRateAndTimeMs — частые выборы = плохо.
  • Consumer lag (через kafka-consumer-groups.sh или Burrow).
КлиентЗаметки
segmentio/kafka-goPure Go, чистый API, дружелюбный, но slightly slower
IBM/saramaPure Go, veteran (бывший Shopify/sarama), много фич, тяжёлая API
twmb/franz-goСамый быстрый pure Go, transactions, поддержка KRaft, recommended
confluentinc/confluent-kafka-goC-based (librdkafka), max performance, CGO

Для transactions + KRaft + sticky cooperative — franz-go лидер.

import "github.com/twmb/franz-go/pkg/kgo"
cl, _ := kgo.NewClient(
kgo.SeedBrokers("kafka:9092"),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.ProducerLinger(10 * time.Millisecond),
kgo.ProducerBatchCompression(kgo.Lz4Compression()),
kgo.TransactionalID("my-app-1"),
)
defer cl.Close()
cl.BeginTransaction()
cl.ProduceSync(ctx, &kgo.Record{Topic: "out", Value: data})
cl.EndTransaction(ctx, kgo.TryCommit)

⚠️ 1. acks=all не гарантирует zero data loss без min.insync.replicas

Заголовок раздела «⚠️ 1. acks=all не гарантирует zero data loss без min.insync.replicas»

Если min.insync.replicas=1 и единственная ISR — leader, то «all» = «leader». При потере leader = потеря данных. Ставь min.insync.replicas=2 + replication.factor=3.

Всегда unclean.leader.election.enable=false для критичных топиков. Иначе при потере всех ISR Kafka возьмёт «грязного» лидера и потеряет последние сообщения.

Слишком мало партиций — узкое место throughput / параллелизма consumer’ов. Слишком много — overhead на broker (file handles, ISR-метрики). Правило: 1-2× CPUs кластера на топик, но не больше 4000-5000 на брокер.

⚠️ 4. Order гарантирован только в одной партиции

Заголовок раздела «⚠️ 4. Order гарантирован только в одной партиции»

Cross-partition ordering — нет. Если важен порядок по бизнес-ключу — partition by key. И НЕ менять число партиций (это меняет hash mapping).

⚠️ 5. max.in.flight.requests.per.connection > 1 без idempotence ломает order

Заголовок раздела «⚠️ 5. max.in.flight.requests.per.connection > 1 без idempotence ломает order»

Producer может повторно отправить retry → out-of-order. С enable.idempotence=true Kafka сама гарантирует order (max 5 in-flight allowed).

⚠️ 6. max.poll.interval.ms — частая причина rebalance’ов

Заголовок раздела «⚠️ 6. max.poll.interval.ms — частая причина rebalance’ов»

Long processing блокирует heartbeats. С Kafka 0.10.1+ heartbeat в отдельном потоке, и max.poll.interval.ms контролирует «как часто ты вызываешь poll()». Если обработка занимает > 5 минут — выгрузи в фоновый воркер, а poll() вызывай чаще.

Если консьюмер обрабатывает batch и в этот момент группа rebalance’ится — offset не commit’нется. После переподключения другой consumer перечитает. Только idempotent processing спасает.

Топик с cleanup.policy=compact хранит все уникальные ключи навсегда (до tombstone). Если ключи имеют high cardinality (UUID каждый раз новый) — топик растёт. Compaction подходит для конечного множества ключей.

__transaction_state topic должен иметь replication.factor >= 3. Default — 1, что в проде — катастрофа.

Consumer ждёт, пока транзакция commit/abort’нется. При abort’ах consumer пропускает записи. Дополнительный round-trip.

Многие приложения создают свой Kafka client на каждый pod/worker → тысячи соединений на брокере. Это упирается в socket.max.connections, max.connections.per.ip. Используй sharing client’а в приложении.

Между кластерами offsets смещаются (другой commit log). Для failover между кластерами нужен offset translation (MM2 умеет с 2.7+).

Если producer использует Avro + Schema Registry, и SR недоступен — producer виснет. Делай SR HA, тестируй.

При nearly full disk Kafka может уронить broker до того, как WAL флашнется. Алертить на Volume usage > 75%. Использовать log.retention.bytes per-partition.

При временной сетевой проблеме ISR может «схлопнуться» (shrink) до 1, и acks=all начнёт фейлить (если min.insync.replicas=2). Хорошо настроенный кластер — алерт + не паниковать, дождаться auto-expand.


Контекст: Кластер 30 брокеров, ZK 5 нод. Боль: ZK latency растёт с числом партиций (40k партиций).

Решение: миграция на KRaft по KIP-866:

  1. Подняли 3 dedicated controllers на KRaft.
  2. Включили dual-write mode (метаданные одновременно в ZK и KRaft).
  3. После catch-up — переключили на KRaft-only.
  4. Удалили ZK.

Результат: failover контроллера упал с 90s до 3s. Latency писем на __consumer_offsets упал в 5 раз.

Контекст: Платёжный сервис. Каждый payment event → балансы пользователей. Потеря = деньги, дубликат = деньги.

Решение:

  • Producer: enable.idempotence=true, transactional.id=payments-<pod-uuid> (persistent через ConfigMap).
  • Read-process-write: consume payments, обновить кэш балансов в Kafka (compacted topic), produce balance.updates, commit offsets ВНУТРИ транзакции.
  • DB-side: idempotency keys в Postgres + INSERT ON CONFLICT DO NOTHING.

При сбое пода: после рестарта producer с тем же transactional.id — старая транзакция aborts’ится coordinator’ом, новая стартует чисто.

Симптом: Каждые 5 минут все 200 consumers «дёргаются», 30 секунд лаг по 1М сообщений.

Расследование: max.poll.interval.ms = 300s, processing занимает иногда 6 минут. Поллинг блокировался — rebalance.

Решение:

  • Уменьшили max.poll.records=100.
  • Включили partition.assignment.strategy=CooperativeStickyAssignor.
  • Тяжёлая обработка вынесена в worker pool, poll() возвращается мгновенно.

Rebalance’ы исчезли.

Симптом: Раз в день ISR на брокере 3 схлопывается, потом восстанавливается. UnderReplicatedPartitions > 0 на минуту.

Расследование: JVM GC stop-the-world 35 секунд. Replica fetcher не успевал ack’нуть.

Решение: перешли на G1GC с MaxGCPauseMillis=20, увеличили heap до 16GB. GC паузы упали до 100ms.

Контекст: Logs producer 200MB/s, сеть упёрта в 1 Gbps.

Решение: Включили compression.type=zstd, увеличили linger.ms=50, batch.size=512KB. Эффективная пропускная способность выросла до 1.5 GB/s исходных данных (compression ratio ~7x).


  1. Что такое Kafka и чем она отличается от классической очереди (RabbitMQ)?
  2. Что такое partition, segment, offset?
  3. Как устроен wire-protocol Kafka? Зачем нужен ApiVersions?
  4. Что такое controller и какие функции он выполняет?
  5. Что такое KRaft и зачем убирали ZooKeeper?
  6. Что такое ISR и как она формируется?
  7. Что такое high watermark и LEO?
  8. Что такое leader epoch и как он защищает от split-brain?
  9. Что такое unclean leader election и почему его выключают?
  10. Какие значения acks бывают и при каком из них возможна потеря данных?
  11. Что такое idempotent producer и какие гарантии он даёт?
  12. Что такое transactional producer? Что такое transactional.id?
  13. Что такое transaction coordinator? Где живут метаданные транзакций?
  14. Что такое consumer group и group coordinator?
  15. Какие assignor’ы есть и в чём разница eager vs cooperative?
  16. Что такое static membership и зачем?
  17. Как настроить exactly-once для read-process-write?
  18. Что такое compacted topic и в каких случаях он подходит?
  19. Что такое tombstone и зачем оно нужно?
  20. Что такое min.insync.replicas и как оно связано с acks=all?
  21. Что произойдёт, если все ISR кроме leader’а упадут?
  22. Зачем нужен enable.idempotence при acks=all?
  23. Какая разница между max.in.flight.requests.per.connection=1 и >1?
  24. Что такое max.poll.interval.ms и почему его превышение вызывает rebalance?
  25. Как Kafka достигает high throughput? (sendfile, batching, compression)
  26. Какие основные JMX метрики мониторить в production?
  27. Чем franz-go отличается от sarama и kafka-go?
  28. Как сделать atomic write в несколько партиций?
  29. Что такое Mirror Maker 2 и зачем он нужен?
  30. Как переименовать партицию или изменить число партиций без потерь?

version: "3.9"
services:
kafka1:
image: bitnami/kafka:3.7
environment:
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_CFG_MIN_INSYNC_REPLICAS: 2
ports: [ "9092:9092" ]
# kafka2, kafka3 — аналогично
cl, _ := kgo.NewClient(
kgo.SeedBrokers("kafka:9092"),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.TransactionalID("demo-tx-1"),
kgo.MaxBufferedRecords(10_000),
)
defer cl.Close()
ctx := context.Background()
if err := cl.BeginTransaction(); err != nil { panic(err) }
cl.Produce(ctx, &kgo.Record{Topic: "out", Value: []byte("hello")}, nil)
if err := cl.Flush(ctx); err != nil { _ = cl.EndTransaction(ctx, kgo.TryAbort); panic(err) }
if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil { panic(err) }
cl, _ := kgo.NewClient(
kgo.SeedBrokers("kafka:9092"),
kgo.ConsumeTopics("events"),
kgo.ConsumerGroup("workers"),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.SessionTimeout(30*time.Second),
kgo.InstanceID("pod-0"), // static membership
)
defer cl.Close()
for {
fetches := cl.PollFetches(ctx)
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
for _, r := range p.Records {
handle(r)
}
})
}

Реализуй loop: читать из in, обработать, записать в out, закомитить offset — всё внутри одной транзакции (SendOffsetsToTransaction в franz-go).

Запусти 3 consumers, кильни один, посмотри в логах — был ли это eager или cooperative rebalance.

Создай compacted topic, пиши с одним и тем же key, через kafka-dump-log.sh глянь segment’ы — увидь, что cleaner удалил старые версии.


  1. Apache Kafka Documentation. https://kafka.apache.org/documentation/
  2. KIP-101: Leader Epoch. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
  3. KIP-500: KRaft Mode. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
  4. KIP-98: Exactly-Once Delivery and Transactional Messaging. https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
  5. KIP-429: Incremental Cooperative Rebalancing. https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
  6. KIP-866: ZK to KRaft Migration. https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration
  7. Confluent Blog — Exactly Once Semantics. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  8. «Kafka: The Definitive Guide» (2nd ed.) — Gwen Shapira, Todd Palino, Rajini Sivaram, Krit Petty.
  9. franz-go. https://github.com/twmb/franz-go
  10. sarama. https://github.com/IBM/sarama
  11. kafka-go (segmentio). https://github.com/segmentio/kafka-go
  12. librdkafka. https://github.com/confluentinc/librdkafka
  13. «Apache Kafka In Action» — Aleksandr Kovalev (KRaft, 3.x).
  14. Strimzi (Kafka on Kubernetes). https://strimzi.io/
  15. Cruise Control (Kafka rebalancing). https://github.com/linkedin/cruise-control

В Kafka 0.11+ структура записи на диске — RecordBatch v2 (compressed):

RecordBatch:
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (= 2)
crc: int32
attributes: int16 (compression, timestamp type, transactional, control)
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
recordsCount: int32
records: [Record]
Record:
length: varint
attributes: int8
timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: bytes
valueLength: varint
value: bytes
headersCount: varint
headers: [Header]

Ключевые поля:

  • producerId + producerEpoch + baseSequence — для idempotency.
  • partitionLeaderEpoch — для leader epoch протокола (truncation).
  • attributes — флаги: compression type, transactional, control batch (commit/abort markers).
  • offsetDelta — относительный offset внутри batch (экономит место).

Kafka Streams — клиентская библиотека (Java) для stream processing поверх Kafka. Stateful processing с RocksDB-state stores, локально хранящимися на consumer’е, и change-log topic’ом для recovery.

Ключевые абстракции:

  • KStream — поток событий.
  • KTable — таблица (compacted topic) с последним значением по ключу.
  • GlobalKTable — read-only глобальная копия для join’ов.
KStream<String, Order> orders = builder.stream("orders");
KStream<String, EnrichedOrder> enriched = orders.join(usersTable,
(orderKey, order) -> order.userId,
(order, user) -> enrich(order, user));
enriched.to("enriched-orders");

KSQL / ksqlDB — SQL поверх Kafka Streams. Можно писать:

CREATE STREAM orders (id BIGINT, amount DOUBLE, user_id BIGINT)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');
CREATE TABLE order_stats AS
SELECT user_id, COUNT(*), SUM(amount) FROM orders GROUP BY user_id;

Go-эквивалентов production-grade Streams нет; обычно state store строят руками поверх franz-go.

  • Strimzi — Apache 2.0, наиболее популярный operator. CRD: Kafka, KafkaTopic, KafkaUser. Поддерживает KRaft.
  • Confluent for Kubernetes (CFK) — коммерческий.
  • Banzai Cloud Koperator — устарел, не developed actively.

Strimzi пример:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.7.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
storage: { type: persistent-claim, size: 100Gi, deleteClaim: false }
entityOperator:
topicOperator: {}
userOperator: {}
import "github.com/twmb/franz-go/pkg/kgo"
cl, _ := kgo.NewClient(
kgo.SeedBrokers("k1:9092", "k2:9092", "k3:9092"),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.MaxBufferedRecords(50_000),
kgo.ProducerBatchMaxBytes(1_000_000),
kgo.ProducerLinger(5*time.Millisecond),
kgo.RecordRetries(math.MaxInt32), // ретраить до победного (с idempotency = безопасно)
kgo.RetryBackoffFn(func(int) time.Duration { return 100*time.Millisecond }),
kgo.TransactionalID("payments-v1"),
kgo.TransactionTimeout(60*time.Second),
kgo.ProducerBatchCompression(kgo.ZstdCompression()),
)

И consumer:

cl2, _ := kgo.NewClient(
kgo.SeedBrokers("k1:9092"),
kgo.ConsumerGroup("payments-processor"),
kgo.ConsumeTopics("payments.in"),
kgo.FetchIsolationLevel(kgo.ReadCommitted()),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.SessionTimeout(30*time.Second),
kgo.DisableAutoCommit(),
)

Перед prod-релизом тестируй:

  • Kill leader broker → проверить, что producer ретраит без потерь.
  • Kill consumer pod → rebalance протекает, нет дубликатов под EoS.
  • Network partition controller → выборы нового через KRaft → < 10s.
  • Disk full → broker честно отказывает producer’ам, не корраптится.
  • Schema breaking change → consumer падает в DLQ, не блокирует.

Инструмент: Chaos Mesh / Litmus / собственные скрипты tc netem + kill -9.