Перейти к содержимому

31. DLQ + Backpressure в production streaming

Зачем знать на Middle 3: Production streaming-системы падают не из-за throughput’а, а из-за двух классов сбоев: poison messages (некорректные данные намертво блокируют consumer) и backpressure (медленный consumer глушит всю систему). Senior обязан проектировать DLQ-стратегию (что считать poison, куда складывать, кто разбирает) и backpressure-стратегию (bounded buffers, pull-модель, rate limits) на уровне архитектуры. Без этого Kafka/NATS/RabbitMQ в проде превращаются в «время от времени забивающиеся трубы» — а это инциденты, потери денег и репутации.


  1. Концепция
  2. Production-deep dive
  3. Gotchas
  4. Real cases
  5. Вопросы
  6. Practice
  7. Источники

DLQ (Dead Letter Queue) — отдельная очередь/топик, куда отправляются сообщения, которые не могут быть обработаны consumer’ом.

Причины «poison message»:

  • Невалидный формат / breaking schema.
  • Невалидные данные (ссылка на удалённую сущность, integer overflow).
  • Систематическая ошибка обработчика (бесконечная NPE).
  • Дубликат, нарушающий бизнес-инвариант.

Без DLQ poison-сообщение:

  1. Не ack’ается → перевыполняется.
  2. Блокирует consumer на этом offset’е (особенно in-order processing).
  3. Очередь растёт за ним.

Стратегии обработки:

┌────────────────────────────────────────┐
│ Producer → Queue → Consumer │
└─────────┬──────────────────────────────┘
▼ fail на обработке
┌─────────────────────────────────────────┐
│ Что делать с этим сообщением? │
└─────────┬───────────────────────────────┘
┌─────────────┼──────────────┬─────────────────┐
▼ ▼ ▼ ▼
SKIP RETRY RETRY + DLQ
(log) (n раз) backoff (move out)
Manual review /
Auto-reprocess
после фикса

Backpressure — механизм, при котором медленный consumer сигнализирует producer’у замедлиться, чтобы система не «лопнула».

Без backpressure:

  • Producer пишет быстрее, чем consumer читает.
  • Буфер растёт.
  • OOM / disk full / lag растёт до часов.
  • Producer’ы виснут / роняют сообщения.

С backpressure:

  • Producer узнаёт о медленности и тормозит / отбрасывает.
  • Система остаётся стабильной (graceful degradation).

Стратегии backpressure:

СтратегияЧто делаетTrade-off
BlockProducer ждёт (блокируется на send)Latency растёт, не теряем
DropProducer выкидывает сообщениеТеряем, latency низкая
Buffer + spillProducer пишет на диск при переполнении RAMСложнее, но универсально
Pull-basedConsumer сам запрашивает batch размером, какой может обработатьЕстественный backpressure
Reactive signalConsumer явно сообщает «slow down N%»Сложнее в реализации

Kafka не имеет встроенного DLQ. Реализуется вручную.

Pattern 1: Per-topic DLQ

events → consumer fails → events.dlq
events.dlq → DLQ-consumer / manual tool / alert

Consumer:

for {
fetches := kafkaCl.PollFetches(ctx)
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
for _, r := range p.Records {
if err := handle(r); err != nil {
if isPoison(err) {
// отправляем в DLQ с meta
dlqRecord := &kgo.Record{
Topic: r.Topic + ".dlq",
Key: r.Key,
Value: r.Value,
Headers: []kgo.RecordHeader{
{Key: "x-original-topic", Value: []byte(r.Topic)},
{Key: "x-original-partition", Value: []byte(strconv.FormatInt(int64(r.Partition), 10))},
{Key: "x-original-offset", Value: []byte(strconv.FormatInt(r.Offset, 10))},
{Key: "x-error", Value: []byte(err.Error())},
{Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
{Key: "x-attempt", Value: []byte("1")},
},
}
kafkaCl.ProduceSync(ctx, dlqRecord)
// commit offset, чтобы не зацикливаться
kafkaCl.CommitRecords(ctx, r)
} else {
// retriable error, не committим
time.Sleep(backoff)
continue
}
}
}
})
}

