Перейти к содержимому

30. NATS JetStream + Apache Pulsar

Зачем знать на Middle 3: Kafka — не единственный messaging-стек на свете. NATS JetStream — лёгкая, low-latency, edge-friendly альтернатива с KV/Object store «из коробки». Apache Pulsar — облачно-нативный конкурент Kafka с tiered storage, multi-tenancy и geo-replication. Senior должен понимать, какая система под какую задачу: high-throughput log streaming → Kafka, real-time микросервисная RPC + KV → NATS, multi-tenant SaaS / infinite retention → Pulsar. Без этого выбираешь по «модности» и потом мучаешься с архитектурой.


  1. Концепция
  2. Production-deep dive
  3. Gotchas
  4. Real cases
  5. Вопросы
  6. Practice
  7. Источники

NATS — open-source messaging, написан на Go, минималистичный. Базовый NATS = pub/sub без persistence, миллион сообщений в секунду, latency микросекунды.

JetStream (с NATS 2.2) — persistent layer поверх NATS. Streams, consumers, KV-store, Object-store.

Subject-based messaging: иерархия subject’ов (orders.created, orders.updated, wildcard orders.*). Это главное отличие от Kafka: вместо строгих топиков — гибкая иерархия.

Apache Pulsar — distributed messaging, Yahoo origin, top-level Apache. Главные особенности:

  • Архитектура с разделением compute (brokers) и storage (BookKeeper).
  • Multi-tenancy native: tenants → namespaces → topics.
  • Tiered storage: горячие данные на BookKeeper, холодные — на S3/GCS.
  • Geo-replication built-in.
NATS Pulsar
┌───────────┐ ┌──────────────────┐
│ │ │ Brokers (st.) │
Publishers │ NATS │ Consumers │ ──────────── │
─────────► │ Server │ ──────────► │ BookKeeper │
│ JetStream │ │ (storage) │
│ (store) │ │ ──────────── │
└───────────┘ │ ZK / metadata │
└──────────────────┘
Tiered → S3

Базовый NATS — fire-and-forget pub/sub. Нет persistence, нет offset’ов, нет retention.

import "github.com/nats-io/nats.go"
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Drain()
// Publish
nc.Publish("orders.created", []byte(`{"id":1}`))
// Subscribe (async)
sub, _ := nc.Subscribe("orders.*", func(m *nats.Msg) {
fmt.Println(string(m.Data))
})

Request-Reply (синхронный RPC через NATS):

// server
nc.Subscribe("svc.greet", func(m *nats.Msg) {
m.Respond([]byte("hello " + string(m.Data)))
})
// client
resp, _ := nc.Request("svc.greet", []byte("alice"), 2*time.Second)
fmt.Println(string(resp.Data))

⚠️ NATS Core — это «UDP-like» для distributed apps. Если клиент не подписан в момент publish — сообщение потеряно.

Stream — это persistent log с subject mapping. По сути аналог Kafka topic, но:

  • В stream могут собираться несколько subjects (orders.> — все суб-сабжекты).
  • Один stream может перекрывать иерархию.
js, _ := jetstream.New(nc)
ctx := context.Background()
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"},
Storage: jetstream.FileStorage,
Replicas: 3,
MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB
MaxAge: 7 * 24 * time.Hour,
Retention: jetstream.LimitsPolicy,
Discard: jetstream.DiscardOld,
})

Retention policies:

PolicyКогда удалять
LimitsPolicyПо max_bytes / max_age / max_msgs (как Kafka)
WorkQueuePolicyПосле ack хотя бы одним consumer’ом (как «очередь»)
InterestPolicyПока есть подписчики (consumer’ы)

Discard policy:

  • DiscardOld — выкидывает старые сообщения, когда полно.
  • DiscardNew — отвергает новые publish при полном stream’е.

Storage:

  • FileStorage — на диск (default, durable).
  • MemoryStorage — RAM (быстро, теряется при рестарте).

Replication: R=3 (typical). Координация — Raft под капотом (Raft группа на каждый stream).

Consumer = stateful подписка на stream. Хранит свой offset (sequence).

Push vs Pull:

