29. Kafka на уровне протокола
Зачем знать на Middle 3: Senior, отвечающий за messaging-системы, должен понимать, что Kafka — это persistent commit log с binary wire-protocol, и как работают её главные элементы под капотом: ISR, leader epoch, consumer groups protocol, идемпотентный producer, транзакции, exactly-once. Без этого «retention», «replication.factor», «acks=all», «transactional.id» — пустые слова, и любая баг в проде превращается в guesswork. На уровне Middle 3 ты должен уметь объяснить, как Kafka переживает сбой контроллера, как cooperative rebalance отличается от eager, и в каких случаях
acks=allвсё ещё может потерять данные.
Содержание
Заголовок раздела «Содержание»1. Концепция
Заголовок раздела «1. Концепция»Что такое Kafka
Заголовок раздела «Что такое Kafka»Apache Kafka — distributed commit log. Не «очередь» в классическом смысле (RabbitMQ), а append-only лог сообщений, разбитый на topic’и → partitions → segments → records, реплицированный по нескольким brokers. Consumer’ы сами держат offset и решают, с какого места читать.
Высокоуровневая архитектура
Заголовок раздела «Высокоуровневая архитектура»Producers ─────► ┌────────────────────────────────────────────┐ │ Kafka Cluster │ │ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Broker1 │ │ Broker2 │ │ Broker3 │ │ │ │ │ │ │ │ │ │ │ │ P0(L) │ │ P0(F) │ │ P0(F) │ │ │ │ P1(F) │ │ P1(L) │ │ P1(F) │ │ │ │ P2(F) │ │ P2(F) │ │ P2(L) │ │ │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ │ │ ┌─┴────────────┴────────────┴─┐ │ │ │ Controller (1 of brokers) │ │ │ │ (KRaft quorum or ZK) │ │ │ └─────────────────────────────┘ │ └────────────────────────────────────────────┘ ▲ │ Consumers (groups)Каждая partition имеет:
- Leader — обрабатывает все reads/writes.
- Followers — реплицируют от лидера, могут стать лидерами при failover.
- ISR (In-Sync Replicas) — followers, синхронные с лидером.
Wire-protocol
Заголовок раздела «Wire-protocol»Kafka — binary protocol поверх TCP, не HTTP, не gRPC. Все клиенты (Java, librdkafka, kafka-go, sarama, franz-go) реализуют его сами.
Сообщение протокола: Request / Response с size prefix, версионированием API keys, схемой.
[size: int32][api_key: int16] # Produce=0, Fetch=1, Metadata=3, OffsetCommit=8 ...[api_version: int16][correlation_id: int32][client_id: nullable_string][payload: ...]API ключи (выбрано):
Produce(0) — отправкаFetch(1) — получениеMetadata(3) — топологияOffsetCommit(8),OffsetFetch(9)JoinGroup(11),Heartbeat(12),SyncGroup(14),LeaveGroup(13)ApiVersions(18) — список поддерживаемых версийInitProducerId(22),AddPartitionsToTxn(24),EndTxn(26)
Клиент обязан сначала вызвать ApiVersions и договориться о версиях.
2. Production-deep dive
Заголовок раздела «2. Production-deep dive»2.1. Controller, ZK и KRaft
Заголовок раздела «2.1. Controller, ZK и KRaft»Controller — один из брокеров, отвечающий за:
- Метаданные кластера (топики, партиции, leadership).
- Leader election при отказе брокера.
- Partition reassignment.
Раньше: метаданные в ZooKeeper, controller — broker с локом в ZK.
С Kafka 3.3+ — KRaft (Kafka Raft) GA:
- ZK не нужен.
- Метаданные хранятся как встроенный log на quorum’е controller’ов (3 или 5 ноды).
- Controllers могут быть совмещены с brokers (
process.roles=broker,controller) или раздельно. - Snapshots для bootstrap новых controller’ов.
Преимущества KRaft:
- Меньше moving parts (без ZK).
- Быстрее controller failover (~секунды vs минуты).
- Поддержка миллионов партиций (раньше упирались в ZK).
Конфиг (broker + controller mode):
process.roles=broker,controllernode.id=1controller.quorum.voters=1@n1:9093,2@n2:9093,3@n3:9093listeners=PLAINTEXT://:9092,CONTROLLER://:9093inter.broker.listener.name=PLAINTEXTcontroller.listener.names=CONTROLLER⚠️ KRaft и ZK режимы несовместимы; миграция — отдельный процесс (kafka-storage.sh, KIP-866).
2.2. Log structure
Заголовок раздела «2.2. Log structure»Каждая partition на диске — это директория с segments:
/var/lib/kafka/data/my-topic-0/ 00000000000000000000.log # сегмент 0, log file 00000000000000000000.index # offset index (sparse) 00000000000000000000.timeindex # timestamp index 00000000000005678901.log # сегмент со startOffset=5678901 00000000000005678901.index 00000000000005678901.timeindex leader-epoch-checkpoint partition.metadata- Segment — максимум 1GB или N дней (
log.segment.bytes,log.roll.hours). - .log — собственно сообщения, записываются последовательно.
- .index — sparse index
(offset → byte position), шаг ~4KB. - .timeindex —
(timestamp → offset), для time-based lookup. - leader-epoch-checkpoint — каждый раз при смене лидера эпоха инкрементируется и фиксируется.
Запись — append к active segment’у, fsync контролируется log.flush.interval.messages / log.flush.interval.ms (по умолчанию полагается на OS pagecache + replication для durability).
Чтение — sendfile() syscall, zero-copy: данные летят из page cache в socket без копирования в user-space. Это и есть «секрет» throughput Kafka.
2.3. Replication, ISR, high watermark
Заголовок раздела «2.3. Replication, ISR, high watermark» ┌───────────────┐ │ Leader │ │ Log: 0..100 │ └────┬───┬──────┘ │ │ fetch │ │ fetch ┌─────────┘ └────────┐ ▼ ▼ ┌───────────────┐ ┌───────────────┐ │ Follower 1 │ │ Follower 2 │ │ Log: 0..98 │ │ Log: 0..100 │ │ (slow) │ │ │ └───────────────┘ └───────────────┘ ↑ "lagging" ↑ "in sync"ISR — followers, у которых:
replica.lag.time.max.ms(default 30s) — отставание по времени не больше порога.- Heartbeat не потерян.
High Watermark (HW) — наибольший offset, который реплицирован на ВСЕ ISR. Только сообщения до HW видны consumer’ам.
0 1 2 3 4 5 6 7 8 9 10Leader log: ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■HW = 7 ─────────────────────────────►│ ▲ видны consumer'амLEO (Log End Offset) = 10 ▲ невидимыLEO (Log End Offset) — конец лога. На leader’е LEO > HW, если ISR не догнали.
min.insync.replicas (topic-level config) — минимальное число ISR для acks=all. Если в ISR ниже — acks=all Produce отвергается:
NotEnoughReplicasException2.4. Leader epoch (защита от split-brain)
Заголовок раздела «2.4. Leader epoch (защита от split-brain)»Без epoch’а возможен сценарий «zombie leader». Sequence:
- Broker A — leader, оffset 100, ISR = {A, B}.
- A падает / изолируется.
- B становится leader, начинает принимать запись от 100. Offset 101, 102.
- A приходит назад, думает «я leader», пишет в свой log offset 101, 102 (разные данные).
Leader epoch решает это. У каждой смены лидера — увеличивается epoch number. Followers при подключении к leader’у запрашивают OffsetForLeaderEpoch. Если их данные противоречат истинной истории — они усекают (truncate) лог до точки расхождения.
Epoch 1 (A leader): 0..100Epoch 2 (B leader): 100..102 ← B видел epoch 1 до 100A приходит, думает он leader epoch 1. Запросит у текущего leader B:"мой epoch 1 кончился на 100" → truncate offset 101 на A, fetch с 100 от B.Это вшито в Kafka 0.11+. Гарантирует, что нет «двух правд».
2.5. Unclean leader election
Заголовок раздела «2.5. Unclean leader election»unclean.leader.election.enable=true — если ВСЕ ISR упали, выбрать кого-то ИЗ NOT-ISR как нового leader.
- Плюс: availability.
- Минус: новый leader не имеет последних сообщений → потеря данных.
⚠️ В production ВСЕГДА ставь unclean.leader.election.enable=false для критичных топиков. Лучше downtime, чем потеря.
2.6. Producer protocol
Заголовок раздела «2.6. Producer protocol»Producer группирует записи в batch (один Produce request содержит много records):
ProduceRequest: transactional_id (optional) acks: int16 # 0, 1, -1 (=all) timeout_ms topic_data: [ { topic, partition, records: RecordBatch } ]acks:
| acks | Поведение | Durability |
|---|---|---|
| 0 | Fire-and-forget. Producer не ждёт ACK. | Низкая. Можно терять пачками. |
| 1 | Ждёт ACK от leader. | Средняя. Leader может упасть до replication. |
| -1 / all | Ждёт ACK от leader + всех ISR (min.insync.replicas). | Высокая. Zero data loss при min.insync.replicas ≥ 2. |
Idempotent producer (enable.idempotence=true, default с 3.0):
- Producer получает
producer_idчерезInitProducerId. - Каждый record получает
(producer_id, producer_epoch, sequence_number). - Leader дедуплицирует по
(producer_id, partition, sequence). Дубликаты молча игнорируются. - Гарантия: exactly-once delivery в одну партицию.
Идемпотентность не работает между:
- Партициями (один батч на разные партиции).
- Сессиями producer’а (после рестарта новый
producer_id).
Для межпартиционной atomicity нужен transactional producer.
Transactional producer:
producer.InitTransactions() // получаем producer_id (persistent через txnId)producer.BeginTransaction()producer.Produce(topic1, ...)producer.Produce(topic2, ...)// можем закоммитить consumer offsets как часть транзакции:producer.SendOffsetsToTransaction(offsets, groupId)producer.CommitTransaction() // или AbortTransaction()- Уникальный
transactional.id(persistent через рестарты!) — нужен для recovery. - Один из broker’ов — transaction coordinator для txn-id.
- Транзакция атомарна по нескольким партициям и offset commit.
- Consumer должен читать с
isolation.level=read_committed— иначе увидит «грязные» сообщения из неcommitted транзакций.
⚠️ Транзакции имеют overhead: лишние round trip’ы (Begin/End), markers в логах (txn-commit / txn-abort), latency растёт в 2-5 раз. Используй только там, где нужно EoS.
2.7. Consumer protocol
Заголовок раздела «2.7. Consumer protocol»Consumer’ы объединяются в consumer group. Один из brokers — group coordinator (выбирается hash’ем group_id).
Pipeline:
client → FindCoordinator → coordinatorclient → JoinGroup → coordinator (coordinator выбирает leader группы — первого consumer'а)leader-consumer → SyncGroup (с планом назначения partitions) → coordinator → всемall consumers ← assignment ← coordinatorloop: Fetch + HeartbeatHeartbeats:
heartbeat.interval.ms(3s default) — фоновый Heartbeat.session.timeout.ms(45s) — если без heartbeat — consumer считается dead → rebalance.max.poll.interval.ms(5min) — если poll() не вызывается → consumer считается «обработка зависла» → rebalance.
Rebalance protocols (assignors):
- Range (default до 3.0) — партиции одного топика делятся на консьюмеров «диапазонами». Может неравномерно распределить при разном числе топиков.
- RoundRobin — все партиции всех топиков равномерно по консьюмерам.
- Sticky — пытается сохранить максимум прежних назначений (минимизирует «переезд» state).
- CooperativeSticky (recommended с Kafka 2.4+) — incremental rebalance, не стопает всех consumer’ов разом, переносит партиции по одной.
Eager vs Cooperative:
Eager rebalance: stop-the-world → все consumers revoke all → JoinGroup → новые assignments → start consuming again (downtime секунды-минуты)
Cooperative rebalance: → объявляется, какие партиции надо передать → только эти консьюмеры revoke их → новые консьюмеры подхватывают (downtime микро-секунды для затронутых партиций)Static membership (group.instance.id):
- Consumer регистрируется со стабильным ID (через k8s StatefulSet pod name, например).
- При рестарте в течение
session.timeout.msгруппа не делает rebalance — coordinator ждёт, что тот жеgroup.instance.idвернётся. - Идеально для stateful consumers (Kafka Streams). Минус — если pod не вернулся, ждёт до timeout впустую.
2.8. Exactly-Once Semantics (EoS)
Заголовок раздела «2.8. Exactly-Once Semantics (EoS)»Полная EoS — это read-process-write loop:
input topic → Consumer → process → Producer → output topic ↑ │ атомарно: commit producer txn + commit consumer offsetsConditions:
- Producer:
enable.idempotence=true,transactional.id=<persistent>. - Consumer:
isolation.level=read_committed,enable.auto.commit=false. - Offsets коммитятся через
SendOffsetsToTransaction().
Это даёт «exactly-once в Kafka». Но side effects вне Kafka (HTTP-call, DB-write) — снаружи EoS, и тебе нужны свои idempotency keys.
2.9. Compaction
Заголовок раздела «2.9. Compaction»Topics с cleanup.policy=compact сохраняют последнее значение по ключу. Это lookup-store (Kafka Streams KTable, KSQL tables, метаданные).
- Background log cleaner идёт по segments и удаляет старые версии одного ключа.
null-value = tombstone = удаление ключа.min.cleanable.dirty.ratio(0.5) — порог запуска cleanup.
⚠️ Compaction не освобождает «текущий» (active) segment. Tombstones удерживаются delete.retention.ms (24h по умолчанию) — чтобы все consumers успели увидеть удаление.
2.10. Performance tuning
Заголовок раздела «2.10. Performance tuning»Producer:
acks=allenable.idempotence=truelinger.ms=10 # ждать 10ms для батчаbatch.size=131072 # 128KBcompression.type=lz4 # или zstdmax.in.flight.requests.per.connection=5buffer.memory=67108864 # 64MBУвеличение linger.ms + batch.size — главный рычаг throughput.
Consumer:
fetch.min.bytes=1048576 # 1MBfetch.max.wait.ms=500max.partition.fetch.bytes=10485760max.poll.records=500session.timeout.ms=30000heartbeat.interval.ms=3000isolation.level=read_committedpartition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignorBroker:
num.network.threads=8num.io.threads=16socket.send.buffer.bytes=1048576socket.receive.buffer.bytes=1048576socket.request.max.bytes=104857600num.replica.fetchers=4replica.fetch.max.bytes=10485760log.segment.bytes=10737418242.11. Мониторинг
Заголовок раздела «2.11. Мониторинг»Must-have JMX/Prometheus метрики:
- UnderReplicatedPartitions — > 0 = брокер отстаёт.
- OfflinePartitionsCount — > 0 = катастрофа (нет лидера).
- ActiveControllerCount — должен быть ровно 1 (на всём кластере).
- ISR shrinks / expands rate — частая трясучка = плохо.
- MessagesInPerSec, BytesInPerSec, BytesOutPerSec.
- RequestQueueTimeMs, RequestQueueSize — насыщение брокера.
- LeaderElectionRateAndTimeMs — частые выборы = плохо.
- Consumer lag (через
kafka-consumer-groups.shили Burrow).
2.12. Go clients
Заголовок раздела «2.12. Go clients»| Клиент | Заметки |
|---|---|
segmentio/kafka-go | Pure Go, чистый API, дружелюбный, но slightly slower |
IBM/sarama | Pure Go, veteran (бывший Shopify/sarama), много фич, тяжёлая API |
twmb/franz-go | Самый быстрый pure Go, transactions, поддержка KRaft, recommended |
confluentinc/confluent-kafka-go | C-based (librdkafka), max performance, CGO |
Для transactions + KRaft + sticky cooperative — franz-go лидер.
import "github.com/twmb/franz-go/pkg/kgo"
cl, _ := kgo.NewClient( kgo.SeedBrokers("kafka:9092"), kgo.RequiredAcks(kgo.AllISRAcks()), kgo.ProducerLinger(10 * time.Millisecond), kgo.ProducerBatchCompression(kgo.Lz4Compression()), kgo.TransactionalID("my-app-1"),)defer cl.Close()
cl.BeginTransaction()cl.ProduceSync(ctx, &kgo.Record{Topic: "out", Value: data})cl.EndTransaction(ctx, kgo.TryCommit)3. Gotchas (12+)
Заголовок раздела «3. Gotchas (12+)»⚠️ 1. acks=all не гарантирует zero data loss без min.insync.replicas
Заголовок раздела «⚠️ 1. acks=all не гарантирует zero data loss без min.insync.replicas»Если min.insync.replicas=1 и единственная ISR — leader, то «all» = «leader». При потере leader = потеря данных. Ставь min.insync.replicas=2 + replication.factor=3.
⚠️ 2. Unclean leader election в проде
Заголовок раздела «⚠️ 2. Unclean leader election в проде»Всегда unclean.leader.election.enable=false для критичных топиков. Иначе при потере всех ISR Kafka возьмёт «грязного» лидера и потеряет последние сообщения.
⚠️ 3. Default partition count
Заголовок раздела «⚠️ 3. Default partition count»Слишком мало партиций — узкое место throughput / параллелизма consumer’ов. Слишком много — overhead на broker (file handles, ISR-метрики). Правило: 1-2× CPUs кластера на топик, но не больше 4000-5000 на брокер.
⚠️ 4. Order гарантирован только в одной партиции
Заголовок раздела «⚠️ 4. Order гарантирован только в одной партиции»Cross-partition ordering — нет. Если важен порядок по бизнес-ключу — partition by key. И НЕ менять число партиций (это меняет hash mapping).
⚠️ 5. max.in.flight.requests.per.connection > 1 без idempotence ломает order
Заголовок раздела «⚠️ 5. max.in.flight.requests.per.connection > 1 без idempotence ломает order»Producer может повторно отправить retry → out-of-order. С enable.idempotence=true Kafka сама гарантирует order (max 5 in-flight allowed).
⚠️ 6. max.poll.interval.ms — частая причина rebalance’ов
Заголовок раздела «⚠️ 6. max.poll.interval.ms — частая причина rebalance’ов»Long processing блокирует heartbeats. С Kafka 0.10.1+ heartbeat в отдельном потоке, и max.poll.interval.ms контролирует «как часто ты вызываешь poll()». Если обработка занимает > 5 минут — выгрузи в фоновый воркер, а poll() вызывай чаще.
⚠️ 7. Rebalance потеря: revoke не committed offsets
Заголовок раздела «⚠️ 7. Rebalance потеря: revoke не committed offsets»Если консьюмер обрабатывает batch и в этот момент группа rebalance’ится — offset не commit’нется. После переподключения другой consumer перечитает. Только idempotent processing спасает.
⚠️ 8. Compaction не для всех топиков
Заголовок раздела «⚠️ 8. Compaction не для всех топиков»Топик с cleanup.policy=compact хранит все уникальные ключи навсегда (до tombstone). Если ключи имеют high cardinality (UUID каждый раз новый) — топик растёт. Compaction подходит для конечного множества ключей.
⚠️ 9. Транзакции и replication.factor
Заголовок раздела «⚠️ 9. Транзакции и replication.factor»__transaction_state topic должен иметь replication.factor >= 3. Default — 1, что в проде — катастрофа.
⚠️ 10. Read_committed замедляет consumer
Заголовок раздела «⚠️ 10. Read_committed замедляет consumer»Consumer ждёт, пока транзакция commit/abort’нется. При abort’ах consumer пропускает записи. Дополнительный round-trip.
⚠️ 11. Connection limits на брокере
Заголовок раздела «⚠️ 11. Connection limits на брокере»Многие приложения создают свой Kafka client на каждый pod/worker → тысячи соединений на брокере. Это упирается в socket.max.connections, max.connections.per.ip. Используй sharing client’а в приложении.
⚠️ 12. Mirror Maker 2 не сохраняет offsets
Заголовок раздела «⚠️ 12. Mirror Maker 2 не сохраняет offsets»Между кластерами offsets смещаются (другой commit log). Для failover между кластерами нужен offset translation (MM2 умеет с 2.7+).
⚠️ 13. Schema registry + transactions
Заголовок раздела «⚠️ 13. Schema registry + transactions»Если producer использует Avro + Schema Registry, и SR недоступен — producer виснет. Делай SR HA, тестируй.
⚠️ 14. Disk full → corruption
Заголовок раздела «⚠️ 14. Disk full → corruption»При nearly full disk Kafka может уронить broker до того, как WAL флашнется. Алертить на Volume usage > 75%. Использовать log.retention.bytes per-partition.
⚠️ 15. Network partition: ISR shrink
Заголовок раздела «⚠️ 15. Network partition: ISR shrink»При временной сетевой проблеме ISR может «схлопнуться» (shrink) до 1, и acks=all начнёт фейлить (если min.insync.replicas=2). Хорошо настроенный кластер — алерт + не паниковать, дождаться auto-expand.
4. Real cases
Заголовок раздела «4. Real cases»Case 1: KRaft migration
Заголовок раздела «Case 1: KRaft migration»Контекст: Кластер 30 брокеров, ZK 5 нод. Боль: ZK latency растёт с числом партиций (40k партиций).
Решение: миграция на KRaft по KIP-866:
- Подняли 3 dedicated controllers на KRaft.
- Включили dual-write mode (метаданные одновременно в ZK и KRaft).
- После catch-up — переключили на KRaft-only.
- Удалили ZK.
Результат: failover контроллера упал с 90s до 3s. Latency писем на __consumer_offsets упал в 5 раз.
Case 2: EoS для финансов
Заголовок раздела «Case 2: EoS для финансов»Контекст: Платёжный сервис. Каждый payment event → балансы пользователей. Потеря = деньги, дубликат = деньги.
Решение:
- Producer:
enable.idempotence=true,transactional.id=payments-<pod-uuid>(persistent через ConfigMap). - Read-process-write: consume
payments, обновить кэш балансов в Kafka (compacted topic), producebalance.updates, commit offsets ВНУТРИ транзакции. - DB-side: idempotency keys в Postgres +
INSERT ON CONFLICT DO NOTHING.
При сбое пода: после рестарта producer с тем же transactional.id — старая транзакция aborts’ится coordinator’ом, новая стартует чисто.
Case 3: Rebalance storm
Заголовок раздела «Case 3: Rebalance storm»Симптом: Каждые 5 минут все 200 consumers «дёргаются», 30 секунд лаг по 1М сообщений.
Расследование: max.poll.interval.ms = 300s, processing занимает иногда 6 минут. Поллинг блокировался — rebalance.
Решение:
- Уменьшили
max.poll.records=100. - Включили
partition.assignment.strategy=CooperativeStickyAssignor. - Тяжёлая обработка вынесена в worker pool, poll() возвращается мгновенно.
Rebalance’ы исчезли.
Case 4: ISR shrink из-за GC
Заголовок раздела «Case 4: ISR shrink из-за GC»Симптом: Раз в день ISR на брокере 3 схлопывается, потом восстанавливается. UnderReplicatedPartitions > 0 на минуту.
Расследование: JVM GC stop-the-world 35 секунд. Replica fetcher не успевал ack’нуть.
Решение: перешли на G1GC с MaxGCPauseMillis=20, увеличили heap до 16GB. GC паузы упали до 100ms.
Case 5: Throughput +10x через compression
Заголовок раздела «Case 5: Throughput +10x через compression»Контекст: Logs producer 200MB/s, сеть упёрта в 1 Gbps.
Решение: Включили compression.type=zstd, увеличили linger.ms=50, batch.size=512KB. Эффективная пропускная способность выросла до 1.5 GB/s исходных данных (compression ratio ~7x).
5. Вопросы (30)
Заголовок раздела «5. Вопросы (30)»- Что такое Kafka и чем она отличается от классической очереди (RabbitMQ)?
- Что такое partition, segment, offset?
- Как устроен wire-protocol Kafka? Зачем нужен
ApiVersions? - Что такое controller и какие функции он выполняет?
- Что такое KRaft и зачем убирали ZooKeeper?
- Что такое ISR и как она формируется?
- Что такое high watermark и LEO?
- Что такое leader epoch и как он защищает от split-brain?
- Что такое unclean leader election и почему его выключают?
- Какие значения
acksбывают и при каком из них возможна потеря данных? - Что такое idempotent producer и какие гарантии он даёт?
- Что такое transactional producer? Что такое
transactional.id? - Что такое transaction coordinator? Где живут метаданные транзакций?
- Что такое consumer group и group coordinator?
- Какие assignor’ы есть и в чём разница eager vs cooperative?
- Что такое static membership и зачем?
- Как настроить exactly-once для read-process-write?
- Что такое compacted topic и в каких случаях он подходит?
- Что такое tombstone и зачем оно нужно?
- Что такое
min.insync.replicasи как оно связано сacks=all? - Что произойдёт, если все ISR кроме leader’а упадут?
- Зачем нужен
enable.idempotenceприacks=all? - Какая разница между
max.in.flight.requests.per.connection=1и >1? - Что такое
max.poll.interval.msи почему его превышение вызывает rebalance? - Как Kafka достигает high throughput? (sendfile, batching, compression)
- Какие основные JMX метрики мониторить в production?
- Чем
franz-goотличается отsaramaиkafka-go? - Как сделать atomic write в несколько партиций?
- Что такое Mirror Maker 2 и зачем он нужен?
- Как переименовать партицию или изменить число партиций без потерь?
6. Practice
Заголовок раздела «6. Practice»6.1. Локальный KRaft cluster (Docker)
Заголовок раздела «6.1. Локальный KRaft cluster (Docker)»version: "3.9"services: kafka1: image: bitnami/kafka:3.7 environment: KAFKA_CFG_NODE_ID: 1 KAFKA_CFG_PROCESS_ROLES: broker,controller KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093,2@kafka2:9093,3@kafka3:9093 KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 3 KAFKA_CFG_MIN_INSYNC_REPLICAS: 2 ports: [ "9092:9092" ] # kafka2, kafka3 — аналогично6.2. Transactional producer на franz-go
Заголовок раздела «6.2. Transactional producer на franz-go»cl, _ := kgo.NewClient( kgo.SeedBrokers("kafka:9092"), kgo.RequiredAcks(kgo.AllISRAcks()), kgo.TransactionalID("demo-tx-1"), kgo.MaxBufferedRecords(10_000),)defer cl.Close()ctx := context.Background()
if err := cl.BeginTransaction(); err != nil { panic(err) }cl.Produce(ctx, &kgo.Record{Topic: "out", Value: []byte("hello")}, nil)if err := cl.Flush(ctx); err != nil { _ = cl.EndTransaction(ctx, kgo.TryAbort); panic(err) }if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil { panic(err) }6.3. Consumer с cooperative-sticky
Заголовок раздела «6.3. Consumer с cooperative-sticky»cl, _ := kgo.NewClient( kgo.SeedBrokers("kafka:9092"), kgo.ConsumeTopics("events"), kgo.ConsumerGroup("workers"), kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.SessionTimeout(30*time.Second), kgo.InstanceID("pod-0"), // static membership)defer cl.Close()for { fetches := cl.PollFetches(ctx) fetches.EachPartition(func(p kgo.FetchTopicPartition) { for _, r := range p.Records { handle(r) } })}6.4. EoS read-process-write
Заголовок раздела «6.4. EoS read-process-write»Реализуй loop: читать из in, обработать, записать в out, закомитить offset — всё внутри одной транзакции (SendOffsetsToTransaction в franz-go).
6.5. Симуляция rebalance
Заголовок раздела «6.5. Симуляция rebalance»Запусти 3 consumers, кильни один, посмотри в логах — был ли это eager или cooperative rebalance.
6.6. Compaction
Заголовок раздела «6.6. Compaction»Создай compacted topic, пиши с одним и тем же key, через kafka-dump-log.sh глянь segment’ы — увидь, что cleaner удалил старые версии.
7. Источники
Заголовок раздела «7. Источники»- Apache Kafka Documentation. https://kafka.apache.org/documentation/
- KIP-101: Leader Epoch. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
- KIP-500: KRaft Mode. https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum
- KIP-98: Exactly-Once Delivery and Transactional Messaging. https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
- KIP-429: Incremental Cooperative Rebalancing. https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
- KIP-866: ZK to KRaft Migration. https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration
- Confluent Blog — Exactly Once Semantics. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
- «Kafka: The Definitive Guide» (2nd ed.) — Gwen Shapira, Todd Palino, Rajini Sivaram, Krit Petty.
- franz-go. https://github.com/twmb/franz-go
- sarama. https://github.com/IBM/sarama
- kafka-go (segmentio). https://github.com/segmentio/kafka-go
- librdkafka. https://github.com/confluentinc/librdkafka
- «Apache Kafka In Action» — Aleksandr Kovalev (KRaft, 3.x).
- Strimzi (Kafka on Kubernetes). https://strimzi.io/
- Cruise Control (Kafka rebalancing). https://github.com/linkedin/cruise-control
Приложение A. Detail-level: RecordBatch формат
Заголовок раздела «Приложение A. Detail-level: RecordBatch формат»В Kafka 0.11+ структура записи на диске — RecordBatch v2 (compressed):
RecordBatch: baseOffset: int64 batchLength: int32 partitionLeaderEpoch: int32 magic: int8 (= 2) crc: int32 attributes: int16 (compression, timestamp type, transactional, control) lastOffsetDelta: int32 baseTimestamp: int64 maxTimestamp: int64 producerId: int64 producerEpoch: int16 baseSequence: int32 recordsCount: int32 records: [Record]
Record: length: varint attributes: int8 timestampDelta: varint offsetDelta: varint keyLength: varint key: bytes valueLength: varint value: bytes headersCount: varint headers: [Header]Ключевые поля:
producerId+producerEpoch+baseSequence— для idempotency.partitionLeaderEpoch— для leader epoch протокола (truncation).attributes— флаги: compression type, transactional, control batch (commit/abort markers).offsetDelta— относительный offset внутри batch (экономит место).
Приложение B. Kafka Streams и KSQL (overview)
Заголовок раздела «Приложение B. Kafka Streams и KSQL (overview)»Kafka Streams — клиентская библиотека (Java) для stream processing поверх Kafka. Stateful processing с RocksDB-state stores, локально хранящимися на consumer’е, и change-log topic’ом для recovery.
Ключевые абстракции:
- KStream — поток событий.
- KTable — таблица (compacted topic) с последним значением по ключу.
- GlobalKTable — read-only глобальная копия для join’ов.
KStream<String, Order> orders = builder.stream("orders");KStream<String, EnrichedOrder> enriched = orders.join(usersTable, (orderKey, order) -> order.userId, (order, user) -> enrich(order, user));enriched.to("enriched-orders");KSQL / ksqlDB — SQL поверх Kafka Streams. Можно писать:
CREATE STREAM orders (id BIGINT, amount DOUBLE, user_id BIGINT) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');CREATE TABLE order_stats AS SELECT user_id, COUNT(*), SUM(amount) FROM orders GROUP BY user_id;Go-эквивалентов production-grade Streams нет; обычно state store строят руками поверх franz-go.
Приложение C. Kafka на Kubernetes
Заголовок раздела «Приложение C. Kafka на Kubernetes»- Strimzi — Apache 2.0, наиболее популярный operator. CRD:
Kafka,KafkaTopic,KafkaUser. Поддерживает KRaft. - Confluent for Kubernetes (CFK) — коммерческий.
- Banzai Cloud Koperator — устарел, не developed actively.
Strimzi пример:
apiVersion: kafka.strimzi.io/v1beta2kind: Kafkametadata: name: my-clusterspec: kafka: version: 3.7.0 replicas: 3 listeners: - name: plain port: 9092 type: internal tls: false storage: { type: persistent-claim, size: 100Gi, deleteClaim: false } entityOperator: topicOperator: {} userOperator: {}Приложение D. Tuning Kafka producer для финтех EoS
Заголовок раздела «Приложение D. Tuning Kafka producer для финтех EoS»import "github.com/twmb/franz-go/pkg/kgo"
cl, _ := kgo.NewClient( kgo.SeedBrokers("k1:9092", "k2:9092", "k3:9092"), kgo.RequiredAcks(kgo.AllISRAcks()), kgo.MaxBufferedRecords(50_000), kgo.ProducerBatchMaxBytes(1_000_000), kgo.ProducerLinger(5*time.Millisecond), kgo.RecordRetries(math.MaxInt32), // ретраить до победного (с idempotency = безопасно) kgo.RetryBackoffFn(func(int) time.Duration { return 100*time.Millisecond }), kgo.TransactionalID("payments-v1"), kgo.TransactionTimeout(60*time.Second), kgo.ProducerBatchCompression(kgo.ZstdCompression()),)И consumer:
cl2, _ := kgo.NewClient( kgo.SeedBrokers("k1:9092"), kgo.ConsumerGroup("payments-processor"), kgo.ConsumeTopics("payments.in"), kgo.FetchIsolationLevel(kgo.ReadCommitted()), kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.SessionTimeout(30*time.Second), kgo.DisableAutoCommit(),)Приложение E. Failure-injection checklist
Заголовок раздела «Приложение E. Failure-injection checklist»Перед prod-релизом тестируй:
- Kill leader broker → проверить, что producer ретраит без потерь.
- Kill consumer pod → rebalance протекает, нет дубликатов под EoS.
- Network partition controller → выборы нового через KRaft → < 10s.
- Disk full → broker честно отказывает producer’ам, не корраптится.
- Schema breaking change → consumer падает в DLQ, не блокирует.
Инструмент: Chaos Mesh / Litmus / собственные скрипты tc netem + kill -9.