Pattern 2: Retry topic chain

events → events.retry.5s → events.retry.30s → events.retry.5m → events.dlq

Каждый retry topic имеет свой delay (в Kafka — через timestamp + consumer, который ждёт, или через Kafka Streams).

// При неуспехе отправляем в следующий retry topic
attempt := getHeader(r, "x-attempt", 1) + 1
nextTopic := selectRetryTopic(attempt)
delaySend(nextTopic, r, computeDelay(attempt))

Pattern 3: Confluent Kafka Connect — Dead Letter Queue config

В Kafka Connect (для source/sink connector’ов):

errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq
errors.deadletterqueue.context.headers.enable=true
errors.deadletterqueue.topic.replication.factor=3

⚠️ Best practices DLQ в Kafka:

  1. DLQ topic = compacted? Нет. Обычно DLQ — limits-based retention (нельзя терять poison’ы).
  2. Replication.factor ≥ 3 — DLQ важен, не теряй.
  3. Schema schema-less (raw bytes) — DLQ должен принимать что угодно.
  4. Headers с meta — обязательно: source topic, partition, offset, error, attempt, timestamp.
  5. Алертинг — DLQ растёт = что-то сломалось.
  6. Tool для reprocess — после фикса бага UI/CLI, который читает DLQ и шлёт обратно в нужный topic.

RabbitMQ имеет встроенный DLQ через x-dead-letter-exchange.

# При создании очереди:
channel.queue_declare(
queue='orders',
arguments={
'x-dead-letter-exchange': 'orders-dlx',
'x-dead-letter-routing-key': 'orders-dlq',
'x-message-ttl': 60000, # 60s, потом auto-DLQ
'x-max-length': 100000,
}
)

Когда сообщение попадает в DLQ:

  1. Reject/nack без requeue: channel.basic_nack(delivery_tag, requeue=false).
  2. TTL expired: сообщение в очереди слишком долго.
  3. Queue length exceeded: очередь переполнена.

DLQ сообщение получает headers x-death:

"x-death": [
{
"count": 3,
"reason": "rejected",
"queue": "orders",
"exchange": "orders-ex",
"routing-keys": ["orders.create"],
"time": "2026-05-21T10:00:00"
}
]

Delayed retry в RabbitMQ:

Нет нативного delay (только plugin rabbitmq-delayed-message-exchange). Workaround — TTL-based retry queues:

main_queue (process) → dlx → retry_queue_5s (TTL=5s, no consumer) → dlx → main_queue

После TTL сообщение возвращается. Лимит попыток — в headers x-attempt.

JetStream имеет MaxDeliver — после N попыток сообщение terminated (не пересылается). Built-in DLQ нет, но реализуется через advisory subjects.

JetStream публикует advisory события:

  • $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>
  • $JS.EVENT.ADVISORY.CONSUMER.MSG_NAKED.<stream>.<consumer> (если негативный ack)
  • $JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<stream>.<consumer>

Pattern: подписываемся на advisory + переотправляем в DLQ stream.

// Consumer основного stream'а с MaxDeliver=5
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "worker", Durable: "worker",
AckPolicy: jetstream.AckExplicitPolicy,
MaxDeliver: 5, AckWait: 30*time.Second,
})
// DLQ subscriber на advisory
nc.Subscribe("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.ORDERS.worker", func(m *nats.Msg) {
var ev jetstream.ConsumerDeliveryExceededAdvisory
json.Unmarshal(m.Data, &ev)
// Fetch original message and push to DLQ stream
rawMsg, _ := stream.GetMsg(ctx, ev.StreamSeq)
js.Publish(ctx, "orders.dlq", rawMsg.Data, jetstream.WithMsgID(...))
})