ТипПоведение
PushJetStream отправляет сообщения на subject (как обычный NATS sub).
PullКлиент сам делает Fetch — точный контроль скорости, batching.

Recommended: Pull-consumers для production. Естественный backpressure.

// Pull consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Name: "worker",
Durable: "worker",
AckPolicy: jetstream.AckExplicitPolicy,
DeliverPolicy: jetstream.DeliverAllPolicy,
FilterSubject: "orders.created",
MaxDeliver: 5, // до 5 ретраев
AckWait: 30 * time.Second, // через сколько считать not-ack'нутым
})
for {
msgs, err := cons.Fetch(100, jetstream.FetchMaxWait(5*time.Second))
if err != nil { continue }
for msg := range msgs.Messages() {
handle(msg.Data())
msg.Ack()
}
}

Ack policies:

PolicyПоведение
AckNoneБез ack. JetStream удаляет сразу при доставке.
AckAllAck высокого offset подтверждает все ниже.
AckExplicitКаждое сообщение требует свой Ack. Recommended.

Durable vs Ephemeral:

  • Durable — имеет имя, переживёт рестарт клиента, продолжит с того же offset.
  • Ephemeral — без имени, удаляется при inactivity.

Filter subject — consumer слушает только нужный subset stream’а (например, orders.created, а не все orders.>).

Replay options:

  • DeliverAllPolicy — с начала stream’а.
  • DeliverLastPolicy — только последнее.
  • DeliverNewPolicy — только новые.
  • DeliverByStartSequencePolicy — с конкретного seq.
  • DeliverByStartTimePolicy — с конкретного времени.

Built-in key-value store поверх JetStream:

kv, _ := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: "config",
History: 10, // храним 10 версий каждого ключа
Replicas: 3,
TTL: 24 * time.Hour,
})
kv.Put(ctx, "app.feature.new_ui", []byte("true"))
entry, _ := kv.Get(ctx, "app.feature.new_ui")
fmt.Println(string(entry.Value()))
// Watch — реактивные обновления
w, _ := kv.Watch(ctx, "app.>")
for entry := range w.Updates() {
if entry == nil { continue } // initial sync done
fmt.Println(entry.Key(), string(entry.Value()))
}

⚠️ KV в NATS — это stream под капотом с compaction по ключу. Полезен для:

  • Конфигурации.
  • Service discovery.
  • Распределённых locks.
  • Feature flags.

Альтернатива Redis/etcd/Consul в небольших проектах.

Бинарные большие объекты (файлы, медиа). Чанкуются и хранятся в JetStream.

obj, _ := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: "uploads",
})
_, _ = obj.PutBytes(ctx, "avatar.jpg", largeBytes)
data, _ := obj.GetBytes(ctx, "avatar.jpg")

Полезно для embedded scenarios (IoT, edge), где S3 недоступен.

Mirror stream — точная копия другого stream’а (read-only).

js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS_MIRROR",
Mirror: &jetstream.StreamSource{ Name: "ORDERS" },
})

Sourcing stream — multi-source агрегатор:

js.CreateStream(ctx, jetstream.StreamConfig{
Name: "ALL_EVENTS",
Sources: []*jetstream.StreamSource{
{Name: "ORDERS"}, {Name: "PAYMENTS"}, {Name: "REFUNDS"},
},
})

Leaf node — отдельный NATS-сервер, который подключается к main кластеру как клиент, но при этом сам обслуживает локальных клиентов.

┌────────────────────┐
│ Main NATS cluster│ (HQ data center)
└─────────┬──────────┘
│ secure TLS
┌────────────┼────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Leaf RU │ │ Leaf US │ │ Leaf EU │
└─────────┘ └─────────┘ └─────────┘
│ │ │
IoT devs edge devs plant devs

Use case: IoT, edge computing. Local clients не видят latency главного кластера. Pre-aggregation и фильтрация — на leaf.

Super-cluster — несколько NATS clusters в разных регионах, соединены gateway’ями. Subject hierarchy общая, но трафик роутится «прыжками» только когда есть локальный подписчик.

