Перейти к содержимому

NATS и RabbitMQ в Go

Зачем знать: Kafka — не единственный брокер. NATS — лёгкий, быстрый для real-time messaging и RPC между микросервисами; JetStream добавляет persistence. RabbitMQ — гибкая маршрутизация на AMQP, идеален для task queues. Middle 1 Go-разработчик в 2026 должен понимать, когда выбрать NATS vs Kafka vs RabbitMQ, уметь работать с обоими клиентами и осознавать ограничения (at-most-once у Core NATS, retention у RabbitMQ).

  1. Базовая концепция
  2. Как в Go (с примерами)
  3. Gotchas
  4. Best practices в production
  5. Вопросы на собесе
  6. Practice
  7. Источники

NATS (от Apcera, сейчас CNCF) — высокопроизводительный pub-sub и request-reply messaging. Single binary, < 20MB, миллионы сообщений в секунду.

Не очереди и не topics — subjects. Иерархия через .:

  • orders.created
  • payments.refunded.eu

Wildcards:

  • * — один уровень: orders.* ловит orders.created, orders.canceled, но не orders.eu.created.
  • > — много уровней: orders.> ловит всё под orders.
  • Fire-and-forget. Подписчика нет / в моменте недоступен → сообщение теряется.
  • Очень быстрый: < 1ms latency, миллионы msg/sec.
  • Использует TCP, но без durability.
  • Persistent layer поверх Core NATS.
  • Streams (как Kafka topics), Consumers (durable / push / pull / ephemeral).
  • At-least-once / exactly-once delivery.
  • KV Store, Object Store (S3-like).
  • Replication (Raft).
  • Microservices RPC (request-reply).
  • Real-time notifications.
  • IoT / edge messaging.
  • Service mesh control plane.
  • KV (как distributed Redis-lite).

RabbitMQ — реализация AMQP 0.9.1 (Advanced Message Queuing Protocol). Erlang-based, зрелый (с 2007).

  • Connection — TCP-соединение клиент-сервер.
  • Channel — мультиплексирование внутри connection (1 connection, N channels).
  • Exchange — routing primitive (direct, fanout, topic, headers).
  • Queue — собственно очередь сообщений (FIFO, persistent или нет).
  • Binding — правило: какие сообщения exchange → queue по routing key.
  • Consumer — приложение, читающее из queue.
  • Direct — routing key точно равно binding key. (orders.created → queue с binding orders.created)
  • Fanout — игнорирует routing key, шлёт всем bind queues. (broadcast)
  • Topic — wildcards (*, #): orders.*orders.created matches.
  • Headers — по headers (редко используется).
  • autoAck — сразу ack при доставке (риск потери).
  • manualAck — consumer вызывает Ack(deliveryTag, multiple) после обработки.
  • Nack/Reject — отрицательное подтверждение, может вернуть в очередь.
  • Classic queues — оригинальные. Не для prod в 2026 (mirrored queues deprecated в 3.9).
  • Quorum queues — Raft-based, HA, replication. Стандарт для prod.
  • Streams — append-only, Kafka-like (RabbitMQ 3.9+).

Сообщения, которые отвергли (nack, expired, queue overflow), отправляются в DLX → DLQ.

  • Task queues (фоновые задачи, retries).
  • Complex routing (one event → 5 queues).
  • RPC (через reply_to и correlation_id).
  • Workflow orchestration.
KafkaNATS (Core)NATS JetStreamRabbitMQ
Latency5-50ms< 1ms1-5ms1-10ms
Throughput1M msg/s/node5M+ msg/s100K-1M msg/s50K msg/s/queue
Persistenceyesnoyesyes
Orderingper partitionnoper streamper queue
Replayyesnoyesstreams only
Routingпо partitionsubjects + wildcardssubjectsexchanges (direct/topic/fanout/header)
Per-msg acknonoyesyes
Prioritynononoyes
Delay/schedulingnononakDelayplugin / delayed exchange
Setupheavy (broker + KRaft)single binarysingle binarymedium
  • Real-time messaging, low latency, microservices RPC → NATS Core.
  • At-least-once + persistence + streams, медиум scale → NATS JetStream.
  • High throughput streaming, replay, audit logKafka.
  • Complex routing, task queues, retries/delays → RabbitMQ.
  • Tiny scale, dev/localRedis Streams.

Окно терминала
go get github.com/nats-io/nats.go

Локальный nats-server:

Окно терминала
docker run -p 4222:4222 nats:latest
package main
import (
"github.com/nats-io/nats.go"
"log"
"time"
)
func main() {
nc, err := nats.Connect("nats://localhost:4222",
nats.Name("my-service"),
nats.MaxReconnects(-1),
nats.ReconnectWait(time.Second),
nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
log.Printf("disconnected: %v", err)
}),
nats.ReconnectHandler(func(c *nats.Conn) {
log.Printf("reconnected to %s", c.ConnectedUrl())
}),
)
if err != nil {
log.Fatal(err)
}
defer nc.Close()
// Subscriber (async)
sub, _ := nc.Subscribe("orders.*", func(m *nats.Msg) {
log.Printf("got: subject=%s data=%s", m.Subject, m.Data)
})
defer sub.Unsubscribe()
// Publisher
for i := 0; i < 10; i++ {
nc.Publish("orders.created", []byte("hello"))
}
nc.Flush()
time.Sleep(time.Second)
}
// Server side
nc.Subscribe("math.add", func(m *nats.Msg) {
a, b := parse(m.Data)
m.Respond([]byte(fmt.Sprintf("%d", a+b)))
})
// Client side
resp, err := nc.Request("math.add", []byte("2 3"), 2*time.Second)
if err == nil {
fmt.Println(string(resp.Data)) // "5"
}

