RabbitMQ в production
Зачем знать. RabbitMQ — зрелый AMQP-брокер с гибкой маршрутизацией, идеально для task-очередей, fan-out нотификаций и сложного routing. Middle 2 Go-инженер обязан понимать AMQP 0.9.1 (exchanges, queues, bindings), различать quorum vs classic queues, настроить DLX, prefetch, TTL, priority queues, корректно использовать
amqp091-go, отличать сценарии где RabbitMQ выигрывает у Kafka/NATS.
Содержание
Заголовок раздела «Содержание»- Концепция: AMQP 0.9.1, объекты, routing
- Production-практики
- Gotchas
- Real cases
- 25 вопросов
- Practice
- Источники
1. Концепция (кратко)
Заголовок раздела «1. Концепция (кратко)»1.1 AMQP 0.9.1 — основа
Заголовок раздела «1.1 AMQP 0.9.1 — основа»RabbitMQ реализует AMQP 0.9.1 — open-протокол для асинхронного обмена сообщениями. Ключевые сущности:
┌──────────┐ publish ┌─────────────┐ routing ┌────────┐ deliver ┌──────────┐ │ Producer │ ─────────────▶│ Exchange │ ──────────▶│ Queue │──────────▶│ Consumer │ └──────────┘ └─────────────┘ └────────┘ └──────────┘ ▲ ▲ │ │ binding сообщения хранятся (routing key / pattern) в очереди- Producer публикует в Exchange (никогда напрямую в queue).
- Exchange через Binding маршрутизирует в Queue на основе routing key / headers.
- Consumer подписывается на Queue.
1.2 Типы Exchange
Заголовок раздела «1.2 Типы Exchange» ┌─ direct ─ routing key точно равен binding key ├─ fanout ─ broadcast: всем bound queues ├─ topic ─ pattern matching: "orders.*.created", "logs.#" └─ headers ─ matching по headers (не по routing key)direct:
publish key="payment.ok" ──▶ Exchange (direct) │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ Q("payment.ok") Q("payment.fail") Q("audit", key="payment.ok") ✓ ✓fanout — игнорирует routing key, в каждую bound queue.
topic — patterns:
*= ровно одно слово (segment).#= ноль или больше слов.orders.*.createdmatchesorders.us.created, не matchesorders.us.eu.created.logs.#matches любые logs.*.
headers — binding со словарём; matches x-match=all (все хедеры) или any (любой).
Default exchange — пустая строка "", routing key = queue name. Используется для «прямой» отправки в queue.
1.3 Queue properties
Заголовок раздела «1.3 Queue properties»- durable — переживает рестарт broker (но сообщения должны быть persistent).
- exclusive — только одно соединение, удаляется при разрыве.
- auto-delete — удалить когда нет consumer.
1.4 Persistent vs transient messages
Заголовок раздела «1.4 Persistent vs transient messages»delivery_mode=2(persistent) — broker сохраняет на диск.delivery_mode=1(transient) — только память.
Для durability нужно: durable queue + persistent message + publisher confirms + (для critical) replicated queue (quorum).
1.5 Acknowledgements
Заголовок раздела «1.5 Acknowledgements»- ack — обработано, удалить из queue.
- nack — ошибка,
requeue=trueвернуть в queue илиrequeue=falseуйдёт в DLX (если настроен). - reject — то же что nack для одного сообщения.
Prefetch (QoS): channel.Qos(prefetchCount, prefetchSize, global) — сколько unacked сообщений может быть у consumer. Без prefetch RabbitMQ пушит всё подряд → OOM.
1.6 DLX и DLQ
Заголовок раздела «1.6 DLX и DLQ»Dead Letter Exchange (DLX) — exchange куда уходят сообщения когда:
- consumer reject/nack с
requeue=false; - сообщение протухло (TTL);
- очередь переполнилась (
x-max-length).
Q("orders") [x-dead-letter-exchange="dlx"] │ reject / TTL / overflow ▼ Exchange "dlx" ──▶ Q("orders.dlq")1.7 TTL
Заголовок раздела «1.7 TTL»- Per-queue TTL (
x-message-ttl) — все сообщения в queue. - Per-message TTL — у конкретного сообщения (
expirationproperty). - Queue TTL (
x-expires) — удалить пустую queue по таймауту.
1.8 Quorum queues vs Classic / Mirrored
Заголовок раздела «1.8 Quorum queues vs Classic / Mirrored»- Classic queue — простая, локальна для одного узла.
- Mirrored classic queue (deprecated в 3.10+, удаляется в 4.0) — реплика на других узлах через master-mirror, известны проблемы с split-brain.
- Quorum queue — реплика через Raft consensus (с RabbitMQ 3.8+), HA по умолчанию, безопаснее, медленнее на писа́ние, рекомендация прода.
Quorum queue replication (Raft, 3 узла): ┌────────┐ ┌────────┐ ┌────────┐ │ Node 1 │ │ Node 2 │ │ Node 3 │ │(leader)│◀─▶│ follow │◀─▶│ follow │ └────────┘ └────────┘ └────────┘ write подтверждается когда (N/2)+1 узлов записали.1.9 Streams (RabbitMQ 3.9+)
Заголовок раздела «1.9 Streams (RabbitMQ 3.9+)»Append-only log в стиле Kafka, но проще ops; поддерживает многократное чтение одного сообщения, replication. Используется для:
- log/event streaming;
- replay (re-read sequence range);
- broadcast многим consumer без копирования.
Отличается от queue: сообщения не удаляются по ack, а живут до retention.
1.10 Federation и Shovel
Заголовок раздела «1.10 Federation и Shovel»- Federation — exchange/queue ссылается на upstream-брокер, broker автоматически тянет сообщения. Для слабосвязанных кластеров.
- Shovel — простой forwarder: «бери из X, клади в Y» (между брокерами / между exchange).
1.11 Plugins
Заголовок раздела «1.11 Plugins»rabbitmq_management— UI на:15672.rabbitmq_mqtt,rabbitmq_stomp,rabbitmq_web_mqtt— другие протоколы.rabbitmq_shovel,rabbitmq_federation— replication.rabbitmq_prometheus— Prometheus endpoint.rabbitmq_consistent_hash_exchange— exchange c шардингом по hash.
2. Production-практики
Заголовок раздела «2. Production-практики»2.1 Go-клиент: rabbitmq/amqp091-go
Заголовок раздела «2.1 Go-клиент: rabbitmq/amqp091-go»2025+ рекомендация —
github.com/rabbitmq/amqp091-go(форкstreadway/amqp, поддерживается RabbitMQ team).streadway/amqpdeprecated.
import amqp "github.com/rabbitmq/amqp091-go"
conn, err := amqp.DialConfig("amqp://user:pass@rabbit:5672/vhost", amqp.Config{ Heartbeat: 10 * time.Second, Locale: "en_US", })defer conn.Close()
ch, _ := conn.Channel()defer ch.Close()⚠️ Channel — не goroutine-safe. Используй отдельный channel на goroutine.
2.2 Publish c confirms
Заголовок раздела «2.2 Publish c confirms»// включаем publisher confirmsif err := ch.Confirm(false); err != nil { return err}confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
err := ch.PublishWithContext(ctx, "orders", // exchange "orders.created", // routing key true, // mandatory (вернёт если нет binding) false, // immediate (deprecated) amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, MessageId: msgID, Timestamp: time.Now(), Headers: amqp.Table{"trace-id": traceID}, Body: body, })
confirm := <-confirmsif !confirm.Ack { return errors.New("publish nack")}⚠️ mandatory=true + нет binding → возвращается через NotifyReturn; обрабатывай.
2.3 Consume
Заголовок раздела «2.3 Consume»ch.Qos(50, 0, false) // prefetch 50
msgs, err := ch.Consume( "orders.created", // queue "consumer-1", // consumer tag false, // auto-ack=false → manual false, // exclusive false, // no-local false, // no-wait nil,)
for d := range msgs { if err := process(d.Body); err != nil { // nack с requeue=false → уйдёт в DLX d.Nack(false, false) continue } d.Ack(false)}2.4 Declare для quorum queue + DLX
Заголовок раздела «2.4 Declare для quorum queue + DLX»ch.ExchangeDeclare("orders", "direct", true, false, false, false, nil)ch.ExchangeDeclare("orders.dlx", "fanout", true, false, false, false, nil)
ch.QueueDeclare("orders.dlq", true, false, false, false, nil)ch.QueueBind("orders.dlq", "", "orders.dlx", false, nil)
_, err := ch.QueueDeclare( "orders.created", true, // durable false, // auto-delete false, // exclusive false, // no-wait amqp.Table{ "x-queue-type": "quorum", // QUORUM! "x-dead-letter-exchange": "orders.dlx", "x-delivery-limit": 5, // max redelivery "x-quorum-initial-group-size": 3, },)ch.QueueBind("orders.created", "orders.created", "orders", false, nil)x-delivery-limit (только quorum) — гарантированный счётчик redeliveries; после превышения сообщение уходит в DLX. Решает проблему «poison message infinite loop».
2.5 Priority queue
Заголовок раздела «2.5 Priority queue»ch.QueueDeclare("tasks", true, false, false, false, amqp.Table{ "x-max-priority": 10, // priorities 0..10})
ch.PublishWithContext(ctx, "", "tasks", false, false, amqp.Publishing{ Priority: 9, Body: body,})⚠️ Priority queue реализуется внутри как несколько подочередей; не работает с quorum queues (только classic). Высокий cost при большом числе уровней.
2.6 Lazy queues и Memory high-watermark
Заголовок раздела «2.6 Lazy queues и Memory high-watermark»- Lazy queue (
x-queue-mode=lazy) — сообщения сразу на диск, минимальная RAM. Для очень больших backlog. Только classic queue. - vm_memory_high_watermark (default 0.4) — порог RAM, выше которого RabbitMQ блокирует producers (flow control). Настраивается через
rabbitmq.conf:
vm_memory_high_watermark.relative = 0.5disk_free_limit.relative = 1.52.7 Streams
Заголовок раздела «2.7 Streams»// stream-клиент: github.com/rabbitmq/rabbitmq-stream-go-clientenv, _ := stream.NewEnvironment(...)producer, _ := env.NewProducer("orders-stream", nil)producer.Send(amqp.NewMessage([]byte("event")))Streams — append-only log, retention по размеру/времени; читаются с любого offset, как Kafka.
2.8 Federation: один из примеров
Заголовок раздела «2.8 Federation: один из примеров»rabbit-up.conf:
rabbitmq-federation upstream-set 'all_dc1' upstream 'amqp://user:pass@dc1.broker:5672' federation-upstream-set queue 'orders'Полезно для DR-копии или для cross-region «вытягивания» событий.
2.9 Cluster и HA
Заголовок раздела «2.9 Cluster и HA»- Минимум 3 узла для quorum queues.
- Кластеризация через
erlang.cookieсовпадение +rabbitmqctl cluster_status. - В Kubernetes — RabbitMQ Cluster Operator (CNCF, поддерживается).
- Quorum-queue выживает падение N/2 узлов.
Cluster (3 узла, quorum queues) ┌────┐ ┌────┐ ┌────┐ │ R1 │──│ R2 │──│ R3 │ └────┘ └────┘ └────┘ падение R1 → R2 и R3 продолжают работать2.10 Управление через rabbitmqctl и rabbitmqadmin
Заголовок раздела «2.10 Управление через rabbitmqctl и rabbitmqadmin»rabbitmqctl list_queues name messages consumersrabbitmqctl list_consumersrabbitmqctl set_policy ha-quorum "^orders\\." '{"queue-type":"quorum"}' --apply-to queuesrabbitmqadmin publish exchange=orders routing_key=orders.created payload='...'2.11 Monitoring
Заголовок раздела «2.11 Monitoring»- Prometheus endpoint (
rabbitmq_prometheus)::15692/metrics. - Ключевые метрики:
rabbitmq_queue_messages_ready,rabbitmq_queue_messages_unacknowledged,rabbitmq_connections_total, memory watermark, disk free. - Алерт: rising unacked, queue size > threshold, memory_alarm.
2.12 Сравнение
Заголовок раздела «2.12 Сравнение»| Аспект | RabbitMQ | Kafka | NATS JetStream |
|---|---|---|---|
| Парадигма | Queue + Exchange | Log | Subject + Stream |
| Routing | очень гибкое | по partition (по key) | по subject pattern |
| Throughput | ~50k-200k msg/s | миллионы | сотни тысяч |
| Latency | ~ms | ~ms-10ms | sub-ms (Core) |
| Persistence | да | да | да |
| Ops complexity | средняя | высокая | низкая |
| Use case | task queues, fan-out | streams, EoS pipelines | RPC, real-time, edge |
3. Gotchas
Заголовок раздела «3. Gotchas»⚠️ Channel не thread-safe. Один channel — одна goroutine. Создавай по channel’у на consumer/publisher.
⚠️ Connection drops тихие. Используй conn.NotifyClose(...) + reconnect-логику. Готовых reconnect-обёрток мало (есть wabbit, можно писать самим).
⚠️ Publisher confirms забывают включить. Без ch.Confirm(false) ты не знаешь дошло ли сообщение. Не путать с consumer ack.
⚠️ Mandatory + нет binding = «тихая потеря». Если mandatory=false и binding нет — сообщение выкидывается. Включи NotifyReturn чтобы отлавливать.
⚠️ delivery_mode=1 (transient) на durable queue не делает сообщение durable. Нужно delivery_mode=2.
⚠️ Mirrored classic queues deprecated. Используй quorum queues для HA.
⚠️ Quorum queue не поддерживает все фичи classic (priority, exclusive, etc). Проверяй совместимость.
⚠️ Prefetch = 0 без сонной обработки = consumer получает все сообщения сразу → OOM. Всегда ставь разумный prefetch (10-200).
⚠️ requeue=true в nack = сообщение сразу возвращается в head queue → infinite loop с poison message. Лучше requeue=false + DLX или x-delivery-limit.
⚠️ x-message-ttl на queue ≠ deletion time queue. Удаляет сообщения. Для удаления queue — x-expires.
⚠️ Streams ≠ queue. Не поддерживают traditional ack model; используется offset-based чтение.
⚠️ DLX exchange тоже надо declare. Сообщения в несуществующий DLX → теряются.
⚠️ Memory high-watermark блокирует publish. Если broker ушёл в alarm — все producers блокируются. Мониторь, scale-up RAM или используй lazy queues.
⚠️ Federation/Shovel не транзакционные. Сообщение может быть доставлено дважды при failure.
⚠️ HiPE (Erlang JIT) убран в современных версиях. Старые туториалы могут советовать включать — не нужно.
⚠️ rabbitmqadmin требует Python 3.9+ (или Erlang-based). В Alpine может не работать без донастройки.
4. Real cases
Заголовок раздела «4. Real cases»4.1 Celery / Sidekiq replacement в Go
Заголовок раздела «4.1 Celery / Sidekiq replacement в Go»Классический use case — task queue для отложенных задач: send email, generate report, video transcode.
API ──publish task──▶ "tasks" exchange ──▶ Q("email"), Q("report"), Q("video") │ │ │ worker-pool worker-pool worker-poolДает retry (через nack+DLX), priorities, fairness между tenants.
4.2 Notification fan-out
Заголовок раздела «4.2 Notification fan-out» "event.user.registered" (fanout exchange) ├──▶ Q("email-welcome") ──▶ email-worker ├──▶ Q("analytics") ──▶ analytics-consumer ├──▶ Q("crm-sync") ──▶ crm-worker └──▶ Q("audit-log") ──▶ audit-workerОдно событие → много действий, независимых, retry-друг-от-друга.
4.3 Request-Reply (RPC)
Заголовок раздела «4.3 Request-Reply (RPC)»RabbitMQ имеет встроенный RPC через reply_to property; используется для синхронной коммуникации поверх асинхронного транспорта.
4.4 Реальные пользователи
Заголовок раздела «4.4 Реальные пользователи»- Reddit — task queue.
- Mozilla — internal services.
- Robinhood — trading event flow.
- Slack — notification pipelines.
5. Вопросы
Заголовок раздела «5. Вопросы»1. Что такое AMQP 0.9.1? Open-протокол с моделью exchange + queue + binding; RabbitMQ — его референсная реализация.
2. Какие типы exchange бывают? direct, fanout, topic, headers (+ default = пустая строка).
3. Как работает topic exchange?
Routing по pattern на routing key; * = одно слово, # = ноль или больше слов.
4. Что такое binding? Связь exchange → queue с условием маршрутизации (routing key или header-pattern).
5. Чем отличается persistent от transient message?
Persistent (delivery_mode=2) пишется на диск, переживает рестарт; transient — только в памяти.
6. Что такое publisher confirms? Async-механизм подтверждения publish от broker; producer узнаёт ack/nack по сообщению.
7. Что такое consumer ack/nack/reject?
ack — обработано; nack/reject — ошибка, с requeue=true → обратно в queue, с requeue=false → DLX (если настроен).
8. Что такое prefetch (QoS)? Ограничение числа unacked сообщений у consumer; без prefetch broker пушит всё → OOM.
9. Что такое DLX (Dead Letter Exchange)? Exchange куда RabbitMQ автоматически отправляет «мёртвые» сообщения (rejected, expired, переполнение queue).
10. Какие способы задать TTL?
Per-queue (x-message-ttl), per-message (expiration), queue TTL (x-expires).
11. Что такое quorum queue? HA-очередь на Raft-консенсусе (с 3.8+); рекомендация для prod, заменяет mirrored queues.
12. Чем quorum отличается от mirrored classic? Mirrored — master+mirrors с собственной логикой (deprecated, проблемы с split-brain). Quorum — Raft, безопаснее, медленнее на запись.
13. Что такое stream в RabbitMQ? Append-only log (с 3.9+), Kafka-like; ack-based не используется, читается по offset.
14. Когда выбрать stream vs queue? Stream — replay, multi-consumer read одного и того же сообщения, длинный retention. Queue — task processing с ack/nack.
15. Что такое lazy queue?
Classic queue с x-queue-mode=lazy: сразу пишет в диск, минимум RAM. Для очень больших backlog.
16. Что такое vm_memory_high_watermark? Порог RAM (default 0.4 от системы), выше которого broker блокирует producers (flow control / memory alarm).
17. Зачем federation и shovel? Inter-broker репликация: shovel — простой forward, federation — exchange/queue видит upstream-broker.
18. Какая Go-библиотека рекомендуется?
github.com/rabbitmq/amqp091-go (форк streadway/amqp, поддерживается RabbitMQ team).
19. Channel в Go-клиенте thread-safe? Нет. Используй один channel на одну goroutine.
20. Что делает mandatory=true при publish?
Если ни одна queue не bound на routing key — broker вернёт сообщение через NotifyReturn, не «потеряет».
21. Как защититься от poison message?
x-delivery-limit (quorum queue) + DLX, отдельный DLQ-consumer для расследования.
22. Priority queue: ограничения? Только classic queue (не quorum), число приоритетов малое (10-20), внутри = несколько подочередей.
23. Когда выбрать RabbitMQ, а когда Kafka? RabbitMQ — гибкое routing, task-queue, fan-out, request-reply. Kafka — high-throughput log, replay, stream processing.
24. Сколько узлов для HA? 3 для quorum queues (Raft quorum); 2 минимально, но без HA при падении 1.
25. Как мониторить RabbitMQ?
rabbitmq_prometheus plugin → Prometheus; Management UI; ключевые метрики — depth, unacked, memory alarm.
6. Practice
Заголовок раздела «6. Practice»-
Topic routing. Создай
eventstopic exchange + 3 queue:events.us.*,events.eu.*,events.#. Publish 10 сообщений с разными routing keys, проверь куда какие попали. -
DLX с x-delivery-limit. Quorum queue
ordersс DLXorders.dlx; Go-consumer reject каждое 3-е сообщение. После 5 попыток — должно уехать в DLQ. -
Publisher confirms. Реализуй надёжный publisher: publish + ждём confirm; в случае nack — retry с backoff.
-
Priority queue. Очередь tasks с
x-max-priority=10; запусти 10 producer’ов с разными priority, проверь порядок обработки. -
RPC pattern. Реализуй RPC server (consumes на
rpc.calc, отвечает вreply_to); RPC client сcorrelation_id. Замерь latency vs gRPC. -
Cluster + quorum queue. docker-compose с 3 RabbitMQ-узлами; создай quorum queue, отправь сообщения, останови leader-node, проверь продолжение работы.
-
Streams. Создай stream
events, опубликуй 10k сообщений; consumer A читает с начала, consumer B — с offset=5000; проверь replay. -
Federation. Подними 2 broker’а, настрой federation queue из dc1→dc2; publish в dc1, прими в dc2.
-
Memory alarm. Снизь
vm_memory_high_watermarkдо 0.05, наполни queue до alarm, посмотри что publish блокируется; recover. -
Prometheus + Grafana. Включи
rabbitmq_prometheus, дашборд: queue depth, message rate, unacked, memory.
7. Дополнительные блоки
Заголовок раздела «7. Дополнительные блоки»7.1 ASCII: routing topic exchange
Заголовок раздела «7.1 ASCII: routing topic exchange» Exchange "events" (topic) │ binding: "events.user.*" │ binding: "events.payment.#" ┌────────────────────────────┼────────────────────────────┐ ▼ ▼ ▼ Q("user-events") Q("all-payments") Q("audit", "events.#") │ │ │ ▼ ▼ ▼ user-svc consumer billing-svc consumer audit-svc consumer
publish key="events.user.registered" → user-events ✓, audit ✓ publish key="events.payment.captured" → all-payments ✓, audit ✓ publish key="events.order.created" → audit ✓ (через #)7.2 Reconnect-обёртка для amqp091-go
Заголовок раздела «7.2 Reconnect-обёртка для amqp091-go»type Connector struct { url string mu sync.RWMutex conn *amqp.Connection closed atomic.Bool}
func (c *Connector) Connect(ctx context.Context) error { conn, err := amqp.Dial(c.url) if err != nil { return err } c.mu.Lock(); c.conn = conn; c.mu.Unlock()
go c.watch(ctx) return nil}
func (c *Connector) watch(ctx context.Context) { errCh := c.conn.NotifyClose(make(chan *amqp.Error, 1)) select { case <-ctx.Done(): c.conn.Close() case err := <-errCh: if c.closed.Load() { return } log.Warnw("amqp connection lost", "err", err) // backoff и reconnect backoff := time.Second for { if c.closed.Load() { return } time.Sleep(backoff) if err := c.Connect(ctx); err == nil { log.Info("amqp reconnected") return } backoff = min(backoff*2, 30*time.Second) } }}
func (c *Connector) Channel() (*amqp.Channel, error) { c.mu.RLock(); defer c.mu.RUnlock() if c.conn == nil { return nil, errors.New("not connected") } return c.conn.Channel()}В production обычно делают per-channel-recovery + per-consumer-recovery — пакеты типа wagslane/go-rabbitmq оборачивают это.
7.3 Topology declaration: declarative подход
Заголовок раздела «7.3 Topology declaration: declarative подход»В amqp091-go топология декларируется кодом при запуске. Антипаттерн: пересоздавать каждый раз. Лучше:
func DeclareTopology(ch *amqp.Channel) error { if err := ch.ExchangeDeclare("orders", "topic", true, false, false, false, nil); err != nil { return err } if _, err := ch.QueueDeclare("orders.created", true, false, false, false, amqp.Table{ "x-queue-type": "quorum", "x-dead-letter-exchange": "orders.dlx", "x-delivery-limit": 5, }); err != nil { return err } return ch.QueueBind("orders.created", "orders.created.#", "orders", false, nil)}Альтернатива — Definitions API: экспортировать JSON топологию через RabbitMQ Management plugin и импортировать декларативно при деплое.
7.4 Шаблон надёжного consumer
Заголовок раздела «7.4 Шаблон надёжного consumer»func RunConsumer(ctx context.Context, conn *amqp.Connection, queue string, h Handler) error { ch, err := conn.Channel() if err != nil { return err } defer ch.Close()
ch.Qos(50, 0, false)
msgs, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { return err }
sem := make(chan struct{}, 16) // ограничиваем параллелизм
for d := range msgs { sem <- struct{}{} go func(d amqp.Delivery) { defer func() { <-sem }() ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if err := h(ctx, d); err != nil { d.Nack(false, false) // в DLX return } d.Ack(false) }(d) } return nil}⚠️ В цикле range msgs нельзя долго блокироваться внутри одной горутины (channel в Go блокирует) — выноси обработку в goroutine + ограничивай parallelism через семафор.
7.5 Идемпотентность consumer
Заголовок раздела «7.5 Идемпотентность consumer»Сообщения могут быть delivered more than once (RabbitMQ at-least-once). Consumer должен быть идемпотентным:
func (h *Handler) Handle(ctx context.Context, d amqp.Delivery) error { var ev Event if err := json.Unmarshal(d.Body, &ev); err != nil { return err } // dedup table res, err := h.db.Exec(ctx, `INSERT INTO processed_events (event_id, processed_at) VALUES ($1, NOW()) ON CONFLICT DO NOTHING`, ev.ID) if err != nil { return err } if rows, _ := res.RowsAffected(); rows == 0 { return nil // дубль, уже обработан } return h.processBusiness(ctx, ev)}Или используй MessageId как ключ дедупликации.
7.6 Bench-ориентиры RabbitMQ vs Kafka
Заголовок раздела «7.6 Bench-ориентиры RabbitMQ vs Kafka»| Сценарий | RabbitMQ Quorum | Kafka | NATS JetStream |
|---|---|---|---|
| 1KB, 1 producer | ~30k msg/s | 500k+ msg/s | 200k msg/s |
| 1KB, fan-out 1→10 | хорошо (Exchange) | плохо (consumers = partitions limit) | хорошо |
| Replication overhead | средний (Raft) | низкий (ISR) | средний (Raft) |
RabbitMQ выигрывает в сценариях с сложной маршрутизацией и request-reply; проигрывает в чистом throughput.
7.7 Когда выбрать streams (RabbitMQ) vs Kafka
Заголовок раздела «7.7 Когда выбрать streams (RabbitMQ) vs Kafka»Streams (RabbitMQ 3.9+) — это Kafka-like log. Когда брать RabbitMQ Streams вместо отдельной Kafka:
- Команда уже эксплуатирует RabbitMQ, не хочет вторую систему.
- Нужны и queue, и stream.
- Объёмы средние (не петабайты).
Когда лучше Kafka:
- Огромный throughput.
- Нужен ecosystem (Connect, Streams, ksqlDB, Schema Registry).
- EoS pipelines.
7.8 RabbitMQ Cluster Operator (Kubernetes)
Заголовок раздела «7.8 RabbitMQ Cluster Operator (Kubernetes)»Официальный K8s-оператор:
apiVersion: rabbitmq.com/v1beta1kind: RabbitmqClustermetadata: name: prodspec: replicas: 3 persistence: storage: 100Gi storageClassName: ssd resources: requests: cpu: 2 memory: 4Gi rabbitmq: additionalConfig: | vm_memory_high_watermark.relative = 0.5 cluster_partition_handling = pause_minority additionalPlugins: - rabbitmq_prometheus - rabbitmq_managementУправляет lifecycle, persistent volumes, инжектит cookie, делает rolling update.
7.9 Чек-лист продакшен RabbitMQ-deployment
Заголовок раздела «7.9 Чек-лист продакшен RabbitMQ-deployment»- 3-node cluster, quorum queues для критичных.
- Publisher confirms на критичных publish.
- Manual ack + prefetch на consumers.
- DLX +
x-delivery-limit(quorum) для всех очередей. - Mandatory + NotifyReturn для важных publish.
- Идемпотентность consumer (dedup table или idempotent SQL).
- Memory high-watermark, disk free limit настроены.
- Prometheus + Grafana дашборд (queue depth, unacked, alarm).
- Alerts: memory_alarm, disk_alarm, queue too large, consumer disconnected.
- TLS на amqps:// и management.
- Auth через RabbitMQ users + permissions per vhost.
- Реплицируемые definitions (через RabbitMQ Definitions API).
- Резервная копия Mnesia + definitions.
7.10 Pitfalls в реальности
Заголовок раздела «7.10 Pitfalls в реальности»Случай 1: «Очередь росла, потом сервис упал». → memory high-watermark был на 0.4, лимит RAM = 4GB → при backlog 2GB всё в alarm → producers blocked. → фикс: lazy queue + scale RAM до 16GB + alert при queue > 100k.
Случай 2: «Сообщения иногда теряются». → producer не использовал confirms; на mandatory false, binding забыли. → фикс: добавили publisher confirms + mandatory + NotifyReturn handler.
Случай 3: «Consumer завис, queue растёт». → prefetch=0 + один consumer + долгая обработка → channel «забит» unacked. → фикс: prefetch=50 + scale consumers + timeout per-message.
8. Источники
Заголовок раздела «8. Источники»- RabbitMQ Documentation — официально.
- RabbitMQ in Action — Videla, Williams.
- RabbitMQ in Depth — Gavin Roy.
- rabbitmq/amqp091-go — Go-клиент.
- Quorum queues guide — глубокий разбор.
- Streams overview — RabbitMQ streams.
- RabbitMQ Cluster Operator (K8s) — оператор для Kubernetes.
- Prometheus monitoring — Prometheus + Grafana.
- «Mirrored queues are deprecated» — guide к миграции на quorum.
- Comparing message brokers (CloudAMQP) — обзоры с реальными метриками.