# nats-server.conf
server_name: nats-1
listen: 0.0.0.0:4222
http_port: 8222
cluster {
name: nats-prod
listen: 0.0.0.0:6222
routes: [nats-route://nats-2:6222, nats-route://nats-3:6222]
}
jetstream {
store_dir: /var/lib/nats
max_mem: 4G
max_file: 200G
}
leafnodes {
port: 7422
}
authorization {
users: [
{user: app, password: "...", permissions: {publish: ["orders.>"], subscribe: ["responses.>"]}}
]
}

Operator-mode через nats-account-server / nsc-tool — для multi-tenant и iso между namespaces. В production советуется.

АспектNATS JetStreamKafka
PersistenceOptional (Core fire-and-forget)Always persistent
Subject hierarchyГибкая (orders.>, wildcard)Жёсткие topic names
ThroughputВысокий (~1M msg/s per stream)Очень высокий (~5M+ msg/s)
LatencyМикросекунды (Core)Миллисекунды
EoSЧерез message dedup + idempotency keys (нет встроенных tx)Transactional producer + read_committed
Operational complexityНизкая (один Go-binary)Высокая (Brokers + ZK/KRaft)
KV/Object storeВстроеныНет (нужен внешний)
Edge / leaf nodesДа (leaf nodes)Нет (MirrorMaker — кластер↔кластер)
EcosystemМеньшеОгромный (Connect, Streams, KSQL)
Best forMicroservices, real-time, edge, IoTHigh-throughput log streaming, EoS
┌────────────────────────────────────────┐
│ Pulsar Brokers (stateless) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Broker1 │ │ Broker2 │ │ Broker3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└──────────────────┬─────────────────────┘
┌──────────────────┴─────────────────────┐
│ Apache BookKeeper │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Bookie1 │ │ Bookie2 │ │ Bookie3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└────────────────────────────────────────┘
┌──────────────────┴─────────────────────┐
│ ZooKeeper / metadata │
└────────────────────────────────────────┘
┌──────────────────┴─────────────────────┐
│ Tiered storage: S3 / GCS / HDFS │
└────────────────────────────────────────┘

Брокеры — stateless. Реально хранят данные Bookies (BookKeeper).

Ledger в BookKeeper — append-only log, реплицирован на N bookies. Topic = sequence ledgers.

Преимущество разделения:

  • Брокер можно перезапустить без data movement.
  • Storage и compute масштабируются независимо.
  • Bookies можно добавлять под рост хранения, brokers — под рост чтения.
SubscriptionПоведение
ExclusiveТолько один consumer.
FailoverОдин active, остальные standby.
SharedRound-robin между consumer’ами. Нет order.
Key_SharedHash by key → consumer. Order внутри ключа.

Это больше гибкости, чем в Kafka: в Kafka выбор «consumer group» = почти эквивалент Failover. В Pulsar один топик одновременно может иметь все 4 типа подписок.

  • Persistent (persistent://tenant/ns/topic) — на BookKeeper.
  • Non-persistent (non-persistent://...) — в памяти брокера, для real-time.

Partitioned topics:

Окно терминала
bin/pulsar-admin topics create-partitioned-topic persistent://my/ns/events -p 16

Под капотом — 16 internal topics. Producer’ам прозрачно.

Окно терминала
bin/pulsar-admin topics offload-status persistent://my/ns/events

Старые segments выгружаются на S3 / GCS. Брокер читает с S3 для исторических consumer’ов (медленнее, но дешевле).

Конфиг:

managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=my-pulsar-archive
managedLedgerOffloadAutoTriggerSizeThresholdBytes=10737418240 # 10GB
tenant: sales | logistics | hr
namespace: orders | invoices
topic: orders.created | invoices.draft

Quotas и authz — на уровне namespace. Идеально для SaaS.

В Pulsar — встроенный schema registry. AVRO, JSON-schema, Protobuf. Не отдельный сервис (как Confluent SR), а часть кластера. Schema эволюционирует с проверкой совместимости.

Lightweight serverless поверх Pulsar. Функции на Java/Python/Go, обрабатывают сообщения inline. Аналог Kafka Streams, но проще.

import "github.com/apache/pulsar-client-go/pulsar"
client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://broker:6650"})
defer client.Close()
// Producer
prod, _ := client.CreateProducer(pulsar.ProducerOptions{
Topic: "persistent://my/ns/events",
BatchingMaxPublishDelay: 10 * time.Millisecond,
CompressionType: pulsar.ZSTD,
})
_, _ = prod.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hi"), Key: "user-1"})
// Consumer (Key_Shared)
cons, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://my/ns/events",
SubscriptionName: "workers",
Type: pulsar.KeyShared,
})
for msg := range cons.Chan() {
handle(msg.Payload())
cons.Ack(msg)
}
АспектPulsarKafka
ArchitectureBrokers + BookKeeper + ZK (3 layers)Brokers + ZK/KRaft
StoragePluggable, tiered (S3 built-in)Local disk (S3 — через Connect)
Multi-tenancyNative (tenant/ns/topic)Через ACL + topic naming
Subscription4 типа в одном топикеТолько Consumer Group
Geo-replicationBuilt-inЧерез MirrorMaker 2
Schema RegistryBuilt-inВнешний (Confluent SR)
EcosystemМеньшеОгромный
OperationalСложнее (3 кластера)Проще (после KRaft)
Best forMulti-tenant SaaS, infinite retentionThroughput, event log

Если 2+ consumer’а с разными filter’ами одновременно — стрим может удалить сообщение, что нужно одному из них (если другой acknowledge’нул раньше). WorkQueue = single consumer (или строго filter’ы без overlap).

Push отправляет сообщения по NATS. Если клиент не успевает — буфер переполняется → drops или disconnect. Используй Pull для production.

Если сообщение не ack’нуто после MaxDeliver — оно просто прекращает доставляться (terminated). Нет встроенного DLQ. Реализуй DLQ вручную (см. файл 31).

Без репликации потеря узла = потеря данных. Всегда Replicas: 3 для важных streams.

Memory storage не выживает рестарт. Если думал, что данные «постоянны» — облом.

Если приложение делает nc.Subscribe(...) (без JetStream API), сообщения из JetStream stream тоже доставляются — но БЕЗ ack, persistent semantics. Легко перепутать. Используй js.Subscribe(...) (legacy) или consumer.Consume(...) (новый API).

Если ни один subscriber не отвечает на subject в nc.Request() — будет timeout, не ошибка «нет получателя». Различай явно.

Если ZK partitioned, Pulsar brokers могут «потерять» свои ledgers. Восстановление — через pulsar-admin namespaces unload. ZK — слабое место, мигрируй на metadata store K8s + etcd (есть proposal).

Если subscription отстаёт и backlog растёт — Pulsar накапливает ledgers, диск растёт. Настрой backlog quotas:

Окно терминала
bin/pulsar-admin namespaces set-backlog-quota my/ns -l 10G -p producer_request_hold

Если 90% сообщений идут с одним и тем же key — Key_Shared отправит их одному consumer’у → перегрузка. Уменьшай key cardinality аккуратно.

Чтение с S3 в 1000x медленнее, чем с BookKeeper. Не подходит для real-time replay, только для исторических запросов.

Default — FORWARD_TRANSITIVE. Несовместимые изменения отвергаются. Понимай policy перед миграцией схем.

History: N хранит N последних версий до tombstone/compaction. TTL удаляет по времени. Они работают вместе, но overlap не очевиден.

Mirror stream — данные да, consumer state — нет. После failover к mirror’у консьюмеры начнут с DeliverAll.

Stateful function после restart должен прогреться. На критичном пути может быть latency 1-3 секунды.


Контекст: 5000 промышленных предприятий, каждое присылает датчики (~1000 metrics/sec на предприятие). Главный кластер в Москве.

Решение:

  • Каждое предприятие — leaf NATS node (Raspberry-Pi class).
  • Локальные сенсоры публикуют factory.<id>.sensor.<name>.
  • Leaf фильтрует/агрегирует (1 значение в секунду вместо 100) перед отправкой в main.
  • Main cluster — JetStream с retention 30 дней.

Throughput main снизился в 100 раз, latency для локальных аналитик — < 5 мс.

Контекст: маленький стартап, нужно service discovery + config + locks. Не хотят админить Consul.

Решение:

  • NATS JetStream cluster (3 ноды).
  • KV bucket для service registry: services.<name>.<instance> → {addr, last_seen}.
  • TTL 10s, services делают update каждые 3 секунды.
  • KV bucket для feature flags, читается через Watch — мгновенные обновления в Go-сервисах.

Заменил Consul + etcd одним кластером NATS. -2 системы в проде.

Контекст: B2B SaaS-провайдер, 200 клиентов. Нужна изоляция и ограничения per-tenant.

Решение:

  • В Pulsar — tenant = клиент, namespace = environment (prod/staging).
  • Quotas: max storage, max throughput per namespace.
  • Каждый клиент получает creds только на свой tenant.
  • Старые данные авто-выгружаются на S3 (tiered storage).

Cost per tenant — predictable. У клиентов нет доступа к чужим топикам даже на уровне Pulsar.

Контекст: банк с присутствием в РФ и Казахстане. Регуляторные требования — backup в обоих регионах.

Решение:

  • Два Pulsar кластера (Moscow + Almaty).
  • Namespace настроен с geo-replication: [moscow, almaty].
  • Сообщения автоматически копируются.
  • Local consumer’ы читают только локальный кластер (lower latency).

При DR — переключение namespace на read-only в Moscow за минуты.

Контекст: ~30 микросервисов, нужна шина событий с durability, но без overhead Kafka.

Решение:

  • NATS JetStream cluster (3 узла).
  • Streams: orders, payments, notifications.
  • Каждый сервис — JetStream pull consumer с durable.
  • DLQ-pattern (отдельный stream *_dlq, см. файл 31).

Operational overhead — почти 0. Один Go-binary, без ZK/KRaft. Запустили за день.


  1. Что такое NATS Core и чем он отличается от JetStream?
  2. Что такое subject hierarchy в NATS? Чем отличается от Kafka topics?
  3. Что такое stream в JetStream? Какие retention policies?
  4. Чем push consumer отличается от pull consumer в JetStream?
  5. Какие AckPolicy есть в JetStream и какая default?
  6. Что такое durable vs ephemeral consumer?
  7. Что такое FilterSubject и зачем?
  8. Как работает MaxDeliver и почему нужен DLQ pattern руками?
  9. Что такое JetStream KV-store и для каких задач?
  10. Что такое Watch в KV и как реализован?
  11. Что такое JetStream Object Store?
  12. Что такое Mirror и Sourcing streams?
  13. Что такое leaf node и super-cluster в NATS?
  14. Каковы limits JetStream по throughput vs Kafka?
  15. Когда выбрать NATS, когда Kafka?
  16. Что такое Apache Pulsar и какие у него три слоя архитектуры?
  17. Зачем нужен BookKeeper и что такое ledger?
  18. Какие типы subscriptions есть в Pulsar и в чём разница?
  19. Что такое Key_Shared subscription? Какие у неё гарантии?
  20. Что такое tiered storage в Pulsar и как настроить S3 offload?
  21. Что такое namespace в Pulsar? Зачем multi-tenancy?
  22. Что такое Pulsar Functions?
  23. Как Pulsar делает geo-replication?
  24. В чём преимущество отделения brokers от storage в Pulsar?
  25. Какие risks при backlog в Pulsar и как их предотвратить?

Окно терминала
docker run -p 4222:4222 -p 8222:8222 nats:2.10 -js
nc, _ := nats.Connect("nats://localhost:4222")
js, _ := jetstream.New(nc)
ctx := context.Background()
js.CreateStream(ctx, jetstream.StreamConfig{
Name: "DEMO", Subjects: []string{"demo.>"},
})
js.Publish(ctx, "demo.hello", []byte("world"))
cons, _ := js.CreateOrUpdateConsumer(ctx, "DEMO", jetstream.ConsumerConfig{
Durable: "c1", AckPolicy: jetstream.AckExplicitPolicy,
})
msgs, _ := cons.Fetch(10)
for m := range msgs.Messages() {
fmt.Println(string(m.Data()))
m.Ack()
}
kv, _ := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "cfg", History: 5})
kv.Put(ctx, "flag.x", []byte("true"))
e, _ := kv.Get(ctx, "flag.x")
fmt.Println(string(e.Value()))
w, _ := kv.Watch(ctx, "flag.>")
go func() {
for u := range w.Updates() { if u != nil { fmt.Println("UPDATE:", u.Key(), string(u.Value())) } }
}()
kv.Put(ctx, "flag.y", []byte("false"))
Окно терминала
docker run -d --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2 \
bin/pulsar standalone
client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
prod, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: "persistent://public/default/demo"})
prod.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hi")})
cons, _ := client.Subscribe(pulsar.ConsumerOptions{
Topic: "persistent://public/default/demo",
SubscriptionName: "sub", Type: pulsar.Shared,
})
for m := range cons.Chan() { fmt.Println(string(m.Payload())); cons.Ack(m) }