Под капотом — inbox subject и reply-to header.

nc.QueueSubscribe("orders.created", "orders-workers", func(m *nats.Msg) {
// сообщение получит один из подписчиков queue group
})

В отличие от обычного subscribe, в queue group сообщение получает только один подписчик. Это load balancing.

js, _ := jetstream.New(nc) // jetstream package в nats.go
ctx := context.Background()
// Создаём stream
_, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"},
Storage: jetstream.FileStorage,
Retention: jetstream.WorkQueuePolicy, // или LimitsPolicy
MaxAge: 24 * time.Hour,
Replicas: 3,
})
// Producer
ack, _ := js.Publish(ctx, "orders.created", []byte(`{"id":"o-1"}`))
log.Printf("seq=%d", ack.Sequence)
// Consumer (pull-based, рекомендуется)
stream, _ := js.Stream(ctx, "ORDERS")
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "workers",
AckPolicy: jetstream.AckExplicitPolicy,
MaxDeliver: 5,
AckWait: 30 * time.Second,
DeliverPolicy: jetstream.DeliverNewPolicy,
})
// Consume loop
it, _ := cons.Messages()
for {
msg, err := it.Next()
if err != nil { continue }
if err := process(msg.Data()); err != nil {
msg.Nak()
continue
}
msg.Ack()
}
kv, _ := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "config",
TTL: time.Hour,
})
kv.Put(ctx, "service.url", []byte("https://..."))
entry, _ := kv.Get(ctx, "service.url")

Распределённый KV с TTL, watch, history.

Окно терминала
go get github.com/rabbitmq/amqp091-go

Локально:

Окно терминала
docker run -p 5672:5672 -p 15672:15672 rabbitmq:3-management

UI на http://localhost:15672 (guest/guest).

package main
import (
"context"
"github.com/rabbitmq/amqp091-go"
"log"
)
func main() {
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
if err != nil { log.Fatal(err) }
defer conn.Close()
ch, err := conn.Channel()
if err != nil { log.Fatal(err) }
defer ch.Close()
// Declare exchange (idempotent)
err = ch.ExchangeDeclare(
"orders", // name
"topic", // type
true, // durable
false, // auto-delete
false, // internal
false, // no-wait
nil,
)
if err != nil { log.Fatal(err) }
// Publish
ctx := context.Background()
err = ch.PublishWithContext(ctx,
"orders", // exchange
"orders.created", // routing key
false, // mandatory
false, // immediate
amqp091.Publishing{
ContentType: "application/json",
Body: []byte(`{"id":"o-1"}`),
DeliveryMode: amqp091.Persistent, // запись на диск
MessageId: "msg-1",
Timestamp: time.Now(),
},
)
}
ch, _ := conn.Channel()
defer ch.Close()
// Declare queue (idempotent)
q, _ := ch.QueueDeclare(
"orders-workers", // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp091.Table{
"x-queue-type": "quorum", // quorum queue
"x-dead-letter-exchange": "orders.dlx",
},
)
// Bind queue to exchange
ch.QueueBind(q.Name, "orders.*", "orders", false, nil)
// Prefetch (QoS)
ch.Qos(10, 0, false) // 10 unacked messages per consumer
// Consume
msgs, _ := ch.Consume(
q.Name, // queue
"", // consumer tag (auto-generated)
false, // auto-ack=false (manual)
false, // exclusive
false, // no-local
false, // no-wait
nil,
)
for msg := range msgs {
if err := process(msg.Body); err != nil {
msg.Nack(false, false) // не вернуть в очередь → DLX
continue
}
msg.Ack(false)
}
// Main queue с DLX
ch.QueueDeclare("orders", true, false, false, false, amqp091.Table{
"x-dead-letter-exchange": "orders.dlx",
})
// DLX exchange
ch.ExchangeDeclare("orders.dlx", "fanout", true, false, false, false, nil)
// DLQ queue, bound to DLX
ch.QueueDeclare("orders.dlq", true, false, false, false, nil)
ch.QueueBind("orders.dlq", "", "orders.dlx", false, nil)