Альтернатива: explicit Term() вместо Nak(), и в обработчике catch’ить через wrapper’ы.

Recommended pattern: dlq.<stream>.<consumer> стрим, и DLQ-handler как отдельный consumer.

Producer side:

buffer.memory=33554432 # 32MB
max.block.ms=60000 # при полном буфере — ждать до 60s, потом ошибка
acks=all
linger.ms=10

Producer аккумулирует сообщения в buffer.memory. Когда полный — send() блокируется до max.block.ms, потом возвращает BufferExhaustedException.

Backpressure flow:

App ──Produce()──► Producer Client ──Network──► Broker
▼ buffer full → block / drop
blocking

⚠️ Если app не учитывает backpressure — поток отправки блокируется → ваш RPC handler виснет → клиент видит timeout.

Consumer side:

poll(Duration) — pull-based. Consumer сам решает скорость. Pull = inherent backpressure.

max.poll.records=500 # сколько забрать за один poll
fetch.min.bytes=1048576 # 1MB — broker ждёт пока накопится
fetch.max.wait.ms=500 # или 500ms
max.partition.fetch.bytes=10485760 # max bytes per partition

Гибкий backpressure:

Если обработка медленная, не убирай max.poll.records ниже — это уменьшит throughput. Лучше:

  1. Делать poll() чаще, обрабатывать асинхронно.
  2. Параллелить per-key (если order не нужен на partition).
  3. Pause partition: pauseFetchPartitions(...) — стопает fetch для конкретных партиций, не разрывая группу.
// franz-go pause/resume
cl.PauseFetchPartitions(map[string][]int32{"events": {0, 1}})
// ... process slowly ...
cl.ResumeFetchPartitions(map[string][]int32{"events": {0, 1}})

Pull consumer — естественный backpressure: consumer вызывает Fetch(batch_size), broker отдаёт максимум batch_size.

Push consumer:

max_ack_pending (важнейший параметр!) — сколько сообщений не ack’нутых может быть в полёте одновременно.

jetstream.ConsumerConfig{
MaxAckPending: 1000, // если 1000 не ack'нуты — broker не шлёт новые
}

Это flow control: broker сам тормозит отправку.

Flow Control (flow_control: true) — broker периодически шлёт «echo back ack» сообщения. Если клиент их не подтверждает — broker замедляется.

⚠️ MaxAckPending = 0 (unlimited) в push consumer — путь к перегрузке. ВСЕГДА ставь конкретное значение.

gRPC поверх HTTP/2 имеет встроенный flow control window.

  • Server и client объявляют, сколько байт каждый может принять.
  • WINDOW_UPDATE фреймы расширяют окно по мере чтения.
  • Если получатель не читает — окно сжимается → отправитель блокируется.

В Go:

// Не нужно делать вручную, gRPC сам.
// Но если ты не вызываешь stream.Recv() / stream.Send() — окно закрывается.

⚠️ Если используешь for { stream.Recv() } в горутине без обработки — окно закроется, отправитель повиснет. Backpressure правильный, но видно как «зависание».

Bounded channel — самый простой backpressure:

jobs := make(chan Job, 1000) // bounded
// Producer
select {
case jobs <- j:
// успех
case <-ctx.Done():
return ctx.Err()
default:
// drop policy
metrics.DroppedJobs.Inc()
}
// или blocking:
jobs <- j // блокируется, если канал полон
// Worker pool
for i := 0; i < N; i++ {
go func() {
for j := range jobs {
handle(j)
}
}()
}

Semaphore-based concurrency limit:

import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(100) // max 100 concurrent
for j := range jobs {
if err := sem.Acquire(ctx, 1); err != nil { break }
go func(j Job) {
defer sem.Release(1)
handle(j)
}(j)
}

Rate limiter:

import "golang.org/x/time/rate"
lim := rate.NewLimiter(rate.Limit(1000), 100) // 1000 RPS, burst 100
for j := range jobs {
if err := lim.Wait(ctx); err != nil { break }
handle(j)
}

Полный production pipeline для consumer’а Kafka:

type Worker struct {
cl *kgo.Client
dlqTopic string
workers int
sem *semaphore.Weighted
metrics *Metrics
}
func (w *Worker) Run(ctx context.Context) error {
for {
if err := ctx.Err(); err != nil { return err }
fetches := w.cl.PollFetches(ctx)
if fetches.IsClientClosed() { return nil }
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
for _, r := range p.Records {
if err := w.sem.Acquire(ctx, 1); err != nil { return }
go func(r *kgo.Record) {
defer w.sem.Release(1)
w.processWithRetry(ctx, r)
}(r)
}
})
}
}
func (w *Worker) processWithRetry(ctx context.Context, r *kgo.Record) {
var lastErr error
backoff := time.Second
for attempt := 1; attempt <= 5; attempt++ {
err := w.handle(ctx, r)
if err == nil {
w.metrics.Processed.Inc()
return
}
if !isRetriable(err) || attempt == 5 {
w.toDLQ(ctx, r, err, attempt)
return
}
lastErr = err
time.Sleep(backoff)
backoff *= 2
if backoff > 60*time.Second { backoff = 60*time.Second }
}
_ = lastErr
}
func (w *Worker) toDLQ(ctx context.Context, r *kgo.Record, err error, attempt int) {
record := &kgo.Record{
Topic: w.dlqTopic, Key: r.Key, Value: r.Value,
Headers: append(r.Headers,
kgo.RecordHeader{Key: "x-error", Value: []byte(err.Error())},
kgo.RecordHeader{Key: "x-original-topic", Value: []byte(r.Topic)},
kgo.RecordHeader{Key: "x-attempt", Value: []byte(strconv.Itoa(attempt))},
kgo.RecordHeader{Key: "x-failed-at", Value: []byte(time.Now().Format(time.RFC3339))},
),
}
if err := w.cl.ProduceSync(ctx, record).FirstErr(); err != nil {
w.metrics.DLQErrors.Inc()
// последний шанс — log на диск
} else {
w.metrics.DLQSuccess.Inc()
}
}

После фикса бага нужно вернуть сообщения. Tool примерно:

dlqCl := kafkaConsumer("dlq-replay", "events.dlq")
prodCl := kafkaProducer()
for {
fetches := dlqCl.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
origTopic := string(getHeader(r, "x-original-topic"))
prodCl.ProduceSync(ctx, &kgo.Record{
Topic: origTopic, Key: r.Key, Value: r.Value,
Headers: append(r.Headers, kgo.RecordHeader{Key: "x-replayed", Value: []byte("true")}),
})
})
dlqCl.CommitRecords(ctx, fetches.Records()...)
}

⚠️ Replay сообщений может breakнуть idempotency. Если оригинальный consumer уже частично обработал — повторение даст side effect. Решение: idempotency keys на уровне БД (INSERT ... ON CONFLICT DO NOTHING).

Метрики, которые обязаны быть:

  • DLQ message rate (per topic) — растёт = инцидент.
  • DLQ size (cumulative).
  • Consumer lag (per partition) — растёт = backpressure не справляется.
  • Producer buffer utilization — близко к 100% = скоро будут drops/blocks.
  • Process time p99 в consumer’е — растёт = код тормозит.
  • Retry rate (внутренние ретраи).

Alerting:

  • DLQ rate > X в минуту = page.
  • Consumer lag > N сообщений > M минут = page.
  • Producer buffer > 80% > 1 минута = warn.
WorkloadНастройка
Low latency (RPC)linger.ms=0, batch.size=маленький
High throughput (logs)linger.ms=50, batch.size=512KB, compression=zstd
Critical (payment events)acks=all, enable.idempotence=true, min.insync.replicas=2
Best effort (metrics)acks=0 или acks=1, drop policy на producer’е
WorkloadНастройка
Order-sensitive (single)max.poll.records=1, no parallel within partition
Parallel within partitionSequence numbers + reordering downstream
Async DB writeBatch consumer poll + bulk insert + commit only on bulk success
Heavy CPUМеньше партиций per consumer, больше консьюмеров

Главная причина poison’ов в реальности — несовместимые schema изменения.

Защита:

  1. Schema Registry с FORWARD/BACKWARD/FULL compatibility check на CI.
  2. Consumer должен уметь читать N-1, N, N+1 версии (graceful unknown fields в Protobuf, ignore in JSON).
  3. При unknown field в strict-режиме — в DLQ с тегом «schema-mismatch».

DLQ-replay = повторная доставка. Уметь:

  • Уникальный message ID в headers.
  • БД хранит processed_message_ids (TTL 7 дней).
  • INSERT INTO processed (msg_id) VALUES ($1) ON CONFLICT DO NOTHING RETURNING msg_id;
  • Если RETURNING пустой — уже обработали, скип.

Если DLQ не мониторится — туда сваливаются сообщения, и никто не знает. После инцидента «потеряли 1М заказов» обнаруживают, что они в DLQ месяц лежали.

Падает downstream сервис → consumer ретраит мгновенно → лавина RPS на еле живой сервис → он окончательно умирает.

DLQ-producer тоже может тормозить. При больших pol’sons лавиной — DLQ produce виснет → consumer виснет → продают throughput. Используй async DLQ producer с собственным буфером.

⚠️ 4. Бесконечный retry = вечно блокирует partition

Заголовок раздела «⚠️ 4. Бесконечный retry = вечно блокирует partition»

In-order processing + infinite retry на одной partition = вся очередь стоит. Лимит retry + DLQ обязателен.

«Давайте перешлём из DLQ обратно» — сообщения попадают в тот же сломанный код → опять в DLQ. Прежде чем replay’ить, подтверди, что код пофикшен.

Если в HTTP-обработчике делаешь kafka.ProduceSync() и producer buffer полный — handler блокируется на минуты. Клиент видит timeout. Используй async + bounded queue.

⚠️ 7. Buffered channel = unbounded memory если producer быстрее consumer’a

Заголовок раздела «⚠️ 7. Buffered channel = unbounded memory если producer быстрее consumer’a»

make(chan T, 10000) — это bound 10k в RAM. Если producer пишет 1M/s, а consumer 100k/s — за секунду заполнится, дальше дропы.

Broker зашлёт миллион сообщений → consumer OOM. Всегда лимитируй.

pauseFetchPartitions — это локальное состояние клиента, broker ничего не знает. Если pause держит долго — max.poll.interval.ms всё равно тикает. Не путай.

rate.NewLimiter(1000, 100) — 1000 RPS, но при burst 100 разрешает мгновенные всплески 100 req за раз. Это может перегружать downstream при чрезмерно больших batch’ах.

Если DLQ переполнен / недоступен — куда деть сообщение? Решения:

  • Логировать на disk (degraded mode).
  • Алерт «critical»: DLQ down.
  • НИКОГДА не игнорировать, не log.Println и не continue.

Иногда «unknown field» в новой версии — не ошибка, а forward-compat. Различай: «не знаю поле» vs «не парсится вообще». Только второе — в DLQ.

При DLQ replay сообщения возвращаются в основной топик, но уже в другом порядке (потеряли offset). Если приложение зависит от order — ломается.

При многоступенчатом retry массив x-death растёт. RabbitMQ может уронить connection при слишком больших headers. Лимит ретраев в headers + чистка лишнего.

Если случайно сделал --reset-offsets --to-earliest — все consumer’ы перечитают с начала, DLQ заполнится дубликатами. Idempotency спасает.


Контекст: Платежи. Иногда downstream PSP (payment provider) недоступен 30 секунд.

Архитектура:

payments.in → consumer → external HTTP call
fail (5xx) │
payments.retry.30s (Kafka with timestamp delay)
consumer wait → retry
fail × 3
payments.dlq → manual review (UI)

Использовали Kafka Streams с time-windowed retries. После фикса проблемы на стороне PSP — UI «mass replay» возвращает в payments.in.

Контекст: Сервис принимает событий и пишет в ClickHouse. ClickHouse раз в неделю делает merge — становится медленным.

Симптом: во время merge — буфер producer’а заполняется, latency HTTP-handler’ов растёт до 30 секунд.

Решение: заменили producer.Send(blocking) на:

select {
case bufferedCh <- evt:
default:
// bounded channel full
metrics.DroppedEvents.Inc()
// и в HTTP отвечаем 503 Service Unavailable + Retry-After
}

Лучше дропнуть 0.1% сообщений и оставить latency низкой, чем повалить всё.

Контекст: consumer падал на одном сообщении, не помогал даже restart pod’а.

Расследование: в payload был байт 0xff, который ломал утилиту распаковки.

Решение:

  • Catch panic в обработчике.
  • DLQ-handler с тегом panic: ....
  • Конкретное сообщение пошло в DLQ, остальная очередь поехала.
  • Создали JIRA для фикса парсера.

Контекст: push consumer JetStream без лимита. Producer ускорился × 10 раз ночью.

Симптом: consumer pod упал с OOM (8GB → 16GB → ещё больше).

Решение:

  • Перешли на pull consumer.
  • MaxAckPending: 5000.
  • Manual fetch batches по 100.
  • Latency немного выросла, но stability — 100%.

Контекст: Bug — поле amount иногда null, consumer падал, всё уходило в DLQ. За сутки накопилось 5M.

Решение:

  1. Fixed consumer code (граффуль null handling).
  2. Deployed.
  3. Запустили DLQ replay tool с rate limit 1000/s — за час всё переиграно.
  4. Idempotency keys предотвратили дубликаты (часть сообщений уже была обработана при ретраях).

  1. Что такое DLQ и зачем он нужен?
  2. Что такое poison message? Приведи примеры причин.
  3. Какие стратегии обработки failed-message бывают?
  4. Как реализовать DLQ в Kafka? Какие headers добавлять?
  5. Что такое retry topic chain? Почему именно с разными delay?
  6. Как DLQ реализован в RabbitMQ через x-dead-letter-exchange?
  7. Что такое x-death header и какие risks связаны?
  8. Как сделать DLQ в NATS JetStream? Что такое advisory subjects?
  9. Что такое backpressure и какие стратегии?
  10. Чем pull-based лучше push-based для backpressure?
  11. Как работает producer backpressure в Kafka (buffer.memory, max.block.ms)?
  12. Как Kafka consumer достигает backpressure естественным образом?
  13. Что такое pauseFetchPartitions и в чём pitfalls?
  14. Что такое max_ack_pending в NATS push consumer?
  15. Как реализуется backpressure в gRPC streaming?
  16. Как в Go сделать bounded buffer с drop policy?
  17. Что такое semaphore в golang.org/x/sync/semaphore?
  18. Как golang.org/x/time/rate помогает с rate limiting?
  19. Что критично мониторить вокруг DLQ?
  20. Какие risks при DLQ replay? Как защититься?
  21. Зачем нужны idempotency keys? Как их реализовать в БД?
  22. Какие настройки producer’а для low-latency vs high-throughput vs critical?
  23. Как parallelize обработку in-order Kafka partition без потери порядка глобально?
  24. Что произойдёт, если DLQ-producer тоже не работает?
  25. Как валидировать схему сообщения, чтобы избежать poison’ов?

Реализуй consumer с retry + DLQ:

type DLQHandler struct { cl *kgo.Client; dlq string }
func (h *DLQHandler) HandleOrFail(ctx context.Context, r *kgo.Record, fn func(*kgo.Record) error) {
for attempt := 1; attempt <= 3; attempt++ {
if err := fn(r); err == nil { return }
time.Sleep(time.Duration(attempt*attempt) * time.Second)
}
h.cl.ProduceSync(ctx, &kgo.Record{
Topic: h.dlq, Key: r.Key, Value: r.Value,
Headers: append(r.Headers,
kgo.RecordHeader{Key: "x-orig-topic", Value: []byte(r.Topic)}),
})
}

Напиши producer, который отдаёт 503 при заполнении буфера:

ch := make(chan Event, 10000)
http.HandleFunc("/event", func(w http.ResponseWriter, r *http.Request) {
select {
case ch <- parse(r):
w.WriteHeader(202)
default:
w.Header().Set("Retry-After", "1")
w.WriteHeader(503)
}
})
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "worker", AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: 100, MaxDeliver: 5, AckWait: 30*time.Second,
})
for {
msgs, _ := cons.Fetch(10, jetstream.FetchMaxWait(5*time.Second))
for m := range msgs.Messages() {
if err := process(m); err != nil { m.Nak(); continue }
m.Ack()
}
}
lim := rate.NewLimiter(rate.Limit(500), 50)
for j := range jobs {
lim.Wait(ctx)
go handle(j)
}

CLI, который читает из DLQ и шлёт обратно с throttle:

Окно терминала
dlq-replay --src events.dlq --dst events --rate 1000 --since 2026-05-20T10:00:00Z
CREATE TABLE processed_events (
msg_id TEXT PRIMARY KEY,
processed_at TIMESTAMPTZ DEFAULT now()
);
-- old records cleanup:
DELETE FROM processed_events WHERE processed_at < now() - interval '7 days';
res, err := db.Exec(ctx, `INSERT INTO processed_events (msg_id) VALUES ($1) ON CONFLICT DO NOTHING`, msgID)
if err != nil { return err }
n, _ := res.RowsAffected()
if n == 0 { return nil } // duplicate, уже обработали
processMessage(...)

При unmarshal ошибке — в DLQ с тегом «schema»:

var evt OrderEvent
if err := proto.Unmarshal(r.Value, &evt); err != nil {
toDLQWithReason(r, "schema-decode-error", err)
return
}

  1. Confluent — Dead Letter Queues with Apache Kafka. https://www.confluent.io/blog/error-handling-patterns-in-kafka/
  2. Kafka Connect Dead Letter Queue config. https://docs.confluent.io/platform/current/connect/concepts.html#dead-letter-queue
  3. RabbitMQ — Dead Letter Exchanges. https://www.rabbitmq.com/dlx.html
  4. RabbitMQ — Delayed Message Plugin. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
  5. NATS JetStream — Advisory Subjects. https://docs.nats.io/nats-concepts/jetstream/event_management
  6. NATS JetStream — Consumers. https://docs.nats.io/nats-concepts/jetstream/consumers
  7. «Designing Data-Intensive Applications» — Martin Kleppmann (Chapter 11: Stream Processing).
  8. «Kafka: The Definitive Guide» (2nd ed.) — Gwen Shapira et al. Chapter «Reliable Data Delivery».
  9. golang.org/x/sync/semaphore. https://pkg.go.dev/golang.org/x/sync/semaphore
  10. golang.org/x/time/rate. https://pkg.go.dev/golang.org/x/time/rate
  11. Uber Engineering Blog — Handling Failure in Streaming Pipelines. https://eng.uber.com/
  12. Microsoft — Retry pattern. https://learn.microsoft.com/en-us/azure/architecture/patterns/retry
  13. AWS — Dead Letter Queues (SQS). https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html
  14. Lyft Envoy — Backpressure & Circuit Breakers. https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/circuit_breaking
  15. «Reactive Streams Specification». https://www.reactive-streams.org/
  16. Netflix — Adaptive Concurrency Limits. https://netflixtechblog.com/performance-under-load-3e6fa9a60581
  17. franz-go pause/resume. https://pkg.go.dev/github.com/twmb/franz-go/pkg/kgo