Сравни JetStream vs Pulsar vs Kafka на одном hardware:

  • Throughput single producer / consumer.
  • p99 latency.
  • Storage footprint при retention 1h.

Подними второй NATS server, настрой как leaf к первому:

leafnodes { remotes: [{url: "nats://main:7422"}] }

Опубликуй на leaf — увидь на main.

Подними два Pulsar standalone в Docker, настрой replicate namespace, проверь, что сообщения видны в обоих.


  1. NATS Documentation. https://docs.nats.io/
  2. JetStream Documentation. https://docs.nats.io/nats-concepts/jetstream
  3. nats.go. https://github.com/nats-io/nats.go
  4. NATS by Example. https://natsbyexample.com/
  5. «NATS: Connecting Everything» — Derek Collison. Talks / Synadia blog.
  6. Apache Pulsar Documentation. https://pulsar.apache.org/docs/
  7. pulsar-client-go. https://github.com/apache/pulsar-client-go
  8. «Apache Pulsar in Action» — David Kjerrumgaard, Manning, 2021.
  9. Pulsar vs Kafka — StreamNative blog. https://streamnative.io/blog
  10. BookKeeper Internals. https://bookkeeper.apache.org/docs/getting-started/concepts
  11. Synadia / NATS production patterns. https://www.synadia.com/blog
  12. «Designing Data-Intensive Applications» — M. Kleppmann. Глава об messaging systems.
  13. YouTube: NATS / Pulsar Summit talks 2024-2025.
  14. Synadia Cloud (managed NATS). https://www.synadia.com/cloud
  15. StreamNative Cloud (managed Pulsar). https://streamnative.io/