Nack(requeue=false) → exchange orders.dlx → queue orders.dlq.

// Requires rabbitmq_delayed_message_exchange plugin
ch.ExchangeDeclare("orders.delayed", "x-delayed-message", true, false, false, false, amqp091.Table{
"x-delayed-type": "topic",
})
ch.PublishWithContext(ctx, "orders.delayed", "orders.retry", false, false, amqp091.Publishing{
Body: []byte(`...`),
Headers: amqp091.Table{
"x-delay": 30000, // 30 секунд
},
})

Чтобы гарантировать, что сообщение записано на диск:

ch.Confirm(false)
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 1))
ch.PublishWithContext(...)
conf := <-confirms
if !conf.Ack {
// сообщение не записано, retry
}

В amqp091-go есть удобный PublishWithDeferredConfirmWithContext.

// Client
replyQ, _ := ch.QueueDeclare("", false, false, true, false, nil)
corrID := uuid.NewString()
ch.PublishWithContext(ctx, "", "rpc.add", false, false, amqp091.Publishing{
ContentType: "application/json",
CorrelationId: corrID,
ReplyTo: replyQ.Name,
Body: []byte(`{"a":2,"b":3}`),
})
msgs, _ := ch.Consume(replyQ.Name, "", true, false, false, false, nil)
for m := range msgs {
if m.CorrelationId == corrID {
fmt.Println(string(m.Body))
break
}
}

NATS:

import "github.com/nats-io/nats.go/encoders/protobuf"
// trace propagation вручную через msg.Header.Set("traceparent", ...)

RabbitMQ:

// trace propagation через headers в amqp091.Publishing.Headers

Готовых contrib-инструментаций как у HTTP пока меньше — обычно пишут middleware.

Лежащие в разных DC NATS-кластеры можно связать через gateways и leaf nodes. Сообщения проходят между ними по subjects. Используется для гео-распределённых систем и edge.


nc.Publish("x", data) // если подписчика нет → потеря

Core NATS — at-most-once. Для гарантий — JetStream.

nats.SlowConsumerError

Если ваш handler медленный, NATS буферизует, но при переполнении буфера — сообщения дропаются, consumer помечается «slow». Решения: вынести обработку в горутины с bounded channel.

nc.Publish("x", data)
os.Exit(0) // сообщение в буфере, не отправлено

nc.Flush() перед exit или defer nc.Close() (закрытие flush-ит).

Default 1MB. Конфигурируется на stream / server. Big payloads → Object Store.

Default ack_wait = 30 секунд. Если обработка дольше — сообщение redelivered (дубль). Увеличьте AckWait или периодически вызывайте msg.InProgress().

Один Channel нельзя использовать из нескольких горутин одновременно. Solution: pool of channels или channel per goroutine.

ch.Qos(0, 0, false) // дефолт: unbounded prefetch

Без QoS consumer получит все сообщения сразу → OOM. Установите prefetch=10-100.

ch.QueueDeclare(...) // default: classic queue

В 2026 для prod — x-queue-type: quorum. Classic queues не HA, mirrored deprecated.

Без ch.Confirm(false) и NotifyPublish — продюсер думает, что отправил, но broker мог потерять. Для важных событий — обязательно publisher confirms.

Стандартная библиотека amqp091-go не делает auto-reconnect. Wrapper нужен (например github.com/wagslane/go-rabbitmq или своя обёртка).

orders.*.created — НЕ matches orders.eu.uk.created (только один уровень)
orders.> — matches orders.eu.uk.created

256 символов — деградация производительности. Держите subjects короткими.

Длинная queue (миллионы сообщений) → деградация. Используйте Streams или лимиты.

При shutdown:

