Apache Kafka в production: глубокое погружение для Go
Зачем знать. Kafka — стандарт де-факто для event streaming и асинхронной интеграции сервисов с гарантированной доставкой. Middle 2 Go-инженер обязан понимать внутренности (KRaft, ISR, log compaction), уметь настроить idempotent/transactional producer, грамотно работать с consumer groups, реализовать exactly-once semantics и DLQ-схему, выбрать клиентскую библиотеку (franz-go vs kafka-go vs sarama) и диагностировать lag в проде.
Содержание
Заголовок раздела «Содержание»- Концепция и архитектура (кратко)
- Production-практики: producer, consumer, EoS, schema registry, мониторинг, tuning
- Gotchas — реальные грабли
- Real cases: order pipeline, log aggregation, CDC
- 30 вопросов и ответов
- Practice — задания
- Источники
1. Концепция (кратко)
Заголовок раздела «1. Концепция (кратко)»1.1 Базовые сущности
Заголовок раздела «1.1 Базовые сущности» KAFKA CLUSTER ┌──────────────────────────────────────────────────┐ │ │ │ Broker-1 Broker-2 Broker-3 │ │ (Controller) │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Topic A │ │ Topic A │ │ Topic A │ │ │ │ P0 (L) │◀────▶│ P0 (F) │◀────▶│ P0 (F) │ │ │ │ P1 (F) │◀────▶│ P1 (L) │◀────▶│ P1 (F) │ │ │ │ P2 (F) │◀────▶│ P2 (F) │◀────▶│ P2 (L) │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │ └──────────────────────────────────────────────────┘ ▲ ▲ │ │ Producer-1 Consumer Group "g1" C1, C2, C3 (по partition)
L = leader, F = follower (replica) ISR (in-sync replicas) = leader + актуальные followers- Broker — серверный узел Kafka. Хранит partitions, обслуживает producer/consumer.
- Controller — один из брокеров, отвечает за метаданные кластера (партиции, leader election). С Kafka 3.3+ через KRaft (Kafka Raft) controller встроен прямо в брокеры (или в отдельные controller-node), Zookeeper больше не нужен.
- Topic — логический канал, делится на partitions.
- Partition — упорядоченная append-only последовательность сообщений; единица параллелизма. Внутри partition ordering строгий, между partitions — нет.
- Replica — копия partition на другом broker. У каждой partition есть leader (читает/пишет клиенты) и followers (репликация).
- ISR (In-Sync Replicas) — список реплик, которые «догнали» leader в пределах
replica.lag.time.max.ms. Когда producer указываетacks=all, сообщение считается записанным только если все ISR подтвердили. - Offset — номер сообщения внутри partition (long, монотонно растущий).
1.2 KRaft (Kafka Raft) vs Zookeeper
Заголовок раздела «1.2 KRaft (Kafka Raft) vs Zookeeper» До 3.3 (Zookeeper): ┌──────────┐ ┌──────────────┐ │Zookeeper │◀──▶│ Kafka Broker │ │ Quorum │ │ + Controller│ └──────────┘ └──────────────┘
После 3.3+ (KRaft): ┌──────────────────────────────┐ │ Controller-quorum (Raft) │ │ встроен в брокеры или │ │ отдельный controller-node │ └──────────────────────────────┘С 3.3 KRaft GA, с 4.0 (2025) Zookeeper полностью удалён. Преимущества: одна система вместо двух, меньше операционных артефактов, быстрее failover, метаданные хранятся в специальном __cluster_metadata topic.
1.3 Log segments, retention, compaction
Заголовок раздела «1.3 Log segments, retention, compaction»Каждая partition физически — это набор сегментов (файлов) на диске:
/var/lib/kafka/data/orders-0/ ├── 00000000000000000000.log ← сегмент данных ├── 00000000000000000000.index ← индекс offset → file position ├── 00000000000000000000.timeindex ← индекс timestamp → offset ├── 00000000000045623891.log ├── 00000000000045623891.index └── leader-epoch-checkpointRetention:
retention.ms(по времени, default 7 дней)retention.bytes(по размеру)- Применяется к сегментам, целиком: сегмент удаляется когда самое старое сообщение в нём старше retention.
Compaction (cleanup.policy=compact) — для key-based topics: Kafka гарантирует, что для каждого ключа в topic остаётся как минимум последнее значение. Используется для:
- changelog (CDC),
- compacted state (current snapshot),
- tombstones (
nullpayload = удаление ключа).
Before compaction:[k1=v1, k2=v2, k1=v3, k3=v4, k1=v5, k2=v6]
After compaction:[k3=v4, k1=v5, k2=v6] ← последние значения по ключам1.4 Producer и Consumer (упрощённо)
Заголовок раздела «1.4 Producer и Consumer (упрощённо)»- Producer определяет partition (по hash(key) или round-robin), отправляет batch на leader, ждёт ack согласно
acks. - Consumer group делит partitions топика между членами. Каждая partition — ровно один consumer внутри группы. Если consumers больше чем partitions — лишние idle.
2. Production-практики
Заголовок раздела «2. Production-практики»2.1 Producer config (детально)
Заголовок раздела «2.1 Producer config (детально)»| Параметр | Значение | Когда нужно |
|---|---|---|
acks | 0 / 1 / all | all для durability |
enable.idempotence | true (default с 3.0) | Защита от дублей при retry |
max.in.flight.requests.per.connection | 1–5 | =1 для strict ordering без idempotence |
retries | Integer.MAX_VALUE | Бесконечные retry, ограничены delivery.timeout.ms |
delivery.timeout.ms | 120000 | Полный таймаут публикации |
compression.type | lz4 / snappy / zstd / gzip / none | Для throughput выбирай lz4 или zstd |
linger.ms | 5–50 ms | Batching, latency vs throughput |
batch.size | 16–256 KB | Размер batch на partition |
transactional.id | string | Для exactly-once |
Пример: idempotent producer (franz-go)
Заголовок раздела «Пример: idempotent producer (franz-go)»import ( "context" "github.com/twmb/franz-go/pkg/kgo")
cl, err := kgo.NewClient( kgo.SeedBrokers("kafka1:9092", "kafka2:9092", "kafka3:9092"), kgo.DefaultProduceTopic("orders"), // idempotence включается автоматически при retries > 0 и acks=all kgo.RequiredAcks(kgo.AllISRAcks()), kgo.ProducerBatchCompression(kgo.Lz4Compression()), kgo.ProducerLinger(10 * time.Millisecond), kgo.MaxBufferedRecords(1000),)if err != nil { /* ... */ }defer cl.Close()
r := &kgo.Record{ Key: []byte(orderID), // hash(key) → partition (стабильно) Value: payload, Headers: []kgo.RecordHeader{ {Key: "trace-id", Value: []byte(traceID)}, },}cl.Produce(ctx, r, func(rec *kgo.Record, err error) { if err != nil { log.Error(err) }})cl.Flush(ctx)Transactional producer (атомарно несколько partitions/topics)
Заголовок раздела «Transactional producer (атомарно несколько partitions/topics)»cl, _ := kgo.NewClient( kgo.SeedBrokers("kafka1:9092"), kgo.TransactionalID("orders-producer-1"), kgo.RequiredAcks(kgo.AllISRAcks()),)
if err := cl.BeginTransaction(); err != nil { /* ... */ }
cl.Produce(ctx, &kgo.Record{Topic: "orders", Value: ...}, nil)cl.Produce(ctx, &kgo.Record{Topic: "audit", Value: ...}, nil)
switch err := cl.EndTransaction(ctx, kgo.TryCommit); {case errors.Is(err, kerr.UnknownProducerID): // нужен новый producer instancedefault: // ok or abort}2.2 Consumer config (детально)
Заголовок раздела «2.2 Consumer config (детально)»| Параметр | Значение | Комментарий |
|---|---|---|
group.id | string | Идентификатор consumer group |
auto.offset.reset | earliest / latest / none | Когда нет committed offset |
enable.auto.commit | false | В проде почти всегда manual commit |
partition.assignment.strategy | cooperative-sticky | Рекомендация с 2.4+ |
session.timeout.ms | 30 000 | Через сколько consumer считается мёртвым |
heartbeat.interval.ms | 1/3 от session.timeout.ms | Частота heartbeat |
max.poll.interval.ms | 300 000 | Между poll() — иначе rebalance |
max.poll.records | 500 | Записей за один poll |
fetch.min.bytes | 1–1MB | Накопление перед ответом |
fetch.max.wait.ms | 500 | Максимум ожидания |
isolation.level | read_committed | Для exactly-once |
group.instance.id | uuid/имя | Static membership |
Cooperative rebalancing (важно!)
Заголовок раздела «Cooperative rebalancing (важно!)»До Kafka 2.4 при ребалансе все consumers останавливались — «stop-the-world». С cooperative-sticky ребаланс инкрементальный: только пересдаваемые partitions останавливаются.
Static membership (group.instance.id)
Заголовок раздела «Static membership (group.instance.id)»Без него: рестарт пода → consumer считается новым → rebalance. С static ID: тот же ID после рестарта → Kafka не делает rebalance в течение session.timeout.ms. Снижает downtime при rolling restart на порядок.
Пример: consumer (franz-go, manual commit)
Заголовок раздела «Пример: consumer (franz-go, manual commit)»cl, _ := kgo.NewClient( kgo.SeedBrokers("kafka1:9092"), kgo.ConsumerGroup("orders-processor"), kgo.ConsumeTopics("orders"), kgo.DisableAutoCommit(), kgo.Balancers(kgo.CooperativeStickyBalancer()), kgo.SessionTimeout(30 * time.Second), kgo.RebalanceTimeout(60 * time.Second), kgo.InstanceID("consumer-pod-1"), // static membership kgo.FetchMaxWait(500 * time.Millisecond), kgo.FetchMaxBytes(50 << 20),)
for { fetches := cl.PollFetches(ctx) if errs := fetches.Errors(); len(errs) > 0 { log.Error(errs) continue } fetches.EachRecord(func(r *kgo.Record) { if err := process(r); err != nil { // в DLQ или retry sendToDLQ(r, err) return } }) // commit ТОЛЬКО когда batch обработан if err := cl.CommitUncommittedOffsets(ctx); err != nil { log.Error("commit failed:", err) }}2.3 Acks (детально)
Заголовок раздела «2.3 Acks (детально)» acks=0: producer ──message──▶ broker (не ждём ответа) ⚠️ может потерять данные, fire-and-forget
acks=1: producer ──message──▶ leader ──OK leader ──╳──▶ follower (не дождались) ⚠️ потеря если leader упадёт до replication
acks=all: producer ──message──▶ leader ──▶ followers (все ISR) ◀──OK от всех ISR── ✅ durability, при min.insync.replicas=2 переживёт падение 1 brokerСтандарт для prod: acks=all + min.insync.replicas=2 + replication.factor=3.
2.4 Idempotent producer
Заголовок раздела «2.4 Idempotent producer»Producer получает уникальный producer_id (PID) + sequence number на partition. Broker отслеживает последний sequence от данного PID на каждой partition — дубль с тем же sequence отбрасывается.
Включается:
enable.idempotence=trueacks=allmax.in.flight.requests.per.connection ≤ 5retries > 0Защищает только от retry на сетевом уровне внутри одного producer-session. Если приложение само отправит message дважды — это два разных запроса, оба будут приняты.
2.5 Exactly-Once Semantics (EoS)
Заголовок раздела «2.5 Exactly-Once Semantics (EoS)»Два независимых уровня:
- At-least-once + idempotence = no duplicates from network retries.
- Transactional producer + consumer с
isolation.level=read_committed= атомарное обновление нескольких topics/partitions + commit offsets в одной транзакции.
Pattern «read-process-write» (Kafka Streams под капотом):
beginTransaction() ↓ read from topic-in ↓ process() ↓ write to topic-out sendOffsetsToTransaction(consumed_offsets, group) ↓ commitTransaction()Если что-то падает в середине — abort, всё откатится; consumer с read_committed не увидит aborted данных.
⚠️ EoS работает только внутри Kafka. Если процесс пишет в Kafka и в Postgres — это two-phase commit, нужен outbox pattern (см. ниже).
2.6 Compression
Заголовок раздела «2.6 Compression»| Алгоритм | CPU | Сжатие | Когда |
|---|---|---|---|
none | 0 | 1.0 | Очень малые сообщения |
gzip | high | best | Архив, low traffic |
snappy | low | medium | Универсально |
lz4 | low | medium | Чаще всего default |
zstd | medium | best ratio | Современный default, Kafka 2.1+ |
Сжатие в проде обязательно — экономия диска × 5, сети × 5.
2.7 Schema management
Заголовок раздела «2.7 Schema management» Application ──Avro/Protobuf──▶ Schema Registry │ ▼ [schema_id + payload] │ ▼ Application ──read──▶ Schema Registry ──schema──▶ DecodeSubject naming strategies:
TopicNameStrategy(по умолчанию) — subject =topic-key/topic-value. Один тип на topic.RecordNameStrategy— subject по FQN. Мульти-тип в одном topic.TopicRecordNameStrategy—topic-FQN. Мульти-тип, но изолированы по topic.
Compatibility:
BACKWARD(default) — новая схема должна читать данные старой схемы (consumer обновляется первым).FORWARD— старая схема может читать данные новой (producer обновляется первым).FULL— оба направления.NONE— без проверки (опасно).
Реализации: Confluent Schema Registry, Karapace (open-source, Apache 2.0), AWS Glue Schema Registry, Redpanda Schema Registry.
2.8 Outbox pattern (атомарность DB + Kafka)
Заголовок раздела «2.8 Outbox pattern (атомарность DB + Kafka)» Transaction: INSERT INTO orders ... INSERT INTO outbox (event_type, payload, status='pending') COMMIT ─────────────────────────── Outbox poller / Debezium CDC: read pending events ──▶ publish to Kafka ──▶ mark as sent- Polling outbox: select из outbox + publish + update status. Простой, но нагружает БД.
- Transactional outbox через CDC (Debezium): Debezium читает WAL Postgres / binlog MySQL и публикует в Kafka. БД не знает о Kafka.
2.9 DLQ (Dead Letter Queue)
Заголовок раздела «2.9 DLQ (Dead Letter Queue)»Pattern:
┌──────────┐ ok ┌──────────┐ │ Consumer │──────▶ │ Process │ └──────────┘ └──────────┘ │ err ▼ ┌──────────┐ N attempts │ Retry │─────────────▶ DLQ topic (orders.DLQ) │ topic │ + headers: error, stack, attempt └──────────┘В Go custom-логика: после N retry → publish в orders.DLQ + commit offset исходного topic. Отдельный consumer / human-process работает с DLQ.
2.10 Tuning кластера
Заголовок раздела «2.10 Tuning кластера»- Partition count: 10× от числа consumer-инстансов потенциальных. Слишком много (>4000 на broker) — деградация.
- Replication factor: 3 — стандарт.
- min.insync.replicas: 2 (при RF=3). Если ISR падает до 1 — producer с
acks=allполучитNotEnoughReplicas. - Heap: 6–8 GB JVM, остаток ОЗУ — для page cache.
- Disks: NVMe / SSD, RAID-10 или JBOD, XFS.
2.11 Мониторинг
Заголовок раздела «2.11 Мониторинг»- Consumer lag (high → consumer не успевает):
kafka-lag-exporter, Burrow (LinkedIn), Cruise Control. - Under-replicated partitions (UR > 0 → проблема): JMX metric.
- Request latency: produce p99 < 100ms норма.
- Broker disk usage, network IO, active controller count == 1.
2.12 Local dev
Заголовок раздела «2.12 Local dev»- Redpanda (Kafka-API compatible, написан на C++, без JVM, single binary): идеально для CI и laptop.
- docker-compose с
confluentinc/cp-kafkaилиbitnami/kafka(KRaft режим, single broker).
# docker-compose.yml (Redpanda + Console)services: redpanda: image: docker.redpanda.com/redpandadata/redpanda:v24.2.7 command: - redpanda start - --smp 1 - --memory 1G - --reserve-memory 0M - --node-id 0 - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092 - --mode dev-container ports: ["19092:19092", "9644:9644"] console: image: docker.redpanda.com/redpandadata/console:v2.7.2 environment: KAFKA_BROKERS: redpanda:9092 ports: ["8080:8080"]2.13 Cloud Kafka
Заголовок раздела «2.13 Cloud Kafka»- Confluent Cloud (родители Kafka, есть Schema Registry, ksqlDB, Connect).
- AWS MSK + MSK Serverless.
- Aiven, Upstash Kafka (per-message billing).
- Redpanda Cloud.
3. Gotchas
Заголовок раздела «3. Gotchas»⚠️ Acks=1 ≠ durability. Leader может умереть до replication → потеря. Для prod всегда acks=all.
⚠️ min.insync.replicas=1 + RF=3 = иллюзия безопасности. При acks=all достаточно одной ISR — leader может быть единственной. Ставь min.insync.replicas=2.
⚠️ Idempotence ≠ Exactly-once. Idempotence — защита от network retry внутри одной сессии producer. Если приложение само отправило дубль — будет дубль.
⚠️ enable.auto.commit=true опасен. Commit идёт по таймеру, после неудачной обработки offset уже закоммичен → message lost. В проде — manual commit.
⚠️ Большое max.poll.interval.ms маскирует проблему. Поднимать таймаут «чтобы не было rebalance» — антипаттерн; либо ускоряй обработку, либо выноси в отдельный worker pool.
⚠️ Static membership ≠ no rebalance. При scale-up/scale-down ребаланс будет всё равно.
⚠️ max.in.flight.requests > 1 без idempotence ломает ordering. При retry батчей порядок меняется.
⚠️ Compaction не освобождает место мгновенно. Сегменты compactится только когда они «closed» и tombstone живёт delete.retention.ms (default 24h).
⚠️ Изменение числа partitions ломает hash(key)→partition mapping. Старые ключи могут поехать на другие partitions → нарушение ordering by key.
⚠️ auto.offset.reset=latest теряет данные при первом запуске consumer group (всё до текущего момента не прочитано).
⚠️ EoS не работает кросс-кластерно. MirrorMaker 2 не сохраняет transactional semantics между кластерами.
⚠️ Heap > 32 GB у broker — отключает compressed oops в JVM → больше памяти на pointers, медленнее GC. Лучше 6–8 GB heap + 64–256 GB page cache.
⚠️ Schema Registry — single point of failure. Producer не может опубликовать без регистра. Нужен HA mode.
⚠️ replica.lag.time.max.ms слишком маленький → followers выкидываются из ISR на любом GC pause → шум. Default 30s — обычно ок.
⚠️ compression.type на topic ≠ producer. Можно настроить разные; рекомендуется одинаково (broker re-compress иначе).
⚠️ CGo (confluent-kafka-go) — нужен librdkafka на хосте, проблемы с Alpine (musl). Используй pure-Go (franz-go) если нет жёстких perf-требований.
4. Real cases
Заголовок раздела «4. Real cases»4.1 Order processing pipeline
Заголовок раздела «4.1 Order processing pipeline» API ──order_created──▶ [orders] │ ▼ [validate-consumer] ──order_validated──▶ [orders.validated] │ ▼ [payment-consumer] │ ┌──────────────────┴──────────────────┐ ▼ ▼ [payment_ok] [payment_failed] │ │ ▼ ▼ [ship-consumer] [refund-consumer] │ ▶ orders.DLQ (if dispute)Каждый consumer — отдельная Go-сервис, scaling по числу partitions; ordering по order_id как ключу гарантирует strict order для одного заказа.
4.2 Log aggregation
Заголовок раздела «4.2 Log aggregation» N сервисов ──fluentbit/filebeat──▶ Kafka (topic per service or per env) │ ▼ Logstash / Vector ──▶ Elastic / ClickHouseПреимущество: backpressure absorb (если ES не успевает, Kafka буферизует), retention 1–7 дней.
4.3 CDC (Change Data Capture)
Заголовок раздела «4.3 CDC (Change Data Capture)» Postgres WAL ──Debezium──▶ Kafka (db.public.orders) ──▶ consumers (compacted)Полная репликация состояния таблицы через compacted topic + tombstones (DELETE → message with null value).
5. Вопросы
Заголовок раздела «5. Вопросы»Базовые
Заголовок раздела «Базовые»1. Что такое partition в Kafka? Упорядоченная append-only последовательность сообщений внутри topic; единица параллелизма и репликации.
2. Зачем KRaft вместо Zookeeper? Убирает внешнюю зависимость, упрощает ops, ускоряет failover, метаданные хранятся как обычный topic в самом Kafka.
3. Что такое ISR?
In-Sync Replicas — реплики partition, которые «догнали» leader в пределах replica.lag.time.max.ms. Сообщение с acks=all подтверждается только когда все ISR записали.
4. Что такое consumer group? Группа consumer-инстансов, разделяющих partitions topic. Внутри группы каждая partition обрабатывается ровно одним consumer.
5. Чем acks=1 отличается от acks=all?
acks=1 — ждём подтверждения только от leader; durability слабее. acks=all — ждём подтверждения от всех ISR.
Producer
Заголовок раздела «Producer»6. Что такое idempotent producer? Producer с уникальным PID + sequence number на partition; broker отбрасывает дубли с тем же sequence. Защищает от retry-дублей в рамках одной producer session.
7. Что включает enable.idempotence=true?
Автоматически: acks=all, retries>0, max.in.flight.requests.per.connection ≤ 5.
8. Что такое transactional producer?
Producer с transactional.id, поддерживает атомарные write в несколько partitions/topics + sendOffsetsToTransaction для read-process-write pattern.
9. Какие алгоритмы compression поддерживает Kafka? none, gzip, snappy, lz4, zstd (с 2.1+). zstd рекомендуется для prod.
10. Что такое linger.ms / batch.size?
Producer накапливает сообщения в batch на partition; отправляет когда batch заполнен (batch.size) или прошло linger.ms.
Consumer
Заголовок раздела «Consumer»11. Что такое cooperative-sticky rebalance? Инкрементальный rebalance: вместо stop-the-world (eager) перераспределяются только меняющиеся partitions. С Kafka 2.4+, рекомендация для prod.
12. Зачем static membership (group.instance.id)?
Чтобы при rolling restart consumer не считался новым → нет rebalance в течение session.timeout.ms.
13. max.poll.interval.ms vs session.timeout.ms?
session.timeout.ms — между heartbeats (broker считает consumer мёртвым); max.poll.interval.ms — между вызовами poll() (consumer считается «застрявшим»). Heartbeats идут в отдельном thread.
14. commitSync vs commitAsync?
commitSync блокирующий, надёжный, медленнее. commitAsync неблокирующий, без retry — для регулярного коммита + commitSync на shutdown.
15. Что произойдёт при auto.offset.reset=earliest и старом topic?
Consumer прочитает все сохранённые сообщения с самого начала.
Exactly-once
Заголовок раздела «Exactly-once»16. Что такое EoS в Kafka? Гарантия, что сообщение обработано ровно один раз даже при failures. Достигается transactional producer + isolation.level=read_committed + commit offsets в транзакции.
17. Когда EoS не работает? Кросс-системно (Kafka + DB), кросс-кластерно (MirrorMaker 2 не сохраняет транзакции).
18. Что такое read_committed? Consumer читает только commit-завершённые транзакции; aborted/in-progress сообщения скрыты.
Архитектура
Заголовок раздела «Архитектура»19. Что такое compacted topic?
Topic с cleanup.policy=compact: Kafka гарантирует, что для каждого ключа сохраняется как минимум последнее значение. Используется для changelog / state.
20. Что такое tombstone?
Сообщение с value=null в compacted topic — сигнал «удалить ключ».
21. Что такое controller? Broker, ответственный за метаданные кластера: leader election, partition reassignment. С KRaft встроен в брокеры (Raft quorum).
22. Сколько partitions выбрать для topic? Зависит от throughput и числа consumer-инстансов. Грубо: max_throughput / per_partition_throughput. На broker не более ~4000 partitions (с KRaft планка выше).
23. Replication factor 3 + min.insync.replicas 2 — что это даёт? Переживёт падение 1 broker без потери данных и без блокировки producer.
Go клиенты
Заголовок раздела «Go клиенты»24. franz-go vs sarama vs kafka-go vs confluent-kafka-go?
- franz-go (twmb): pure Go, современный, поддерживает все фичи (KIP), быстрый — рекомендация 2025+.
- kafka-go (segmentio): pure Go, простой API, меньше фич (нет полноценных транзакций).
- confluent-kafka-go: CGo wrapper над librdkafka, лучший throughput, но CGo.
- IBM/sarama (бывш Shopify): pure Go, классика, активная поддержка.
25. Почему franz-go стал рекомендацией? Активная разработка, полная поддержка KIP (включая KIP-848 next-gen consumer groups), стабильное API, отличная документация.
Production
Заголовок раздела «Production»26. Что такое DLQ pattern? После N неуспешных попыток обработки message отправляется в отдельный DLQ-topic с metadata об ошибке. Отдельный consumer/operator работает с DLQ.
27. Что такое outbox pattern? Запись «событий» в outbox-таблицу в той же DB-транзакции что и основная запись; отдельный процесс / CDC публикует их в Kafka. Гарантирует атомарность DB + Kafka.
28. Как мониторить consumer lag?
kafka-lag-exporter (Prometheus), Burrow (LinkedIn), JMX-метрики; алерт при росте lag.
29. Что делать если consumer не успевает?
- увеличить partitions + consumers, 2) ускорить процессинг (batch, async DB), 3) вынести IO в worker pool, 4) DLQ для poison message.
30. Что такое Redpanda? Kafka-API совместимая система на C++ без JVM и Zookeeper; быстрее на mixed workload, проще ops. Используется как drop-in replacement и для CI/dev.
6. Practice
Заголовок раздела «6. Practice»-
Идемпотентный publisher. Напиши Go-функцию, которая публикует event с
event_idкак ключом; idempotent producer + проверка через consumer что дубли при retry не появляются. -
DLQ. Реализуй consumer для topic
orders, при ошибке обработки → publish вorders.DLQс headersx-error,x-retry-count,x-original-topic; offset исходного topic коммитим только после успешной публикации DLQ. -
Outbox pattern. PostgreSQL-таблица
outbox, Go-poller выбираетWHERE status='pending' ORDER BY id LIMIT 100, публикует в Kafka, обновляетstatus='sent'. Покажи что транзакция атомарна. -
Transactional producer. Реализуй read-process-write: читай из
orders, обогащай из БД, пиши вorders.enriched+ commit offsets в транзакции. Verifу что abort откатывает обе write. -
Consumer lag monitor. Подними
kafka-lag-exporter+ Prometheus + Grafana. Сделай алертlag > 1000. -
Compaction. Создай compacted topic, отправь N сообщений с ключами повторяющимися; запусти log compaction, проверь что в файлах остались только последние.
-
Static membership benchmark. Сравни rolling restart consumer group с и без
group.instance.id; измерь сколько времени партиции «не обрабатывались». -
Schema Registry. Подними Karapace, зарегистрируй Avro-схему, опубликуй с эволюцией (добавление optional поля); проверь backward compatibility.
-
Cooperative-sticky vs Range. Scale consumer group с 1 до 5; сравни сколько partitions перераспределилось.
-
Redpanda для CI. Замени Kafka на Redpanda в docker-compose, прогони integration тесты, измерь startup time.
7. Дополнительные блоки
Заголовок раздела «7. Дополнительные блоки»7.1 Расширенный пример: outbox poller на Go
Заголовок раздела «7.1 Расширенный пример: outbox poller на Go»type OutboxEvent struct { ID int64 Topic string Key []byte Payload []byte Headers map[string]string Status string // pending | sent | failed Attempts int CreatedAt time.Time}
type Poller struct { db *pgxpool.Pool kafka *kgo.Client batch int backoff time.Duration}
func (p *Poller) RunOnce(ctx context.Context) error { tx, err := p.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) if err != nil { return err } defer tx.Rollback(ctx)
rows, err := tx.Query(ctx, ` SELECT id, topic, key, payload, headers, attempts FROM outbox WHERE status = 'pending' AND attempts < 5 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT $1`, p.batch) if err != nil { return err }
var events []OutboxEvent for rows.Next() { var e OutboxEvent var hraw []byte rows.Scan(&e.ID, &e.Topic, &e.Key, &e.Payload, &hraw, &e.Attempts) json.Unmarshal(hraw, &e.Headers) events = append(events, e) } rows.Close()
if len(events) == 0 { return tx.Commit(ctx) }
// публикация batch results := make([]error, len(events)) var wg sync.WaitGroup for i, ev := range events { wg.Add(1) i, ev := i, ev rec := &kgo.Record{ Topic: ev.Topic, Key: ev.Key, Value: ev.Payload, } for k, v := range ev.Headers { rec.Headers = append(rec.Headers, kgo.RecordHeader{Key: k, Value: []byte(v)}) } p.kafka.Produce(ctx, rec, func(r *kgo.Record, err error) { results[i] = err; wg.Done() }) } wg.Wait()
// обновление статуса for i, ev := range events { if results[i] == nil { tx.Exec(ctx, `UPDATE outbox SET status='sent', sent_at=NOW() WHERE id=$1`, ev.ID) } else { tx.Exec(ctx, `UPDATE outbox SET attempts=attempts+1, last_error=$2 WHERE id=$1`, ev.ID, results[i].Error()) } } return tx.Commit(ctx)}Особенности:
FOR UPDATE SKIP LOCKED— несколько poller-инстансов работают параллельно без блокировок.- batch + parallel publish.
- attempts counter — после 5 неудач сообщение помечается
failed, требует ручного вмешательства.
7.2 Bench: producer compression vs throughput
Заголовок раздела «7.2 Bench: producer compression vs throughput»Эмпирические цифры (3-broker prod-cluster, 1KB JSON-сообщения, 1 producer, m5.4xlarge):
| compression | MB/sec write | CPU on producer | CPU on broker |
|---|---|---|---|
| none | 280 | 8% | 9% |
| snappy | 1100 | 19% | 11% |
| lz4 | 1400 | 22% | 12% |
| zstd | 1750 | 38% | 14% |
| gzip | 320 | 75% | 22% |
zstd — лучший ratio, но дороже CPU. Для среднего workload — lz4.
7.3 Partition rebalancing — что меняется в KIP-848
Заголовок раздела «7.3 Partition rebalancing — что меняется в KIP-848»KIP-848 (next-gen consumer groups, Kafka 3.7+ early access) меняет модель ребаланса:
- Server-side assignment вместо client-side.
- Heartbeats заменяют long-poll.
- Incremental rebalance стало стандартом.
- Меньше моментов «всё остановилось».
С точки зрения Go-разработчика franz-go уже поддерживает KIP-848 (по флагу) — будущее consumer groups.
7.4 EoS workflow: подробно
Заголовок раздела «7.4 EoS workflow: подробно» ┌──────────────────────────────────────────────────────────┐ │ Stream processor (read-process-write): │ │ │ │ txProd.BeginTransaction() │ │ ↓ │ │ records := consumer.PollRecords() │ │ ↓ │ │ for each record: │ │ out := process(record) │ │ txProd.Produce(out, "topic-out") │ │ ↓ │ │ txProd.SendOffsetsToTransaction( │ │ consumed_offsets, consumer.Group()) │ │ ↓ │ │ if all_ok: txProd.CommitTransaction() │ │ else: txProd.AbortTransaction() │ └──────────────────────────────────────────────────────────┘Consumer с isolation.level=read_committed не видит aborted данных. Это и есть EoS внутри Kafka.
7.5 Schema evolution: пример Avro
Заголовок раздела «7.5 Schema evolution: пример Avro»Версия 1:
{"type": "record", "name": "Order", "fields": [ {"name": "id", "type": "string"}, {"name": "total", "type": "int"}]}Версия 2 — добавляем optional поле (BACKWARD-совместимо):
{"type": "record", "name": "Order", "fields": [ {"name": "id", "type": "string"}, {"name": "total", "type": "int"}, {"name": "currency", "type": ["null", "string"], "default": null}]}Старый consumer прочитает данные новой версии — поле просто проигнорирует.
⚠️ Удаление поля = breaking change. Сначала задефолти поле в null, потом deprecate, потом удалять (через несколько релизов).
7.6 Производительность брокера: настройки JVM и OS
Заголовок раздела «7.6 Производительность брокера: настройки JVM и OS»Прод-настройки broker:
- JVM heap: 6–8 GB, G1GC,
-XX:MaxGCPauseMillis=20. - swappiness: 1 (избегать swap).
- file descriptors: 100k+ (
ulimit -n). - net.core.rmem_max / wmem_max: 16 MB.
- vm.dirty_ratio: 80,
vm.dirty_background_ratio: 5. - FS: XFS, mount с
noatime,nobarrier(если есть battery-backed RAID).
7.7 Когда Kafka не подходит
Заголовок раздела «7.7 Когда Kafka не подходит»- Малые messages с очень низким throughput (10 msg/sec) — overkill, NATS/RabbitMQ проще.
- Очень большие messages (>1MB) — Kafka неэффективен, нужно chunking или Object Store.
- RPC / request-reply — Kafka это log, не bus; для RPC берите gRPC/NATS.
- Real-time с sub-ms latency — Kafka даёт ms-десятки ms; для high-frequency trading недостаточно.
- Малая команда без DevOps — Kafka сложен в ops; Redpanda проще, NATS ещё проще.
7.8 Чек-лист продакшен Kafka-deployment
Заголовок раздела «7.8 Чек-лист продакшен Kafka-deployment»- RF=3,
min.insync.replicas=2. - Producer
acks=all, idempotent enabled. - Consumer manual commit + cooperative-sticky.
- Static membership для критичных consumers.
- DLQ для каждого критичного topic.
- Schema Registry + BACKWARD compatibility.
- Monitoring: lag, UR partitions, request latency.
- Backup / DR: MirrorMaker 2 в DR-cluster.
- Alerts: UR > 0, lag > threshold, free disk < 30%.
- Runbook для broker failure, partition leader election, partition reassign.
8. Источники
Заголовок раздела «8. Источники»- Apache Kafka Documentation 3.7+ — официально, базовый референс.
- Confluent: KRaft (KIP-500) — глубокий разбор KRaft.
- twmb/franz-go — клиент и его документация (содержит лучшие практики).
- Effective Kafka, 2nd ed. — Emil Koutanov, 2023.
- Kafka: The Definitive Guide, 2nd ed. — Narkhede et al., O’Reilly.
- Designing Event-Driven Systems — Ben Stopford.
- Redpanda Docs — Kafka-compatible альтернатива.
- Karapace (Aiven Schema Registry) — open-source Schema Registry.
- KIP-848 (Next-gen consumer groups) — будущее consumer groups.
- Microservices.io: Outbox Pattern — Chris Richardson.