В production NATS используется operator/account/user иерархия (multi-tenant):

operator
└─ account: app
├─ user: backend
└─ user: worker
└─ account: analytics
└─ user: reader

JWT-based auth, инструмент nsc для генерации:

Окно терминала
nsc add operator -n prod
nsc add account -n app
nsc add user -a app -n backend
nsc edit user -n backend --allow-pub "orders.>" --allow-sub "responses.>"

В nats-server.conf:

operator: "/etc/nats/operator.jwt"
resolver: {
type: full
dir: "/etc/nats/jwt"
}
system_account: "SYSADMIN"

Это даёт строгую изоляцию: account A не видит subject’ы account B даже при subscribe >.

consumerConfig := jetstream.ConsumerConfig{
Name: "payment-processor-v1",
Durable: "payment-processor-v1", // persistent
FilterSubject: "payments.created",
AckPolicy: jetstream.AckExplicitPolicy,
DeliverPolicy: jetstream.DeliverAllPolicy,
MaxDeliver: 5,
MaxAckPending: 500,
AckWait: 30 * time.Second,
Backoff: []time.Duration{1*time.Second, 5*time.Second, 30*time.Second, 5*time.Minute, 30*time.Minute},
MaxRequestBatch: 200,
}

Backoff (NATS 2.8+) — массив delay’ев для retry attempts. Replaces «всегда AckWait».

Окно терминала
nats stream view ORDERS
nats stream backup ORDERS /tmp/orders.bak
nats stream restore ORDERS /tmp/orders.bak

Backup’ы — обычный tar с file storage. Можно копировать между кластерами.

Окно терминала
bin/pulsar-admin schemas set-schema-validation-enforced my/ns --enable
bin/pulsar-admin schemas set-compatibility-strategy --compatibility-strategy FORWARD_TRANSITIVE my/ns
StrategyЧто разрешено
BACKWARDСтарая схема может читать новые данные
FORWARDНовая схема может читать старые данные
FULLИ то и другое
BACKWARD_TRANSITIVEПо всем предыдущим версиям
ALWAYS_COMPATIBLEПринимаем всё (опасно)
ALWAYS_INCOMPATIBLEЗапрещаем любую эволюцию (для критичных таблиц)
Окно терминала
bin/pulsar-admin functions create \
--name uppercaser \
--inputs persistent://my/ns/input \
--output persistent://my/ns/output \
--classname org.example.UppercaserFn \
--jar /path/to/uppercaser.jar

Pulsar Function deployment modes:

  • process — отдельный процесс на брокере.
  • thread — поток внутри брокера (low overhead).
  • kubernetes — отдельный pod (recommended для prod).

Приложение F. Bench-сценарии NATS vs Kafka vs Pulsar (типичные числа)

Заголовок раздела «Приложение F. Bench-сценарии NATS vs Kafka vs Pulsar (типичные числа)»
СценарийNATS JSKafkaPulsar
Pub/sub без persistence (Core)5-10M/sn/an/a
Persistent throughput single partition100-500k/s500k-1M/s200-500k/s
Persistent total cluster1-3M/s5-10M/s3-7M/s
End-to-end latency p99 (acks=all)1-5ms5-20ms5-15ms
Memory footprint per broker200-500MB4-32GB2-8GB
Time to bootstrap fresh node< 30s1-5 min2-10 min

⚠️ Числа индикативные. Реальные зависят от сообщения size, диска, сети.

Спроси себя:

  1. Нужна ли infinite retention (события за годы)? → Pulsar (tiered) или Kafka + S3.
  2. Нужны ли transactions EoS между топиками? → Kafka.
  3. Нужен ли KV-store рядом с очередью и хотим минимум сервисов? → NATS JS.
  4. Edge/IoT с локальной агрегацией? → NATS leaf nodes.
  5. Multi-tenant SaaS с строгой изоляцией? → Pulsar.
  6. Low-latency RPC между микросервисами? → NATS Core (request-reply).
  7. Огромный поток логов, сжатие, real-time analytics через ClickHouse? → Kafka.
  8. Geo-replication с минимумом конфига? → Pulsar (built-in).

Tarantool тоже умеет «очереди» через модуль queue:

local queue = require('queue')
queue.create_tube('tasks', 'fifo')
queue.tube.tasks:put('hello')
local task = queue.tube.tasks:take(5) -- блокирующее get
queue.tube.tasks:ack(task[1])

⚠️ Это не replacement для Kafka/NATS — Tarantool queue хорош для тысяч задач, не миллионов событий в секунду. Хорош, когда уже стоит Tarantool как DB + хочется простой очереди без отдельной системы.

Endpoint /varz, /jsz, /connz — JSON метрики.

Key metrics:

  • jsz.streams.<name>.state.messages — глубина очереди.
  • jsz.consumers.<name>.num_pending — backlog consumer’а.
  • jsz.consumers.<name>.num_ack_pending — сколько висит в полёте.
  • varz.slow_consumers — счётчик slow consumers (push mode).
  • varz.total_connections — health.

Prometheus exporter: nats-prometheus-exporter (official).

pulsar-admin broker-stats topics | jq .
pulsar-admin topics stats persistent://my/ns/topic
pulsar-admin topics stats-internal persistent://my/ns/topic

Key metrics (Prometheus exporter native):

  • pulsar_subscription_msg_rate_out.
  • pulsar_subscription_backlog.
  • pulsar_producer_msg_rate_in.
  • pulsar_storage_size.
  • pulsar_ml_AddEntryLatency — disk write latency BookKeeper.
  • bookkeeper_journal_JOURNAL_ADD_ENTRY — latency append.