nc.Drain() // обработать все буферизованные, потом close

Без drain — pending сообщения теряются.

Если heartbeat не отвечает (default 60s) — connection закрывается. На медленных сетях — увеличить.

Когда RAM > 40% (default), publishers блокируются. Симптом: producer зависает. Алертить.

NATS Core не имеет «декларации» subjects — публикуй куда угодно. В JetStream — нужно создать stream.

Если queue не durable, при рестарте RabbitMQ — пропадает. Если message не persistent, тоже пропадает. Для durability нужно ОБА: durable:true + DeliveryMode: Persistent.


Core NATS — для real-time non-critical (notifications, presence, metrics). JetStream — для critical events.

Между N инстансами consumer-а используйте QueueSubscribe. Сообщение получит только один.

defer nc.Drain() // не nc.Close()

В prod: TLS, NKey/JWT auth, accounts (multi-tenancy).

Replicas: 3 для отказоустойчивости. Без этого падение одного брокера = потеря stream.

Pull > Push для большинства случаев: контроль скорости consumer-ом, backpressure, нет slow-consumer проблемы.

Не AckNonePolicy (потеря). Не AckAllPolicy (если crash после последнего ack, дубли). Explicit — ack каждое сообщение.

В 2026 default для prod. Mirrored queues — deprecated.

Для важных событий — ch.Confirm(false) + NotifyPublish. Иначе можно потерять (broker crash до flush).

autoAck=false. Ack только после успешной обработки. На ошибке — Nack с requeue=false → DLX.

ch.Qos(10-100, 0, false). Без этого consumer задохнётся.

Для каждой важной queue настройте x-dead-letter-exchange. Без DLX — отвергнутые сообщения уходят в небытие.

type Pool struct {
conn *amqp091.Connection
channels chan *amqp091.Channel
}

Один connection, много channels. Или несколько connections для distribute.

message_id deduplication: храните processed IDs (с TTL), skip duplicates.

Headers (traceparent) пробрасывайте вручную или через middleware. RecordHeader в NATS, Headers в RabbitMQ.

NATS: nats-prometheus-exporter, nats_* метрики (slow consumers, msgs received/sent).

RabbitMQ: management plugin + Prometheus exporter (queue depth, message rate, channel count).

Метрики клиента: ack/nack rate, consumer latency.

NATS: nc.IsConnected() или /healthz endpoint.

RabbitMQ: conn.IsClosed() + ping channel.

RabbitMQ RPC pattern OK для редких запросов. Для high QPS — gRPC или NATS.

NATS: Drain().

RabbitMQ: cancel consumer (ch.Cancel), wait inflight, then Close().

NATS: in-memory, scales с RAM. JetStream — диск + replicas.

