31. DLQ + Backpressure в production streaming
Зачем знать на Middle 3: Production streaming-системы падают не из-за throughput’а, а из-за двух классов сбоев: poison messages (некорректные данные намертво блокируют consumer) и backpressure (медленный consumer глушит всю систему). Senior обязан проектировать DLQ-стратегию (что считать poison, куда складывать, кто разбирает) и backpressure-стратегию (bounded buffers, pull-модель, rate limits) на уровне архитектуры. Без этого Kafka/NATS/RabbitMQ в проде превращаются в «время от времени забивающиеся трубы» — а это инциденты, потери денег и репутации.
Содержание
Заголовок раздела «Содержание»1. Концепция
Заголовок раздела «1. Концепция»Что такое DLQ
Заголовок раздела «Что такое DLQ»DLQ (Dead Letter Queue) — отдельная очередь/топик, куда отправляются сообщения, которые не могут быть обработаны consumer’ом.
Причины «poison message»:
- Невалидный формат / breaking schema.
- Невалидные данные (ссылка на удалённую сущность, integer overflow).
- Систематическая ошибка обработчика (бесконечная NPE).
- Дубликат, нарушающий бизнес-инвариант.
Без DLQ poison-сообщение:
- Не ack’ается → перевыполняется.
- Блокирует consumer на этом offset’е (особенно in-order processing).
- Очередь растёт за ним.
Стратегии обработки:
┌────────────────────────────────────────┐ │ Producer → Queue → Consumer │ └─────────┬──────────────────────────────┘ │ ▼ fail на обработке ┌─────────────────────────────────────────┐ │ Что делать с этим сообщением? │ └─────────┬───────────────────────────────┘ │ ┌─────────────┼──────────────┬─────────────────┐ ▼ ▼ ▼ ▼ SKIP RETRY RETRY + DLQ (log) (n раз) backoff (move out) │ ▼ Manual review / Auto-reprocess после фиксаЧто такое backpressure
Заголовок раздела «Что такое backpressure»Backpressure — механизм, при котором медленный consumer сигнализирует producer’у замедлиться, чтобы система не «лопнула».
Без backpressure:
- Producer пишет быстрее, чем consumer читает.
- Буфер растёт.
- OOM / disk full / lag растёт до часов.
- Producer’ы виснут / роняют сообщения.
С backpressure:
- Producer узнаёт о медленности и тормозит / отбрасывает.
- Система остаётся стабильной (graceful degradation).
Стратегии backpressure:
| Стратегия | Что делает | Trade-off |
|---|---|---|
| Block | Producer ждёт (блокируется на send) | Latency растёт, не теряем |
| Drop | Producer выкидывает сообщение | Теряем, latency низкая |
| Buffer + spill | Producer пишет на диск при переполнении RAM | Сложнее, но универсально |
| Pull-based | Consumer сам запрашивает batch размером, какой может обработать | Естественный backpressure |
| Reactive signal | Consumer явно сообщает «slow down N%» | Сложнее в реализации |
2. Production-deep dive
Заголовок раздела «2. Production-deep dive»2.1. DLQ в Kafka
Заголовок раздела «2.1. DLQ в Kafka»Kafka не имеет встроенного DLQ. Реализуется вручную.
Pattern 1: Per-topic DLQ
events → consumer fails → events.dlqevents.dlq → DLQ-consumer / manual tool / alertConsumer:
for { fetches := kafkaCl.PollFetches(ctx) fetches.EachPartition(func(p kgo.FetchTopicPartition) { for _, r := range p.Records { if err := handle(r); err != nil { if isPoison(err) { // отправляем в DLQ с meta dlqRecord := &kgo.Record{ Topic: r.Topic + ".dlq", Key: r.Key, Value: r.Value, Headers: []kgo.RecordHeader{ {Key: "x-original-topic", Value: []byte(r.Topic)}, {Key: "x-original-partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))}, {Key: "x-original-offset", Value: []byte(strconv.FormatInt(r.Offset, 10))}, {Key: "x-error", Value: []byte(err.Error())}, {Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))}, {Key: "x-attempt", Value: []byte("1")}, }, } kafkaCl.ProduceSync(ctx, dlqRecord) // commit offset, чтобы не зацикливаться kafkaCl.CommitRecords(ctx, r) } else { // retriable error, не committим time.Sleep(backoff) continue } } } })}Pattern 2: Retry topic chain
events → events.retry.5s → events.retry.30s → events.retry.5m → events.dlqКаждый retry topic имеет свой delay (в Kafka — через timestamp + consumer, который ждёт, или через Kafka Streams).
// При неуспехе отправляем в следующий retry topicattempt := getHeader(r, "x-attempt", 1) + 1nextTopic := selectRetryTopic(attempt)delaySend(nextTopic, r, computeDelay(attempt))Pattern 3: Confluent Kafka Connect — Dead Letter Queue config
В Kafka Connect (для source/sink connector’ов):
errors.tolerance=allerrors.deadletterqueue.topic.name=connect-dlqerrors.deadletterqueue.context.headers.enable=trueerrors.deadletterqueue.topic.replication.factor=3⚠️ Best practices DLQ в Kafka:
- DLQ topic = compacted? Нет. Обычно DLQ — limits-based retention (нельзя терять poison’ы).
- Replication.factor ≥ 3 — DLQ важен, не теряй.
- Schema schema-less (raw bytes) — DLQ должен принимать что угодно.
- Headers с meta — обязательно: source topic, partition, offset, error, attempt, timestamp.
- Алертинг — DLQ растёт = что-то сломалось.
- Tool для reprocess — после фикса бага UI/CLI, который читает DLQ и шлёт обратно в нужный topic.
2.2. DLQ в RabbitMQ
Заголовок раздела «2.2. DLQ в RabbitMQ»RabbitMQ имеет встроенный DLQ через x-dead-letter-exchange.
# При создании очереди:channel.queue_declare( queue='orders', arguments={ 'x-dead-letter-exchange': 'orders-dlx', 'x-dead-letter-routing-key': 'orders-dlq', 'x-message-ttl': 60000, # 60s, потом auto-DLQ 'x-max-length': 100000, })Когда сообщение попадает в DLQ:
- Reject/nack без requeue:
channel.basic_nack(delivery_tag, requeue=false). - TTL expired: сообщение в очереди слишком долго.
- Queue length exceeded: очередь переполнена.
DLQ сообщение получает headers x-death:
"x-death": [ { "count": 3, "reason": "rejected", "queue": "orders", "exchange": "orders-ex", "routing-keys": ["orders.create"], "time": "2026-05-21T10:00:00" }]Delayed retry в RabbitMQ:
Нет нативного delay (только plugin rabbitmq-delayed-message-exchange). Workaround — TTL-based retry queues:
main_queue (process) → dlx → retry_queue_5s (TTL=5s, no consumer) → dlx → main_queueПосле TTL сообщение возвращается. Лимит попыток — в headers x-attempt.
2.3. DLQ в NATS JetStream
Заголовок раздела «2.3. DLQ в NATS JetStream»JetStream имеет MaxDeliver — после N попыток сообщение terminated (не пересылается). Built-in DLQ нет, но реализуется через advisory subjects.
JetStream публикует advisory события:
$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>$JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED.<stream>.<consumer>(если негативный ack)$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<stream>.<consumer>
Pattern: подписываемся на advisory + переотправляем в DLQ stream.
// Consumer основного stream'а с MaxDeliver=5cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Name: "worker", Durable: "worker", AckPolicy: jetstream.AckExplicitPolicy, MaxDeliver: 5, AckWait: 30*time.Second,})
// DLQ subscriber на advisorync.Subscribe("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.ORDERS.worker", func(m *nats.Msg) { var ev jetstream.ConsumerDeliveryExceededAdvisory json.Unmarshal(m.Data, &ev) // Fetch original message and push to DLQ stream rawMsg, _ := stream.GetMsg(ctx, ev.StreamSeq) js.Publish(ctx, "orders.dlq", rawMsg.Data, jetstream.WithMsgID(...))})Альтернатива: explicit Term() вместо Nak(), и в обработчике catch’ить через wrapper’ы.
Recommended pattern: dlq.<stream>.<consumer> стрим, и DLQ-handler как отдельный consumer.
2.4. Backpressure в Kafka
Заголовок раздела «2.4. Backpressure в Kafka»Producer side:
buffer.memory=33554432 # 32MBmax.block.ms=60000 # при полном буфере — ждать до 60s, потом ошибкаacks=alllinger.ms=10Producer аккумулирует сообщения в buffer.memory. Когда полный — send() блокируется до max.block.ms, потом возвращает BufferExhaustedException.
Backpressure flow:
App ──Produce()──► Producer Client ──Network──► Broker │ ▼ buffer full → block / drop blocking⚠️ Если app не учитывает backpressure — поток отправки блокируется → ваш RPC handler виснет → клиент видит timeout.
Consumer side:
poll(Duration) — pull-based. Consumer сам решает скорость. Pull = inherent backpressure.
max.poll.records=500 # сколько забрать за один pollfetch.min.bytes=1048576 # 1MB — broker ждёт пока накопитсяfetch.max.wait.ms=500 # или 500msmax.partition.fetch.bytes=10485760 # max bytes per partitionГибкий backpressure:
Если обработка медленная, не убирай max.poll.records ниже — это уменьшит throughput. Лучше:
- Делать
poll()чаще, обрабатывать асинхронно. - Параллелить per-key (если order не нужен на partition).
- Pause partition:
pauseFetchPartitions(...)— стопает fetch для конкретных партиций, не разрывая группу.
// franz-go pause/resumecl.PauseFetchPartitions(map[string][]int32{"events": {0, 1}})// ... process slowly ...cl.ResumeFetchPartitions(map[string][]int32{"events": {0, 1}})2.5. Backpressure в NATS JetStream
Заголовок раздела «2.5. Backpressure в NATS JetStream»Pull consumer — естественный backpressure: consumer вызывает Fetch(batch_size), broker отдаёт максимум batch_size.
Push consumer:
max_ack_pending (важнейший параметр!) — сколько сообщений не ack’нутых может быть в полёте одновременно.
jetstream.ConsumerConfig{ MaxAckPending: 1000, // если 1000 не ack'нуты — broker не шлёт новые}Это flow control: broker сам тормозит отправку.
Flow Control (flow_control: true) — broker периодически шлёт «echo back ack» сообщения. Если клиент их не подтверждает — broker замедляется.
⚠️ MaxAckPending = 0 (unlimited) в push consumer — путь к перегрузке. ВСЕГДА ставь конкретное значение.
2.6. Backpressure в gRPC streaming
Заголовок раздела «2.6. Backpressure в gRPC streaming»gRPC поверх HTTP/2 имеет встроенный flow control window.
- Server и client объявляют, сколько байт каждый может принять.
WINDOW_UPDATEфреймы расширяют окно по мере чтения.- Если получатель не читает — окно сжимается → отправитель блокируется.
В Go:
// Не нужно делать вручную, gRPC сам.// Но если ты не вызываешь stream.Recv() / stream.Send() — окно закрывается.⚠️ Если используешь for { stream.Recv() } в горутине без обработки — окно закроется, отправитель повиснет. Backpressure правильный, но видно как «зависание».
2.7. Backpressure в Go: channels и semaphores
Заголовок раздела «2.7. Backpressure в Go: channels и semaphores»Bounded channel — самый простой backpressure:
jobs := make(chan Job, 1000) // bounded
// Producerselect {case jobs <- j: // успехcase <-ctx.Done(): return ctx.Err()default: // drop policy metrics.DroppedJobs.Inc()}
// или blocking:jobs <- j // блокируется, если канал полон
// Worker poolfor i := 0; i < N; i++ { go func() { for j := range jobs { handle(j) } }()}Semaphore-based concurrency limit:
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(100) // max 100 concurrent
for j := range jobs { if err := sem.Acquire(ctx, 1); err != nil { break } go func(j Job) { defer sem.Release(1) handle(j) }(j)}Rate limiter:
import "golang.org/x/time/rate"
lim := rate.NewLimiter(rate.Limit(1000), 100) // 1000 RPS, burst 100for j := range jobs { if err := lim.Wait(ctx); err != nil { break } handle(j)}2.8. Pattern: ingestion с DLQ + backpressure
Заголовок раздела «2.8. Pattern: ingestion с DLQ + backpressure»Полный production pipeline для consumer’а Kafka:
type Worker struct { cl *kgo.Client dlqTopic string workers int sem *semaphore.Weighted metrics *Metrics}
func (w *Worker) Run(ctx context.Context) error { for { if err := ctx.Err(); err != nil { return err } fetches := w.cl.PollFetches(ctx) if fetches.IsClientClosed() { return nil }
fetches.EachPartition(func(p kgo.FetchTopicPartition) { for _, r := range p.Records { if err := w.sem.Acquire(ctx, 1); err != nil { return } go func(r *kgo.Record) { defer w.sem.Release(1) w.processWithRetry(ctx, r) }(r) } }) }}
func (w *Worker) processWithRetry(ctx context.Context, r *kgo.Record) { var lastErr error backoff := time.Second for attempt := 1; attempt <= 5; attempt++ { err := w.handle(ctx, r) if err == nil { w.metrics.Processed.Inc() return } if !isRetriable(err) || attempt == 5 { w.toDLQ(ctx, r, err, attempt) return } lastErr = err time.Sleep(backoff) backoff *= 2 if backoff > 60*time.Second { backoff = 60*time.Second } } _ = lastErr}
func (w *Worker) toDLQ(ctx context.Context, r *kgo.Record, err error, attempt int) { record := &kgo.Record{ Topic: w.dlqTopic, Key: r.Key, Value: r.Value, Headers: append(r.Headers, kgo.RecordHeader{Key: "x-error", Value: []byte(err.Error())}, kgo.RecordHeader{Key: "x-original-topic", Value: []byte(r.Topic)}, kgo.RecordHeader{Key: "x-attempt", Value: []byte(strconv.Itoa(attempt))}, kgo.RecordHeader{Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))}, ), } if err := w.cl.ProduceSync(ctx, record).FirstErr(); err != nil { w.metrics.DLQErrors.Inc() // последний шанс — log на диск } else { w.metrics.DLQSuccess.Inc() }}2.9. Reprocessing DLQ
Заголовок раздела «2.9. Reprocessing DLQ»После фикса бага нужно вернуть сообщения. Tool примерно:
dlqCl := kafkaConsumer("dlq-replay", "events.dlq")prodCl := kafkaProducer()for { fetches := dlqCl.PollFetches(ctx) fetches.EachRecord(func(r *kgo.Record) { origTopic := string(getHeader(r, "x-original-topic")) prodCl.ProduceSync(ctx, &kgo.Record{ Topic: origTopic, Key: r.Key, Value: r.Value, Headers: append(r.Headers, kgo.RecordHeader{Key: "x-replayed", Value: []byte("true")}), }) }) dlqCl.CommitRecords(ctx, fetches.Records()...)}⚠️ Replay сообщений может breakнуть idempotency. Если оригинальный consumer уже частично обработал — повторение даст side effect. Решение: idempotency keys на уровне БД (INSERT ... ON CONFLICT DO NOTHING).
2.10. Monitoring DLQ + lag
Заголовок раздела «2.10. Monitoring DLQ + lag»Метрики, которые обязаны быть:
- DLQ message rate (per topic) — растёт = инцидент.
- DLQ size (cumulative).
- Consumer lag (per partition) — растёт = backpressure не справляется.
- Producer buffer utilization — близко к 100% = скоро будут drops/blocks.
- Process time p99 в consumer’е — растёт = код тормозит.
- Retry rate (внутренние ретраи).
Alerting:
- DLQ rate > X в минуту = page.
- Consumer lag > N сообщений > M минут = page.
- Producer buffer > 80% > 1 минута = warn.
2.11. Producer tuning per workload
Заголовок раздела «2.11. Producer tuning per workload»| Workload | Настройка |
|---|---|
| Low latency (RPC) | linger.ms=0, batch.size=маленький |
| High throughput (logs) | linger.ms=50, batch.size=512KB, compression=zstd |
| Critical (payment events) | acks=all, enable.idempotence=true, min.insync.replicas=2 |
| Best effort (metrics) | acks=0 или acks=1, drop policy на producer’е |
2.12. Consumer tuning per workload
Заголовок раздела «2.12. Consumer tuning per workload»| Workload | Настройка |
|---|---|
| Order-sensitive (single) | max.poll.records=1, no parallel within partition |
| Parallel within partition | Sequence numbers + reordering downstream |
| Async DB write | Batch consumer poll + bulk insert + commit only on bulk success |
| Heavy CPU | Меньше партиций per consumer, больше консьюмеров |
2.13. Schema-evolution + DLQ
Заголовок раздела «2.13. Schema-evolution + DLQ»Главная причина poison’ов в реальности — несовместимые schema изменения.
Защита:
- Schema Registry с FORWARD/BACKWARD/FULL compatibility check на CI.
- Consumer должен уметь читать N-1, N, N+1 версии (graceful unknown fields в Protobuf, ignore in JSON).
- При unknown field в strict-режиме — в DLQ с тегом «schema-mismatch».
2.14. Idempotency
Заголовок раздела «2.14. Idempotency»DLQ-replay = повторная доставка. Уметь:
- Уникальный message ID в headers.
- БД хранит processed_message_ids (TTL 7 дней).
INSERT INTO processed (msg_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING msg_id;- Если RETURNING пустой — уже обработали, скип.
3. Gotchas (12+)
Заголовок раздела «3. Gotchas (12+)»⚠️ 1. DLQ без алертинга = чёрная дыра
Заголовок раздела «⚠️ 1. DLQ без алертинга = чёрная дыра»Если DLQ не мониторится — туда сваливаются сообщения, и никто не знает. После инцидента «потеряли 1М заказов» обнаруживают, что они в DLQ месяц лежали.
⚠️ 2. Retry без backoff = thundering herd
Заголовок раздела «⚠️ 2. Retry без backoff = thundering herd»Падает downstream сервис → consumer ретраит мгновенно → лавина RPS на еле живой сервис → он окончательно умирает.
⚠️ 3. DLQ rate limit
Заголовок раздела «⚠️ 3. DLQ rate limit»DLQ-producer тоже может тормозить. При больших pol’sons лавиной — DLQ produce виснет → consumer виснет → продают throughput. Используй async DLQ producer с собственным буфером.
⚠️ 4. Бесконечный retry = вечно блокирует partition
Заголовок раздела «⚠️ 4. Бесконечный retry = вечно блокирует partition»In-order processing + infinite retry на одной partition = вся очередь стоит. Лимит retry + DLQ обязателен.
⚠️ 5. DLQ replay перед фиксом бага
Заголовок раздела «⚠️ 5. DLQ replay перед фиксом бага»«Давайте перешлём из DLQ обратно» — сообщения попадают в тот же сломанный код → опять в DLQ. Прежде чем replay’ить, подтверди, что код пофикшен.
⚠️ 6. Backpressure через blocking send в HTTP handler
Заголовок раздела «⚠️ 6. Backpressure через blocking send в HTTP handler»Если в HTTP-обработчике делаешь kafka.ProduceSync() и producer buffer полный — handler блокируется на минуты. Клиент видит timeout. Используй async + bounded queue.
⚠️ 7. Buffered channel = unbounded memory если producer быстрее consumer’a
Заголовок раздела «⚠️ 7. Buffered channel = unbounded memory если producer быстрее consumer’a»make(chan T, 10000) — это bound 10k в RAM. Если producer пишет 1M/s, а consumer 100k/s — за секунду заполнится, дальше дропы.
⚠️ 8. NATS push consumer без max_ack_pending
Заголовок раздела «⚠️ 8. NATS push consumer без max_ack_pending»Broker зашлёт миллион сообщений → consumer OOM. Всегда лимитируй.
⚠️ 9. Kafka pause/resume не вызывает rebalance
Заголовок раздела «⚠️ 9. Kafka pause/resume не вызывает rebalance»pauseFetchPartitions — это локальное состояние клиента, broker ничего не знает. Если pause держит долго — max.poll.interval.ms всё равно тикает. Не путай.
⚠️ 10. Rate limit на producer’е и burst
Заголовок раздела «⚠️ 10. Rate limit на producer’е и burst»rate.NewLimiter(1000, 100) — 1000 RPS, но при burst 100 разрешает мгновенные всплески 100 req за раз. Это может перегружать downstream при чрезмерно больших batch’ах.
⚠️ 11. DLQ полный = DLQ DLQ?
Заголовок раздела «⚠️ 11. DLQ полный = DLQ DLQ?»Если DLQ переполнен / недоступен — куда деть сообщение? Решения:
- Логировать на disk (degraded mode).
- Алерт «critical»: DLQ down.
- НИКОГДА не игнорировать, не log.Println и не continue.
⚠️ 12. Schema mismatch не всегда poison
Заголовок раздела «⚠️ 12. Schema mismatch не всегда poison»Иногда «unknown field» в новой версии — не ошибка, а forward-compat. Различай: «не знаю поле» vs «не парсится вообще». Только второе — в DLQ.
⚠️ 13. Replay меняет ordering
Заголовок раздела «⚠️ 13. Replay меняет ordering»При DLQ replay сообщения возвращаются в основной топик, но уже в другом порядке (потеряли offset). Если приложение зависит от order — ломается.
⚠️ 14. RabbitMQ x-death header overflow
Заголовок раздела «⚠️ 14. RabbitMQ x-death header overflow»При многоступенчатом retry массив x-death растёт. RabbitMQ может уронить connection при слишком больших headers. Лимит ретраев в headers + чистка лишнего.
⚠️ 15. Consumer group offset reset
Заголовок раздела «⚠️ 15. Consumer group offset reset»Если случайно сделал --reset-offsets --to-earliest — все consumer’ы перечитают с начала, DLQ заполнится дубликатами. Idempotency спасает.
4. Real cases
Заголовок раздела «4. Real cases»Case 1: Финансовая система с DLQ chain
Заголовок раздела «Case 1: Финансовая система с DLQ chain»Контекст: Платежи. Иногда downstream PSP (payment provider) недоступен 30 секунд.
Архитектура:
payments.in → consumer → external HTTP call │ fail (5xx) │ ▼ payments.retry.30s (Kafka with timestamp delay) │ consumer wait → retry │ fail × 3 ▼ payments.dlq → manual review (UI)Использовали Kafka Streams с time-windowed retries. После фикса проблемы на стороне PSP — UI «mass replay» возвращает в payments.in.
Case 2: Backpressure спас сервис от OOM
Заголовок раздела «Case 2: Backpressure спас сервис от OOM»Контекст: Сервис принимает событий и пишет в ClickHouse. ClickHouse раз в неделю делает merge — становится медленным.
Симптом: во время merge — буфер producer’а заполняется, latency HTTP-handler’ов растёт до 30 секунд.
Решение: заменили producer.Send(blocking) на:
select {case bufferedCh <- evt:default: // bounded channel full metrics.DroppedEvents.Inc() // и в HTTP отвечаем 503 Service Unavailable + Retry-After}Лучше дропнуть 0.1% сообщений и оставить latency низкой, чем повалить всё.
Case 3: Poison message с парсингом UTF-8
Заголовок раздела «Case 3: Poison message с парсингом UTF-8»Контекст: consumer падал на одном сообщении, не помогал даже restart pod’а.
Расследование: в payload был байт 0xff, который ломал утилиту распаковки.
Решение:
- Catch panic в обработчике.
- DLQ-handler с тегом
panic: .... - Конкретное сообщение пошло в DLQ, остальная очередь поехала.
- Создали JIRA для фикса парсера.
Case 4: NATS max_ack_pending спас от OOM
Заголовок раздела «Case 4: NATS max_ack_pending спас от OOM»Контекст: push consumer JetStream без лимита. Producer ускорился × 10 раз ночью.
Симптом: consumer pod упал с OOM (8GB → 16GB → ещё больше).
Решение:
- Перешли на pull consumer.
MaxAckPending: 5000.- Manual fetch batches по 100.
- Latency немного выросла, но stability — 100%.
Case 5: Replay 5M сообщений после фикса схемы
Заголовок раздела «Case 5: Replay 5M сообщений после фикса схемы»Контекст: Bug — поле amount иногда null, consumer падал, всё уходило в DLQ. За сутки накопилось 5M.
Решение:
- Fixed consumer code (граффуль null handling).
- Deployed.
- Запустили DLQ replay tool с rate limit 1000/s — за час всё переиграно.
- Idempotency keys предотвратили дубликаты (часть сообщений уже была обработана при ретраях).
5. Вопросы (25)
Заголовок раздела «5. Вопросы (25)»- Что такое DLQ и зачем он нужен?
- Что такое poison message? Приведи примеры причин.
- Какие стратегии обработки failed-message бывают?
- Как реализовать DLQ в Kafka? Какие headers добавлять?
- Что такое retry topic chain? Почему именно с разными delay?
- Как DLQ реализован в RabbitMQ через
x-dead-letter-exchange? - Что такое
x-deathheader и какие risks связаны? - Как сделать DLQ в NATS JetStream? Что такое advisory subjects?
- Что такое backpressure и какие стратегии?
- Чем pull-based лучше push-based для backpressure?
- Как работает producer backpressure в Kafka (
buffer.memory,max.block.ms)? - Как Kafka consumer достигает backpressure естественным образом?
- Что такое
pauseFetchPartitionsи в чём pitfalls? - Что такое
max_ack_pendingв NATS push consumer? - Как реализуется backpressure в gRPC streaming?
- Как в Go сделать bounded buffer с drop policy?
- Что такое semaphore в
golang.org/x/sync/semaphore? - Как
golang.org/x/time/rateпомогает с rate limiting? - Что критично мониторить вокруг DLQ?
- Какие risks при DLQ replay? Как защититься?
- Зачем нужны idempotency keys? Как их реализовать в БД?
- Какие настройки producer’а для low-latency vs high-throughput vs critical?
- Как parallelize обработку in-order Kafka partition без потери порядка глобально?
- Что произойдёт, если DLQ-producer тоже не работает?
- Как валидировать схему сообщения, чтобы избежать poison’ов?
6. Practice
Заголовок раздела «6. Practice»6.1. Kafka DLQ-handler
Заголовок раздела «6.1. Kafka DLQ-handler»Реализуй consumer с retry + DLQ:
type DLQHandler struct { cl *kgo.Client; dlq string }
func (h *DLQHandler) HandleOrFail(ctx context.Context, r *kgo.Record, fn func(*kgo.Record) error) { for attempt := 1; attempt <= 3; attempt++ { if err := fn(r); err == nil { return } time.Sleep(time.Duration(attempt*attempt) * time.Second) } h.cl.ProduceSync(ctx, &kgo.Record{ Topic: h.dlq, Key: r.Key, Value: r.Value, Headers: append(r.Headers, kgo.RecordHeader{Key: "x-orig-topic", Value: []byte(r.Topic)}), })}6.2. Bounded buffer + drop policy в Go
Заголовок раздела «6.2. Bounded buffer + drop policy в Go»Напиши producer, который отдаёт 503 при заполнении буфера:
ch := make(chan Event, 10000)http.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) { select { case ch <- parse(r): w.WriteHeader(202) default: w.Header().Set("Retry-After", "1") w.WriteHeader(503) }})6.3. NATS pull consumer с MaxAckPending
Заголовок раздела «6.3. NATS pull consumer с MaxAckPending»cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Durable: "worker", AckPolicy: jetstream.AckExplicitPolicy, MaxAckPending: 100, MaxDeliver: 5, AckWait: 30*time.Second,})for { msgs, _ := cons.Fetch(10, jetstream.FetchMaxWait(5*time.Second)) for m := range msgs.Messages() { if err := process(m); err != nil { m.Nak(); continue } m.Ack() }}6.4. Rate limiter
Заголовок раздела «6.4. Rate limiter»lim := rate.NewLimiter(rate.Limit(500), 50)for j := range jobs { lim.Wait(ctx) go handle(j)}6.5. DLQ replay tool
Заголовок раздела «6.5. DLQ replay tool»CLI, который читает из DLQ и шлёт обратно с throttle:
dlq-replay --src events.dlq --dst events --rate 1000 --since 2026-05-20T10:00:00Z6.6. Idempotency через Postgres
Заголовок раздела «6.6. Idempotency через Postgres»CREATE TABLE processed_events ( msg_id TEXT PRIMARY KEY, processed_at TIMESTAMPTZ DEFAULT now());-- old records cleanup:DELETE FROM processed_events WHERE processed_at < now() - interval '7 days';res, err := db.Exec(ctx, `INSERT INTO processed_events (msg_id) VALUES ($1) ON CONFLICT DO NOTHING`, msgID)if err != nil { return err }n, _ := res.RowsAffected()if n == 0 { return nil } // duplicate, уже обработалиprocessMessage(...)6.7. Schema validation + DLQ tagging
Заголовок раздела «6.7. Schema validation + DLQ tagging»При unmarshal ошибке — в DLQ с тегом «schema»:
var evt OrderEventif err := proto.Unmarshal(r.Value, &evt); err != nil { toDLQWithReason(r, "schema-decode-error", err) return}7. Источники
Заголовок раздела «7. Источники»- Confluent — Dead Letter Queues with Apache Kafka. https://www.confluent.io/blog/error-handling-patterns-in-kafka/
- Kafka Connect Dead Letter Queue config. https://docs.confluent.io/platform/current/connect/concepts.html#dead-letter-queue
- RabbitMQ — Dead Letter Exchanges. https://www.rabbitmq.com/dlx.html
- RabbitMQ — Delayed Message Plugin. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
- NATS JetStream — Advisory Subjects. https://docs.nats.io/nats-concepts/jetstream/event_management
- NATS JetStream — Consumers. https://docs.nats.io/nats-concepts/jetstream/consumers
- «Designing Data-Intensive Applications» — Martin Kleppmann (Chapter 11: Stream Processing).
- «Kafka: The Definitive Guide» (2nd ed.) — Gwen Shapira et al. Chapter «Reliable Data Delivery».
- golang.org/x/sync/semaphore. https://pkg.go.dev/golang.org/x/sync/semaphore
- golang.org/x/time/rate. https://pkg.go.dev/golang.org/x/time/rate
- Uber Engineering Blog — Handling Failure in Streaming Pipelines. https://eng.uber.com/
- Microsoft — Retry pattern. https://learn.microsoft.com/en-us/azure/architecture/patterns/retry
- AWS — Dead Letter Queues (SQS). https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html
- Lyft Envoy — Backpressure & Circuit Breakers. https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/circuit_breaking
- «Reactive Streams Specification». https://www.reactive-streams.org/
- Netflix — Adaptive Concurrency Limits. https://netflixtechblog.com/performance-under-load-3e6fa9a60581
- franz-go pause/resume. https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo