Перейти к содержимому

RabbitMQ в production

Зачем знать. RabbitMQ — зрелый AMQP-брокер с гибкой маршрутизацией, идеально для task-очередей, fan-out нотификаций и сложного routing. Middle 2 Go-инженер обязан понимать AMQP 0.9.1 (exchanges, queues, bindings), различать quorum vs classic queues, настроить DLX, prefetch, TTL, priority queues, корректно использовать amqp091-go, отличать сценарии где RabbitMQ выигрывает у Kafka/NATS.

  1. Концепция: AMQP 0.9.1, объекты, routing
  2. Production-практики
  3. Gotchas
  4. Real cases
  5. 25 вопросов
  6. Practice
  7. Источники

RabbitMQ реализует AMQP 0.9.1 — open-протокол для асинхронного обмена сообщениями. Ключевые сущности:

┌──────────┐ publish ┌─────────────┐ routing ┌────────┐ deliver ┌──────────┐
│ Producer │ ─────────────▶│ Exchange │ ──────────▶│ Queue │──────────▶│ Consumer │
└──────────┘ └─────────────┘ └────────┘ └──────────┘
▲ ▲
│ │
binding сообщения хранятся
(routing key / pattern) в очереди
  • Producer публикует в Exchange (никогда напрямую в queue).
  • Exchange через Binding маршрутизирует в Queue на основе routing key / headers.
  • Consumer подписывается на Queue.
┌─ direct ─ routing key точно равен binding key
├─ fanout ─ broadcast: всем bound queues
├─ topic ─ pattern matching: "orders.*.created", "logs.#"
└─ headers ─ matching по headers (не по routing key)

direct:

publish key="payment.ok" ──▶ Exchange (direct)
┌───────────────┼───────────────┐
▼ ▼ ▼
Q("payment.ok") Q("payment.fail") Q("audit", key="payment.ok")
✓ ✓

fanout — игнорирует routing key, в каждую bound queue.

topic — patterns:

  • * = ровно одно слово (segment).
  • # = ноль или больше слов.
  • orders.*.created matches orders.us.created, не matches orders.us.eu.created.
  • logs.# matches любые logs.*.

headers — binding со словарём; matches x-match=all (все хедеры) или any (любой).

Default exchange — пустая строка "", routing key = queue name. Используется для «прямой» отправки в queue.

  • durable — переживает рестарт broker (но сообщения должны быть persistent).
  • exclusive — только одно соединение, удаляется при разрыве.
  • auto-delete — удалить когда нет consumer.
  • delivery_mode=2 (persistent) — broker сохраняет на диск.
  • delivery_mode=1 (transient) — только память.

Для durability нужно: durable queue + persistent message + publisher confirms + (для critical) replicated queue (quorum).

  • ack — обработано, удалить из queue.
  • nack — ошибка, requeue=true вернуть в queue или requeue=false уйдёт в DLX (если настроен).
  • reject — то же что nack для одного сообщения.

Prefetch (QoS): channel.Qos(prefetchCount, prefetchSize, global) — сколько unacked сообщений может быть у consumer. Без prefetch RabbitMQ пушит всё подряд → OOM.

Dead Letter Exchange (DLX) — exchange куда уходят сообщения когда:

  • consumer reject/nack с requeue=false;
  • сообщение протухло (TTL);
  • очередь переполнилась (x-max-length).
Q("orders") [x-dead-letter-exchange="dlx"]
│ reject / TTL / overflow
Exchange "dlx" ──▶ Q("orders.dlq")
  • Per-queue TTL (x-message-ttl) — все сообщения в queue.
  • Per-message TTL — у конкретного сообщения (expiration property).
  • Queue TTL (x-expires) — удалить пустую queue по таймауту.
  • Classic queue — простая, локальна для одного узла.
  • Mirrored classic queue (deprecated в 3.10+, удаляется в 4.0) — реплика на других узлах через master-mirror, известны проблемы с split-brain.
  • Quorum queue — реплика через Raft consensus (с RabbitMQ 3.8+), HA по умолчанию, безопаснее, медленнее на писа́ние, рекомендация прода.
Quorum queue replication (Raft, 3 узла):
┌────────┐ ┌────────┐ ┌────────┐
│ Node 1 │ │ Node 2 │ │ Node 3 │
│(leader)│◀─▶│ follow │◀─▶│ follow │
└────────┘ └────────┘ └────────┘
write подтверждается когда (N/2)+1 узлов записали.

Append-only log в стиле Kafka, но проще ops; поддерживает многократное чтение одного сообщения, replication. Используется для:

  • log/event streaming;
  • replay (re-read sequence range);
  • broadcast многим consumer без копирования.

Отличается от queue: сообщения не удаляются по ack, а живут до retention.

  • Federation — exchange/queue ссылается на upstream-брокер, broker автоматически тянет сообщения. Для слабосвязанных кластеров.
  • Shovel — простой forwarder: «бери из X, клади в Y» (между брокерами / между exchange).
  • rabbitmq_management — UI на :15672.
  • rabbitmq_mqtt, rabbitmq_stomp, rabbitmq_web_mqtt — другие протоколы.
  • rabbitmq_shovel, rabbitmq_federation — replication.
  • rabbitmq_prometheus — Prometheus endpoint.
  • rabbitmq_consistent_hash_exchange — exchange c шардингом по hash.

2025+ рекомендация — github.com/rabbitmq/amqp091-go (форк streadway/amqp, поддерживается RabbitMQ team). streadway/amqp deprecated.

import amqp "github.com/rabbitmq/amqp091-go"
conn, err := amqp.DialConfig("amqp://user:pass@rabbit:5672/vhost",
amqp.Config{
Heartbeat: 10 * time.Second,
Locale: "en_US",
})
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()

⚠️ Channel — не goroutine-safe. Используй отдельный channel на goroutine.

// включаем publisher confirms
if err := ch.Confirm(false); err != nil {
return err
}
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
err := ch.PublishWithContext(ctx,
"orders", // exchange
"orders.created", // routing key
true, // mandatory (вернёт если нет binding)
false, // immediate (deprecated)
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
MessageId: msgID,
Timestamp: time.Now(),
Headers: amqp.Table{"trace-id": traceID},
Body: body,
})
confirm := <-confirms
if !confirm.Ack {
return errors.New("publish nack")
}

⚠️ mandatory=true + нет binding → возвращается через NotifyReturn; обрабатывай.

ch.Qos(50, 0, false) // prefetch 50
msgs, err := ch.Consume(
"orders.created", // queue
"consumer-1", // consumer tag
false, // auto-ack=false → manual
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
for d := range msgs {
if err := process(d.Body); err != nil {
// nack с requeue=false → уйдёт в DLX
d.Nack(false, false)
continue
}
d.Ack(false)
}
ch.ExchangeDeclare("orders", "direct", true, false, false, false, nil)
ch.ExchangeDeclare("orders.dlx", "fanout", true, false, false, false, nil)
ch.QueueDeclare("orders.dlq", true, false, false, false, nil)
ch.QueueBind("orders.dlq", "", "orders.dlx", false, nil)
_, err := ch.QueueDeclare(
"orders.created",
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp.Table{
"x-queue-type": "quorum", // QUORUM!
"x-dead-letter-exchange": "orders.dlx",
"x-delivery-limit": 5, // max redelivery
"x-quorum-initial-group-size": 3,
},
)
ch.QueueBind("orders.created", "orders.created", "orders", false, nil)

x-delivery-limit (только quorum) — гарантированный счётчик redeliveries; после превышения сообщение уходит в DLX. Решает проблему «poison message infinite loop».

ch.QueueDeclare("tasks", true, false, false, false, amqp.Table{
"x-max-priority": 10, // priorities 0..10
})
ch.PublishWithContext(ctx, "", "tasks", false, false, amqp.Publishing{
Priority: 9,
Body: body,
})

⚠️ Priority queue реализуется внутри как несколько подочередей; не работает с quorum queues (только classic). Высокий cost при большом числе уровней.

  • Lazy queue (x-queue-mode=lazy) — сообщения сразу на диск, минимальная RAM. Для очень больших backlog. Только classic queue.
  • vm_memory_high_watermark (default 0.4) — порог RAM, выше которого RabbitMQ блокирует producers (flow control). Настраивается через rabbitmq.conf:
vm_memory_high_watermark.relative = 0.5
disk_free_limit.relative = 1.5
// stream-клиент: github.com/rabbitmq/rabbitmq-stream-go-client
env, _ := stream.NewEnvironment(...)
producer, _ := env.NewProducer("orders-stream", nil)
producer.Send(amqp.NewMessage([]byte("event")))

Streams — append-only log, retention по размеру/времени; читаются с любого offset, как Kafka.

rabbit-up.conf:

rabbitmq-federation
upstream-set 'all_dc1'
upstream 'amqp://user:pass@dc1.broker:5672'
federation-upstream-set queue 'orders'

Полезно для DR-копии или для cross-region «вытягивания» событий.

  • Минимум 3 узла для quorum queues.
  • Кластеризация через erlang.cookie совпадение + rabbitmqctl cluster_status.
  • В Kubernetes — RabbitMQ Cluster Operator (CNCF, поддерживается).
  • Quorum-queue выживает падение N/2 узлов.
Cluster (3 узла, quorum queues)
┌────┐ ┌────┐ ┌────┐
│ R1 │──│ R2 │──│ R3 │
└────┘ └────┘ └────┘
падение R1 → R2 и R3 продолжают работать
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_consumers
rabbitmqctl set_policy ha-quorum "^orders\\." '{"queue-type":"quorum"}' --apply-to queues
rabbitmqadmin publish exchange=orders routing_key=orders.created payload='...'
  • Prometheus endpoint (rabbitmq_prometheus): :15692/metrics.
  • Ключевые метрики: rabbitmq_queue_messages_ready, rabbitmq_queue_messages_unacknowledged, rabbitmq_connections_total, memory watermark, disk free.
  • Алерт: rising unacked, queue size > threshold, memory_alarm.
АспектRabbitMQKafkaNATS JetStream
ПарадигмаQueue + ExchangeLogSubject + Stream
Routingочень гибкоепо partition (по key)по subject pattern
Throughput~50k-200k msg/sмиллионысотни тысяч
Latency~ms~ms-10mssub-ms (Core)
Persistenceдадада
Ops complexityсредняявысокаянизкая
Use casetask queues, fan-outstreams, EoS pipelinesRPC, real-time, edge

⚠️ Channel не thread-safe. Один channel — одна goroutine. Создавай по channel’у на consumer/publisher.

⚠️ Connection drops тихие. Используй conn.NotifyClose(...) + reconnect-логику. Готовых reconnect-обёрток мало (есть wabbit, можно писать самим).

⚠️ Publisher confirms забывают включить. Без ch.Confirm(false) ты не знаешь дошло ли сообщение. Не путать с consumer ack.

⚠️ Mandatory + нет binding = «тихая потеря». Если mandatory=false и binding нет — сообщение выкидывается. Включи NotifyReturn чтобы отлавливать.

⚠️ delivery_mode=1 (transient) на durable queue не делает сообщение durable. Нужно delivery_mode=2.

⚠️ Mirrored classic queues deprecated. Используй quorum queues для HA.

⚠️ Quorum queue не поддерживает все фичи classic (priority, exclusive, etc). Проверяй совместимость.

⚠️ Prefetch = 0 без сонной обработки = consumer получает все сообщения сразу → OOM. Всегда ставь разумный prefetch (10-200).

⚠️ requeue=true в nack = сообщение сразу возвращается в head queue → infinite loop с poison message. Лучше requeue=false + DLX или x-delivery-limit.

⚠️ x-message-ttl на queue ≠ deletion time queue. Удаляет сообщения. Для удаления queue — x-expires.

⚠️ Streams ≠ queue. Не поддерживают traditional ack model; используется offset-based чтение.

⚠️ DLX exchange тоже надо declare. Сообщения в несуществующий DLX → теряются.

⚠️ Memory high-watermark блокирует publish. Если broker ушёл в alarm — все producers блокируются. Мониторь, scale-up RAM или используй lazy queues.

⚠️ Federation/Shovel не транзакционные. Сообщение может быть доставлено дважды при failure.

⚠️ HiPE (Erlang JIT) убран в современных версиях. Старые туториалы могут советовать включать — не нужно.

⚠️ rabbitmqadmin требует Python 3.9+ (или Erlang-based). В Alpine может не работать без донастройки.


Классический use case — task queue для отложенных задач: send email, generate report, video transcode.

API ──publish task──▶ "tasks" exchange ──▶ Q("email"), Q("report"), Q("video")
│ │ │
worker-pool worker-pool worker-pool

Дает retry (через nack+DLX), priorities, fairness между tenants.

"event.user.registered" (fanout exchange)
├──▶ Q("email-welcome") ──▶ email-worker
├──▶ Q("analytics") ──▶ analytics-consumer
├──▶ Q("crm-sync") ──▶ crm-worker
└──▶ Q("audit-log") ──▶ audit-worker

Одно событие → много действий, независимых, retry-друг-от-друга.

RabbitMQ имеет встроенный RPC через reply_to property; используется для синхронной коммуникации поверх асинхронного транспорта.

  • Reddit — task queue.
  • Mozilla — internal services.
  • Robinhood — trading event flow.
  • Slack — notification pipelines.

1. Что такое AMQP 0.9.1? Open-протокол с моделью exchange + queue + binding; RabbitMQ — его референсная реализация.

2. Какие типы exchange бывают? direct, fanout, topic, headers (+ default = пустая строка).

3. Как работает topic exchange? Routing по pattern на routing key; * = одно слово, # = ноль или больше слов.

4. Что такое binding? Связь exchange → queue с условием маршрутизации (routing key или header-pattern).

5. Чем отличается persistent от transient message? Persistent (delivery_mode=2) пишется на диск, переживает рестарт; transient — только в памяти.

6. Что такое publisher confirms? Async-механизм подтверждения publish от broker; producer узнаёт ack/nack по сообщению.

7. Что такое consumer ack/nack/reject? ack — обработано; nack/reject — ошибка, с requeue=true → обратно в queue, с requeue=false → DLX (если настроен).

8. Что такое prefetch (QoS)? Ограничение числа unacked сообщений у consumer; без prefetch broker пушит всё → OOM.

9. Что такое DLX (Dead Letter Exchange)? Exchange куда RabbitMQ автоматически отправляет «мёртвые» сообщения (rejected, expired, переполнение queue).

10. Какие способы задать TTL? Per-queue (x-message-ttl), per-message (expiration), queue TTL (x-expires).

11. Что такое quorum queue? HA-очередь на Raft-консенсусе (с 3.8+); рекомендация для prod, заменяет mirrored queues.

12. Чем quorum отличается от mirrored classic? Mirrored — master+mirrors с собственной логикой (deprecated, проблемы с split-brain). Quorum — Raft, безопаснее, медленнее на запись.

13. Что такое stream в RabbitMQ? Append-only log (с 3.9+), Kafka-like; ack-based не используется, читается по offset.

14. Когда выбрать stream vs queue? Stream — replay, multi-consumer read одного и того же сообщения, длинный retention. Queue — task processing с ack/nack.

15. Что такое lazy queue? Classic queue с x-queue-mode=lazy: сразу пишет в диск, минимум RAM. Для очень больших backlog.

16. Что такое vm_memory_high_watermark? Порог RAM (default 0.4 от системы), выше которого broker блокирует producers (flow control / memory alarm).

17. Зачем federation и shovel? Inter-broker репликация: shovel — простой forward, federation — exchange/queue видит upstream-broker.

18. Какая Go-библиотека рекомендуется? github.com/rabbitmq/amqp091-go (форк streadway/amqp, поддерживается RabbitMQ team).

19. Channel в Go-клиенте thread-safe? Нет. Используй один channel на одну goroutine.

20. Что делает mandatory=true при publish? Если ни одна queue не bound на routing key — broker вернёт сообщение через NotifyReturn, не «потеряет».

21. Как защититься от poison message? x-delivery-limit (quorum queue) + DLX, отдельный DLQ-consumer для расследования.

22. Priority queue: ограничения? Только classic queue (не quorum), число приоритетов малое (10-20), внутри = несколько подочередей.

23. Когда выбрать RabbitMQ, а когда Kafka? RabbitMQ — гибкое routing, task-queue, fan-out, request-reply. Kafka — high-throughput log, replay, stream processing.

24. Сколько узлов для HA? 3 для quorum queues (Raft quorum); 2 минимально, но без HA при падении 1.

25. Как мониторить RabbitMQ? rabbitmq_prometheus plugin → Prometheus; Management UI; ключевые метрики — depth, unacked, memory alarm.


  1. Topic routing. Создай events topic exchange + 3 queue: events.us.*, events.eu.*, events.#. Publish 10 сообщений с разными routing keys, проверь куда какие попали.

  2. DLX с x-delivery-limit. Quorum queue orders с DLX orders.dlx; Go-consumer reject каждое 3-е сообщение. После 5 попыток — должно уехать в DLQ.

  3. Publisher confirms. Реализуй надёжный publisher: publish + ждём confirm; в случае nack — retry с backoff.

  4. Priority queue. Очередь tasks с x-max-priority=10; запусти 10 producer’ов с разными priority, проверь порядок обработки.

  5. RPC pattern. Реализуй RPC server (consumes на rpc.calc, отвечает в reply_to); RPC client с correlation_id. Замерь latency vs gRPC.

  6. Cluster + quorum queue. docker-compose с 3 RabbitMQ-узлами; создай quorum queue, отправь сообщения, останови leader-node, проверь продолжение работы.

  7. Streams. Создай stream events, опубликуй 10k сообщений; consumer A читает с начала, consumer B — с offset=5000; проверь replay.

  8. Federation. Подними 2 broker’а, настрой federation queue из dc1→dc2; publish в dc1, прими в dc2.

  9. Memory alarm. Снизь vm_memory_high_watermark до 0.05, наполни queue до alarm, посмотри что publish блокируется; recover.

  10. Prometheus + Grafana. Включи rabbitmq_prometheus, дашборд: queue depth, message rate, unacked, memory.


Exchange "events" (topic)
binding: "events.user.*" │ binding: "events.payment.#"
┌────────────────────────────┼────────────────────────────┐
▼ ▼ ▼
Q("user-events") Q("all-payments") Q("audit", "events.#")
│ │ │
▼ ▼ ▼
user-svc consumer billing-svc consumer audit-svc consumer
publish key="events.user.registered" → user-events ✓, audit ✓
publish key="events.payment.captured" → all-payments ✓, audit ✓
publish key="events.order.created" → audit ✓ (через #)
type Connector struct {
url string
mu sync.RWMutex
conn *amqp.Connection
closed atomic.Bool
}
func (c *Connector) Connect(ctx context.Context) error {
conn, err := amqp.Dial(c.url)
if err != nil { return err }
c.mu.Lock(); c.conn = conn; c.mu.Unlock()
go c.watch(ctx)
return nil
}
func (c *Connector) watch(ctx context.Context) {
errCh := c.conn.NotifyClose(make(chan *amqp.Error, 1))
select {
case <-ctx.Done():
c.conn.Close()
case err := <-errCh:
if c.closed.Load() { return }
log.Warnw("amqp connection lost", "err", err)
// backoff и reconnect
backoff := time.Second
for {
if c.closed.Load() { return }
time.Sleep(backoff)
if err := c.Connect(ctx); err == nil {
log.Info("amqp reconnected")
return
}
backoff = min(backoff*2, 30*time.Second)
}
}
}
func (c *Connector) Channel() (*amqp.Channel, error) {
c.mu.RLock(); defer c.mu.RUnlock()
if c.conn == nil { return nil, errors.New("not connected") }
return c.conn.Channel()
}

В production обычно делают per-channel-recovery + per-consumer-recovery — пакеты типа wagslane/go-rabbitmq оборачивают это.

В amqp091-go топология декларируется кодом при запуске. Антипаттерн: пересоздавать каждый раз. Лучше:

func DeclareTopology(ch *amqp.Channel) error {
if err := ch.ExchangeDeclare("orders", "topic", true, false, false, false, nil); err != nil {
return err
}
if _, err := ch.QueueDeclare("orders.created", true, false, false, false, amqp.Table{
"x-queue-type": "quorum",
"x-dead-letter-exchange": "orders.dlx",
"x-delivery-limit": 5,
}); err != nil {
return err
}
return ch.QueueBind("orders.created", "orders.created.#", "orders", false, nil)
}

Альтернатива — Definitions API: экспортировать JSON топологию через RabbitMQ Management plugin и импортировать декларативно при деплое.

func RunConsumer(ctx context.Context, conn *amqp.Connection, queue string, h Handler) error {
ch, err := conn.Channel()
if err != nil { return err }
defer ch.Close()
ch.Qos(50, 0, false)
msgs, err := ch.Consume(queue, "", false, false, false, false, nil)
if err != nil { return err }
sem := make(chan struct{}, 16) // ограничиваем параллелизм
for d := range msgs {
sem <- struct{}{}
go func(d amqp.Delivery) {
defer func() { <-sem }()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := h(ctx, d); err != nil {
d.Nack(false, false) // в DLX
return
}
d.Ack(false)
}(d)
}
return nil
}

⚠️ В цикле range msgs нельзя долго блокироваться внутри одной горутины (channel в Go блокирует) — выноси обработку в goroutine + ограничивай parallelism через семафор.

Сообщения могут быть delivered more than once (RabbitMQ at-least-once). Consumer должен быть идемпотентным:

func (h *Handler) Handle(ctx context.Context, d amqp.Delivery) error {
var ev Event
if err := json.Unmarshal(d.Body, &ev); err != nil {
return err
}
// dedup table
res, err := h.db.Exec(ctx,
`INSERT INTO processed_events (event_id, processed_at) VALUES ($1, NOW())
ON CONFLICT DO NOTHING`, ev.ID)
if err != nil { return err }
if rows, _ := res.RowsAffected(); rows == 0 {
return nil // дубль, уже обработан
}
return h.processBusiness(ctx, ev)
}

Или используй MessageId как ключ дедупликации.

СценарийRabbitMQ QuorumKafkaNATS JetStream
1KB, 1 producer~30k msg/s500k+ msg/s200k msg/s
1KB, fan-out 1→10хорошо (Exchange)плохо (consumers = partitions limit)хорошо
Replication overheadсредний (Raft)низкий (ISR)средний (Raft)

RabbitMQ выигрывает в сценариях с сложной маршрутизацией и request-reply; проигрывает в чистом throughput.

Streams (RabbitMQ 3.9+) — это Kafka-like log. Когда брать RabbitMQ Streams вместо отдельной Kafka:

  • Команда уже эксплуатирует RabbitMQ, не хочет вторую систему.
  • Нужны и queue, и stream.
  • Объёмы средние (не петабайты).

Когда лучше Kafka:

  • Огромный throughput.
  • Нужен ecosystem (Connect, Streams, ksqlDB, Schema Registry).
  • EoS pipelines.

Официальный K8s-оператор:

apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: prod
spec:
replicas: 3
persistence:
storage: 100Gi
storageClassName: ssd
resources:
requests:
cpu: 2
memory: 4Gi
rabbitmq:
additionalConfig: |
vm_memory_high_watermark.relative = 0.5
cluster_partition_handling = pause_minority
additionalPlugins:
- rabbitmq_prometheus
- rabbitmq_management

Управляет lifecycle, persistent volumes, инжектит cookie, делает rolling update.

  • 3-node cluster, quorum queues для критичных.
  • Publisher confirms на критичных publish.
  • Manual ack + prefetch на consumers.
  • DLX + x-delivery-limit (quorum) для всех очередей.
  • Mandatory + NotifyReturn для важных publish.
  • Идемпотентность consumer (dedup table или idempotent SQL).
  • Memory high-watermark, disk free limit настроены.
  • Prometheus + Grafana дашборд (queue depth, unacked, alarm).
  • Alerts: memory_alarm, disk_alarm, queue too large, consumer disconnected.
  • TLS на amqps:// и management.
  • Auth через RabbitMQ users + permissions per vhost.
  • Реплицируемые definitions (через RabbitMQ Definitions API).
  • Резервная копия Mnesia + definitions.

Случай 1: «Очередь росла, потом сервис упал». → memory high-watermark был на 0.4, лимит RAM = 4GB → при backlog 2GB всё в alarm → producers blocked. → фикс: lazy queue + scale RAM до 16GB + alert при queue > 100k.

Случай 2: «Сообщения иногда теряются». → producer не использовал confirms; на mandatory false, binding забыли. → фикс: добавили publisher confirms + mandatory + NotifyReturn handler.

Случай 3: «Consumer завис, queue растёт». → prefetch=0 + один consumer + долгая обработка → channel «забит» unacked. → фикс: prefetch=50 + scale consumers + timeout per-message.


  1. RabbitMQ Documentation — официально.
  2. RabbitMQ in Action — Videla, Williams.
  3. RabbitMQ in Depth — Gavin Roy.
  4. rabbitmq/amqp091-go — Go-клиент.
  5. Quorum queues guide — глубокий разбор.
  6. Streams overview — RabbitMQ streams.
  7. RabbitMQ Cluster Operator (K8s) — оператор для Kubernetes.
  8. Prometheus monitoring — Prometheus + Grafana.
  9. «Mirrored queues are deprecated» — guide к миграции на quorum.
  10. Comparing message brokers (CloudAMQP) — обзоры с реальными метриками.