RabbitMQ: проверьте disk free alarm, memory high watermark, file descriptors. Один broker — до ~50K connection, 100K queues.


  1. Что такое subject в NATS? Иерархическое имя (orders.created). Wildcards * (один уровень) и > (много).

  2. Core NATS vs JetStream? Core — at-most-once, fire-forget, без persistence. JetStream — persistent streams + consumers + acks.

  3. Что такое queue group? Группа подписчиков, разделяющих сообщения (load balancing). Одно сообщение → один consumer в группе.

  4. Чем NATS лучше Kafka для real-time? Низкая latency (< 1ms), single binary, не нужен JVM/Zookeeper. Но Kafka мощнее для high throughput streaming.

  5. Что такое NATS Request-Reply? Sync RPC через subject + inbox для ответа. nc.Request(subject, data, timeout).

  6. Что такое slow consumer? Consumer не успевает обрабатывать → буфер NATS переполняется → сообщения дропаются. Решение: вынести в горутины, увеличить буфер, переходить на JetStream pull.

  7. AckPolicy в JetStream? None (без ack), All (acks все до текущего), Explicit (ack каждое — стандарт для гарантий).

  8. Что такое Leaf Node / Gateway? Связь NATS кластеров (multi-cluster, edge-to-cloud).

  9. NATS KV — что это? Distributed key-value поверх JetStream stream с compaction. Подходит для конфигов, флагов, координации.

  10. Когда нельзя использовать NATS? Для очень больших объёмов retention (TB+, Kafka лучше), для priority/delay сообщений (RabbitMQ).

  1. Что такое exchange? Routing primitive: получает сообщение и распределяет в queues по правилам.

  2. Типы exchanges? Direct (точное match key), Fanout (broadcast), Topic (wildcards * и #), Headers (редко).

  3. Что такое binding? Правило: exchange + queue + routing key (или pattern). Без binding queue не получает сообщения.

  4. Manual vs auto ack? Auto-ack — сразу после получения (риск потери при crash). Manual — после обработки.

  5. Чем quorum queues отличаются от classic? Quorum — Raft, HA, replicated. Classic — single node (mirrored deprecated). В 2026 quorum default для prod.

  6. Что такое DLX? Dead Letter Exchange. Куда уходят сообщения после nack/expired/overflow. Бывает связан с DLQ.

  7. Что такое prefetch? Лимит unacked сообщений на consumer (Qos(prefetch_count, 0, false)). Без него consumer получит всё сразу.

  8. Publisher confirms — зачем? Гарантия, что сообщение записано broker-ом. Без confirms producer не знает, дошло ли.

  9. Channel vs Connection? Connection — TCP. Channel — мультиплекс внутри connection. Один connection — много channels.

  10. Когда выбрать RabbitMQ? Сложная маршрутизация (один event → много queues разными правилами), per-message ack/delay/priority, task queues с retries.

  1. NATS vs Kafka? NATS — низкая latency, lightweight, simple ops. Kafka — высокий throughput, persistent log, replay.

  2. RabbitMQ vs Kafka? RabbitMQ — гибкая routing, queue per consumer, ack per message. Kafka — log-based, replay, ordered, partitioned.

  3. NATS vs RabbitMQ? NATS быстрее, проще, но менее гибкий routing. RabbitMQ — complex routing, scheduling, per-message features.

  4. Какой брокер для микросервисной RPC? NATS Core (request-reply) или gRPC. RabbitMQ RPC возможен, но overkill.

  5. Какой брокер для real-time notifications? NATS Core (fast pub-sub). Если нужна durability — JetStream.

  6. Какой брокер для task queue с retries? RabbitMQ (quorum + DLX + delayed exchange) или NATS JetStream (MaxDeliver + NakWithDelay).

  7. Какой брокер для audit log / event sourcing? Kafka (replay, retention годами, partitioned). NATS JetStream — для среднего масштаба.

  8. Что такое Pulsar и зачем? Apache Pulsar — гибрид: pub-sub + queue, multi-tenancy, tiered storage. Альтернатива Kafka для multi-region.

  9. NATS в K8s? Helm chart, StatefulSet (для JetStream), supercluster для multi-region.

  10. RabbitMQ в K8s? RabbitMQ Cluster Operator. StatefulSet, persistent storage, quorum queues.


Запустите nats-server локально. Напишите publisher (10 msg/sec) и subscriber (orders.*). Проверьте wildcards.

Реализуйте «math server» — отвечает на math.add, math.mul. Клиент с timeout 1s.

3 инстанса consumer с одной queue group. Запустите 30 сообщений — увидьте ~10 каждому.

Создайте stream EVENTS (subject events.>). Producer пишет, pull consumer читает с manual ack. Симулируйте crash после получения, до ack — сообщение redelivered.

Запишите конфиг в KV bucket config. Сделайте watcher, который реагирует на изменения.

Exchange orders, два queue: orders.eu (binding key orders.eu), orders.us (orders.us). Producer публикует с разными keys, проверьте маршрутизацию.

Exchange events (topic). Queue audit (binding *.created.*). Producer публикует с разными keys, проверьте кто получает.

Создайте quorum queue с DLX. Симулируйте обработку с ошибкой — увидьте сообщение в DLQ.

Producer с Confirm mode. Симулируйте disconnect (docker stop rabbitmq на 5 сек) — увидите, что confirms не приходят, retry в коде.

Producer добавляет traceparent header. Consumer читает header и продолжает trace. В Jaeger увидите связанный trace producer → consumer.


  1. NATS docshttps://docs.nats.io/.
  2. nats.gohttps://github.com/nats-io/nats.go.
  3. JetStream guidehttps://docs.nats.io/nats-concepts/jetstream.
  4. RabbitMQ docshttps://www.rabbitmq.com/documentation.html.
  5. amqp091-gohttps://github.com/rabbitmq/amqp091-go.
  6. AMQP 0.9.1 spechttps://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf.
  7. “RabbitMQ in Depth” by Gavin M. Roy (Manning, 2017).
  8. “Mastering RabbitMQ” recent editions / RabbitMQ Summit talks — YouTube.
  9. NATS by Examplehttps://natsbyexample.com/.
  10. CNCF Cloud Native Landscape: Messaginghttps://landscape.cncf.io/.