30. NATS JetStream + Apache Pulsar
Зачем знать на Middle 3: Kafka — не единственный messaging-стек на свете. NATS JetStream — лёгкая, low-latency, edge-friendly альтернатива с KV/Object store «из коробки». Apache Pulsar — облачно-нативный конкурент Kafka с tiered storage, multi-tenancy и geo-replication. Senior должен понимать, какая система под какую задачу: high-throughput log streaming → Kafka, real-time микросервисная RPC + KV → NATS, multi-tenant SaaS / infinite retention → Pulsar. Без этого выбираешь по «модности» и потом мучаешься с архитектурой.
Содержание
Заголовок раздела «Содержание»1. Концепция
Заголовок раздела «1. Концепция»NATS / JetStream
Заголовок раздела «NATS / JetStream»NATS — open-source messaging, написан на Go, минималистичный. Базовый NATS = pub/sub без persistence, миллион сообщений в секунду, latency микросекунды.
JetStream (с NATS 2.2) — persistent layer поверх NATS. Streams, consumers, KV-store, Object-store.
Subject-based messaging: иерархия subject’ов (orders.created, orders.updated, wildcard orders.*). Это главное отличие от Kafka: вместо строгих топиков — гибкая иерархия.
Apache Pulsar
Заголовок раздела «Apache Pulsar»Apache Pulsar — distributed messaging, Yahoo origin, top-level Apache. Главные особенности:
- Архитектура с разделением compute (brokers) и storage (BookKeeper).
- Multi-tenancy native: tenants → namespaces → topics.
- Tiered storage: горячие данные на BookKeeper, холодные — на S3/GCS.
- Geo-replication built-in.
NATS Pulsar ┌───────────┐ ┌──────────────────┐ │ │ │ Brokers (st.) │ Publishers │ NATS │ Consumers │ ──────────── │ ─────────► │ Server │ ──────────► │ BookKeeper │ │ JetStream │ │ (storage) │ │ (store) │ │ ──────────── │ └───────────┘ │ ZK / metadata │ └──────────────────┘ │ Tiered → S32. Production-deep dive
Заголовок раздела «2. Production-deep dive»2.1. NATS Core (non-JetStream)
Заголовок раздела «2.1. NATS Core (non-JetStream)»Базовый NATS — fire-and-forget pub/sub. Нет persistence, нет offset’ов, нет retention.
import "github.com/nats-io/nats.go"
nc, _ := nats.Connect(nats.DefaultURL)defer nc.Drain()
// Publishnc.Publish("orders.created", []byte(`{"id":1}`))
// Subscribe (async)sub, _ := nc.Subscribe("orders.*", func(m *nats.Msg) { fmt.Println(string(m.Data))})Request-Reply (синхронный RPC через NATS):
// servernc.Subscribe("svc.greet", func(m *nats.Msg) { m.Respond([]byte("hello " + string(m.Data)))})
// clientresp, _ := nc.Request("svc.greet", []byte("alice"), 2*time.Second)fmt.Println(string(resp.Data))⚠️ NATS Core — это «UDP-like» для distributed apps. Если клиент не подписан в момент publish — сообщение потеряно.
2.2. JetStream Streams
Заголовок раздела «2.2. JetStream Streams»Stream — это persistent log с subject mapping. По сути аналог Kafka topic, но:
- В stream могут собираться несколько subjects (
orders.>— все суб-сабжекты). - Один stream может перекрывать иерархию.
js, _ := jetstream.New(nc)ctx := context.Background()
stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ Name: "ORDERS", Subjects: []string{"orders.>"}, Storage: jetstream.FileStorage, Replicas: 3, MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB MaxAge: 7 * 24 * time.Hour, Retention: jetstream.LimitsPolicy, Discard: jetstream.DiscardOld,})Retention policies:
| Policy | Когда удалять |
|---|---|
LimitsPolicy | По max_bytes / max_age / max_msgs (как Kafka) |
WorkQueuePolicy | После ack хотя бы одним consumer’ом (как «очередь») |
InterestPolicy | Пока есть подписчики (consumer’ы) |
Discard policy:
DiscardOld— выкидывает старые сообщения, когда полно.DiscardNew— отвергает новые publish при полном stream’е.
Storage:
FileStorage— на диск (default, durable).MemoryStorage— RAM (быстро, теряется при рестарте).
Replication: R=3 (typical). Координация — Raft под капотом (Raft группа на каждый stream).
2.3. JetStream Consumers
Заголовок раздела «2.3. JetStream Consumers»Consumer = stateful подписка на stream. Хранит свой offset (sequence).
Push vs Pull:
| Тип | Поведение |
|---|---|
| Push | JetStream отправляет сообщения на subject (как обычный NATS sub). |
| Pull | Клиент сам делает Fetch — точный контроль скорости, batching. |
Recommended: Pull-consumers для production. Естественный backpressure.
// Pull consumercons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Name: "worker", Durable: "worker", AckPolicy: jetstream.AckExplicitPolicy, DeliverPolicy: jetstream.DeliverAllPolicy, FilterSubject: "orders.created", MaxDeliver: 5, // до 5 ретраев AckWait: 30 * time.Second, // через сколько считать not-ack'нутым})
for { msgs, err := cons.Fetch(100, jetstream.FetchMaxWait(5*time.Second)) if err != nil { continue } for msg := range msgs.Messages() { handle(msg.Data()) msg.Ack() }}Ack policies:
| Policy | Поведение |
|---|---|
AckNone | Без ack. JetStream удаляет сразу при доставке. |
AckAll | Ack высокого offset подтверждает все ниже. |
AckExplicit | Каждое сообщение требует свой Ack. Recommended. |
Durable vs Ephemeral:
- Durable — имеет имя, переживёт рестарт клиента, продолжит с того же offset.
- Ephemeral — без имени, удаляется при inactivity.
Filter subject — consumer слушает только нужный subset stream’а (например, orders.created, а не все orders.>).
Replay options:
DeliverAllPolicy— с начала stream’а.DeliverLastPolicy— только последнее.DeliverNewPolicy— только новые.DeliverByStartSequencePolicy— с конкретного seq.DeliverByStartTimePolicy— с конкретного времени.
2.4. JetStream KV Store
Заголовок раздела «2.4. JetStream KV Store»Built-in key-value store поверх JetStream:
kv, _ := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{ Bucket: "config", History: 10, // храним 10 версий каждого ключа Replicas: 3, TTL: 24 * time.Hour,})
kv.Put(ctx, "app.feature.new_ui", []byte("true"))entry, _ := kv.Get(ctx, "app.feature.new_ui")fmt.Println(string(entry.Value()))
// Watch — реактивные обновленияw, _ := kv.Watch(ctx, "app.>")for entry := range w.Updates() { if entry == nil { continue } // initial sync done fmt.Println(entry.Key(), string(entry.Value()))}⚠️ KV в NATS — это stream под капотом с compaction по ключу. Полезен для:
- Конфигурации.
- Service discovery.
- Распределённых locks.
- Feature flags.
Альтернатива Redis/etcd/Consul в небольших проектах.
2.5. Object Store
Заголовок раздела «2.5. Object Store»Бинарные большие объекты (файлы, медиа). Чанкуются и хранятся в JetStream.
obj, _ := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{ Bucket: "uploads",})
_, _ = obj.PutBytes(ctx, "avatar.jpg", largeBytes)data, _ := obj.GetBytes(ctx, "avatar.jpg")Полезно для embedded scenarios (IoT, edge), где S3 недоступен.
2.6. Mirror и Sourcing streams
Заголовок раздела «2.6. Mirror и Sourcing streams»Mirror stream — точная копия другого stream’а (read-only).
js.CreateStream(ctx, jetstream.StreamConfig{ Name: "ORDERS_MIRROR", Mirror: &jetstream.StreamSource{ Name: "ORDERS" },})Sourcing stream — multi-source агрегатор:
js.CreateStream(ctx, jetstream.StreamConfig{ Name: "ALL_EVENTS", Sources: []*jetstream.StreamSource{ {Name: "ORDERS"}, {Name: "PAYMENTS"}, {Name: "REFUNDS"}, },})2.7. Leaf nodes и Super-cluster
Заголовок раздела «2.7. Leaf nodes и Super-cluster»Leaf node — отдельный NATS-сервер, который подключается к main кластеру как клиент, но при этом сам обслуживает локальных клиентов.
┌────────────────────┐ │ Main NATS cluster│ (HQ data center) └─────────┬──────────┘ │ secure TLS ┌────────────┼────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Leaf RU │ │ Leaf US │ │ Leaf EU │ └─────────┘ └─────────┘ └─────────┘ │ │ │ IoT devs edge devs plant devsUse case: IoT, edge computing. Local clients не видят latency главного кластера. Pre-aggregation и фильтрация — на leaf.
Super-cluster — несколько NATS clusters в разных регионах, соединены gateway’ями. Subject hierarchy общая, но трафик роутится «прыжками» только когда есть локальный подписчик.
2.8. JetStream production setup
Заголовок раздела «2.8. JetStream production setup»# nats-server.confserver_name: nats-1listen: 0.0.0.0:4222http_port: 8222
cluster { name: nats-prod listen: 0.0.0.0:6222 routes: [nats-route://nats-2:6222, nats-route://nats-3:6222]}
jetstream { store_dir: /var/lib/nats max_mem: 4G max_file: 200G}
leafnodes { port: 7422}
authorization { users: [ {user: app, password: "...", permissions: {publish: ["orders.>"], subscribe: ["responses.>"]}} ]}Operator-mode через nats-account-server / nsc-tool — для multi-tenant и iso между namespaces. В production советуется.
2.9. NATS vs Kafka
Заголовок раздела «2.9. NATS vs Kafka»| Аспект | NATS JetStream | Kafka |
|---|---|---|
| Persistence | Optional (Core fire-and-forget) | Always persistent |
| Subject hierarchy | Гибкая (orders.>, wildcard) | Жёсткие topic names |
| Throughput | Высокий (~1M msg/s per stream) | Очень высокий (~5M+ msg/s) |
| Latency | Микросекунды (Core) | Миллисекунды |
| EoS | Через message dedup + idempotency keys (нет встроенных tx) | Transactional producer + read_committed |
| Operational complexity | Низкая (один Go-binary) | Высокая (Brokers + ZK/KRaft) |
| KV/Object store | Встроены | Нет (нужен внешний) |
| Edge / leaf nodes | Да (leaf nodes) | Нет (MirrorMaker — кластер↔кластер) |
| Ecosystem | Меньше | Огромный (Connect, Streams, KSQL) |
| Best for | Microservices, real-time, edge, IoT | High-throughput log streaming, EoS |
2.10. Apache Pulsar — Architecture
Заголовок раздела «2.10. Apache Pulsar — Architecture» ┌────────────────────────────────────────┐ │ Pulsar Brokers (stateless) │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Broker1 │ │ Broker2 │ │ Broker3 │ │ │ └─────────┘ └─────────┘ └─────────┘ │ └──────────────────┬─────────────────────┘ │ ┌──────────────────┴─────────────────────┐ │ Apache BookKeeper │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │ Bookie1 │ │ Bookie2 │ │ Bookie3 │ │ │ └─────────┘ └─────────┘ └─────────┘ │ └────────────────────────────────────────┘ │ ┌──────────────────┴─────────────────────┐ │ ZooKeeper / metadata │ └────────────────────────────────────────┘ │ ┌──────────────────┴─────────────────────┐ │ Tiered storage: S3 / GCS / HDFS │ └────────────────────────────────────────┘Брокеры — stateless. Реально хранят данные Bookies (BookKeeper).
Ledger в BookKeeper — append-only log, реплицирован на N bookies. Topic = sequence ledgers.
Преимущество разделения:
- Брокер можно перезапустить без data movement.
- Storage и compute масштабируются независимо.
- Bookies можно добавлять под рост хранения, brokers — под рост чтения.
2.11. Pulsar Subscriptions
Заголовок раздела «2.11. Pulsar Subscriptions»| Subscription | Поведение |
|---|---|
| Exclusive | Только один consumer. |
| Failover | Один active, остальные standby. |
| Shared | Round-robin между consumer’ами. Нет order. |
| Key_Shared | Hash by key → consumer. Order внутри ключа. |
Это больше гибкости, чем в Kafka: в Kafka выбор «consumer group» = почти эквивалент Failover. В Pulsar один топик одновременно может иметь все 4 типа подписок.
2.12. Pulsar topics
Заголовок раздела «2.12. Pulsar topics»- Persistent (
persistent://tenant/ns/topic) — на BookKeeper. - Non-persistent (
non-persistent://...) — в памяти брокера, для real-time.
Partitioned topics:
bin/pulsar-admin topics create-partitioned-topic persistent://my/ns/events -p 16Под капотом — 16 internal topics. Producer’ам прозрачно.
2.13. Tiered storage
Заголовок раздела «2.13. Tiered storage»bin/pulsar-admin topics offload-status persistent://my/ns/eventsСтарые segments выгружаются на S3 / GCS. Брокер читает с S3 для исторических consumer’ов (медленнее, но дешевле).
Конфиг:
managedLedgerOffloadDriver=aws-s3s3ManagedLedgerOffloadBucket=my-pulsar-archivemanagedLedgerOffloadAutoTriggerSizeThresholdBytes=10737418240 # 10GB2.14. Multi-tenancy
Заголовок раздела «2.14. Multi-tenancy»tenant: sales | logistics | hr namespace: orders | invoices topic: orders.created | invoices.draftQuotas и authz — на уровне namespace. Идеально для SaaS.
2.15. Schema Registry
Заголовок раздела «2.15. Schema Registry»В Pulsar — встроенный schema registry. AVRO, JSON-schema, Protobuf. Не отдельный сервис (как Confluent SR), а часть кластера. Schema эволюционирует с проверкой совместимости.
2.16. Pulsar Functions
Заголовок раздела «2.16. Pulsar Functions»Lightweight serverless поверх Pulsar. Функции на Java/Python/Go, обрабатывают сообщения inline. Аналог Kafka Streams, но проще.
2.17. Go-client Pulsar
Заголовок раздела «2.17. Go-client Pulsar»import "github.com/apache/pulsar-client-go/pulsar"
client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://broker:6650"})defer client.Close()
// Producerprod, _ := client.CreateProducer(pulsar.ProducerOptions{ Topic: "persistent://my/ns/events", BatchingMaxPublishDelay: 10 * time.Millisecond, CompressionType: pulsar.ZSTD,})_, _ = prod.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hi"), Key: "user-1"})
// Consumer (Key_Shared)cons, _ := client.Subscribe(pulsar.ConsumerOptions{ Topic: "persistent://my/ns/events", SubscriptionName: "workers", Type: pulsar.KeyShared,})for msg := range cons.Chan() { handle(msg.Payload()) cons.Ack(msg)}2.18. Pulsar vs Kafka
Заголовок раздела «2.18. Pulsar vs Kafka»| Аспект | Pulsar | Kafka |
|---|---|---|
| Architecture | Brokers + BookKeeper + ZK (3 layers) | Brokers + ZK/KRaft |
| Storage | Pluggable, tiered (S3 built-in) | Local disk (S3 — через Connect) |
| Multi-tenancy | Native (tenant/ns/topic) | Через ACL + topic naming |
| Subscription | 4 типа в одном топике | Только Consumer Group |
| Geo-replication | Built-in | Через MirrorMaker 2 |
| Schema Registry | Built-in | Внешний (Confluent SR) |
| Ecosystem | Меньше | Огромный |
| Operational | Сложнее (3 кластера) | Проще (после KRaft) |
| Best for | Multi-tenant SaaS, infinite retention | Throughput, event log |
3. Gotchas (12+)
Заголовок раздела «3. Gotchas (12+)»⚠️ 1. JetStream WorkQueuePolicy и multiple consumers
Заголовок раздела «⚠️ 1. JetStream WorkQueuePolicy и multiple consumers»Если 2+ consumer’а с разными filter’ами одновременно — стрим может удалить сообщение, что нужно одному из них (если другой acknowledge’нул раньше). WorkQueue = single consumer (или строго filter’ы без overlap).
⚠️ 2. JetStream Push consumer и slow client
Заголовок раздела «⚠️ 2. JetStream Push consumer и slow client»Push отправляет сообщения по NATS. Если клиент не успевает — буфер переполняется → drops или disconnect. Используй Pull для production.
⚠️ 3. JetStream MaxDeliver и DLQ
Заголовок раздела «⚠️ 3. JetStream MaxDeliver и DLQ»Если сообщение не ack’нуто после MaxDeliver — оно просто прекращает доставляться (terminated). Нет встроенного DLQ. Реализуй DLQ вручную (см. файл 31).
⚠️ 4. JetStream Replicas = 1 в проде
Заголовок раздела «⚠️ 4. JetStream Replicas = 1 в проде»Без репликации потеря узла = потеря данных. Всегда Replicas: 3 для важных streams.
⚠️ 5. JetStream Storage = Memory + рестарт
Заголовок раздела «⚠️ 5. JetStream Storage = Memory + рестарт»Memory storage не выживает рестарт. Если думал, что данные «постоянны» — облом.
⚠️ 6. NATS Core vs JetStream subscriptions
Заголовок раздела «⚠️ 6. NATS Core vs JetStream subscriptions»Если приложение делает nc.Subscribe(...) (без JetStream API), сообщения из JetStream stream тоже доставляются — но БЕЗ ack, persistent semantics. Легко перепутать. Используй js.Subscribe(...) (legacy) или consumer.Consume(...) (новый API).
⚠️ 7. NATS request-reply timeout
Заголовок раздела «⚠️ 7. NATS request-reply timeout»Если ни один subscriber не отвечает на subject в nc.Request() — будет timeout, не ошибка «нет получателя». Различай явно.
⚠️ 8. Pulsar: stale ZK = stale broker list
Заголовок раздела «⚠️ 8. Pulsar: stale ZK = stale broker list»Если ZK partitioned, Pulsar brokers могут «потерять» свои ledgers. Восстановление — через pulsar-admin namespaces unload. ZK — слабое место, мигрируй на metadata store K8s + etcd (есть proposal).
⚠️ 9. Pulsar: бэклог и retention policies
Заголовок раздела «⚠️ 9. Pulsar: бэклог и retention policies»Если subscription отстаёт и backlog растёт — Pulsar накапливает ledgers, диск растёт. Настрой backlog quotas:
bin/pulsar-admin namespaces set-backlog-quota my/ns -l 10G -p producer_request_hold⚠️ 10. Pulsar Key_Shared + key skew
Заголовок раздела «⚠️ 10. Pulsar Key_Shared + key skew»Если 90% сообщений идут с одним и тем же key — Key_Shared отправит их одному consumer’у → перегрузка. Уменьшай key cardinality аккуратно.
⚠️ 11. Tiered storage latency
Заголовок раздела «⚠️ 11. Tiered storage latency»Чтение с S3 в 1000x медленнее, чем с BookKeeper. Не подходит для real-time replay, только для исторических запросов.
⚠️ 12. Pulsar schema compatibility
Заголовок раздела «⚠️ 12. Pulsar schema compatibility»Default — FORWARD_TRANSITIVE. Несовместимые изменения отвергаются. Понимай policy перед миграцией схем.
⚠️ 13. JetStream KV: history vs TTL
Заголовок раздела «⚠️ 13. JetStream KV: history vs TTL»History: N хранит N последних версий до tombstone/compaction. TTL удаляет по времени. Они работают вместе, но overlap не очевиден.
⚠️ 14. JetStream Mirror не реплицирует consumer’ов
Заголовок раздела «⚠️ 14. JetStream Mirror не реплицирует consumer’ов»Mirror stream — данные да, consumer state — нет. После failover к mirror’у консьюмеры начнут с DeliverAll.
⚠️ 15. Pulsar Functions cold start
Заголовок раздела «⚠️ 15. Pulsar Functions cold start»Stateful function после restart должен прогреться. На критичном пути может быть latency 1-3 секунды.
4. Real cases
Заголовок раздела «4. Real cases»Case 1: IoT платформа на NATS leaf nodes
Заголовок раздела «Case 1: IoT платформа на NATS leaf nodes»Контекст: 5000 промышленных предприятий, каждое присылает датчики (~1000 metrics/sec на предприятие). Главный кластер в Москве.
Решение:
- Каждое предприятие — leaf NATS node (Raspberry-Pi class).
- Локальные сенсоры публикуют
factory.<id>.sensor.<name>. - Leaf фильтрует/агрегирует (1 значение в секунду вместо 100) перед отправкой в main.
- Main cluster — JetStream с retention 30 дней.
Throughput main снизился в 100 раз, latency для локальных аналитик — < 5 мс.
Case 2: KV-store вместо Consul
Заголовок раздела «Case 2: KV-store вместо Consul»Контекст: маленький стартап, нужно service discovery + config + locks. Не хотят админить Consul.
Решение:
- NATS JetStream cluster (3 ноды).
- KV bucket для service registry:
services.<name>.<instance> → {addr, last_seen}. - TTL 10s, services делают update каждые 3 секунды.
- KV bucket для feature flags, читается через Watch — мгновенные обновления в Go-сервисах.
Заменил Consul + etcd одним кластером NATS. -2 системы в проде.
Case 3: Pulsar для multi-tenant SaaS
Заголовок раздела «Case 3: Pulsar для multi-tenant SaaS»Контекст: B2B SaaS-провайдер, 200 клиентов. Нужна изоляция и ограничения per-tenant.
Решение:
- В Pulsar — tenant = клиент, namespace = environment (prod/staging).
- Quotas: max storage, max throughput per namespace.
- Каждый клиент получает creds только на свой tenant.
- Старые данные авто-выгружаются на S3 (tiered storage).
Cost per tenant — predictable. У клиентов нет доступа к чужим топикам даже на уровне Pulsar.
Case 4: Pulsar geo-replication
Заголовок раздела «Case 4: Pulsar geo-replication»Контекст: банк с присутствием в РФ и Казахстане. Регуляторные требования — backup в обоих регионах.
Решение:
- Два Pulsar кластера (Moscow + Almaty).
- Namespace настроен с
geo-replication: [moscow, almaty]. - Сообщения автоматически копируются.
- Local consumer’ы читают только локальный кластер (lower latency).
При DR — переключение namespace на read-only в Moscow за минуты.
Case 5: NATS JetStream для микросервисной шины
Заголовок раздела «Case 5: NATS JetStream для микросервисной шины»Контекст: ~30 микросервисов, нужна шина событий с durability, но без overhead Kafka.
Решение:
- NATS JetStream cluster (3 узла).
- Streams:
orders,payments,notifications. - Каждый сервис — JetStream pull consumer с durable.
- DLQ-pattern (отдельный stream
*_dlq, см. файл 31).
Operational overhead — почти 0. Один Go-binary, без ZK/KRaft. Запустили за день.
5. Вопросы (25)
Заголовок раздела «5. Вопросы (25)»- Что такое NATS Core и чем он отличается от JetStream?
- Что такое subject hierarchy в NATS? Чем отличается от Kafka topics?
- Что такое stream в JetStream? Какие retention policies?
- Чем push consumer отличается от pull consumer в JetStream?
- Какие AckPolicy есть в JetStream и какая default?
- Что такое durable vs ephemeral consumer?
- Что такое FilterSubject и зачем?
- Как работает MaxDeliver и почему нужен DLQ pattern руками?
- Что такое JetStream KV-store и для каких задач?
- Что такое Watch в KV и как реализован?
- Что такое JetStream Object Store?
- Что такое Mirror и Sourcing streams?
- Что такое leaf node и super-cluster в NATS?
- Каковы limits JetStream по throughput vs Kafka?
- Когда выбрать NATS, когда Kafka?
- Что такое Apache Pulsar и какие у него три слоя архитектуры?
- Зачем нужен BookKeeper и что такое ledger?
- Какие типы subscriptions есть в Pulsar и в чём разница?
- Что такое Key_Shared subscription? Какие у неё гарантии?
- Что такое tiered storage в Pulsar и как настроить S3 offload?
- Что такое namespace в Pulsar? Зачем multi-tenancy?
- Что такое Pulsar Functions?
- Как Pulsar делает geo-replication?
- В чём преимущество отделения brokers от storage в Pulsar?
- Какие risks при backlog в Pulsar и как их предотвратить?
6. Practice
Заголовок раздела «6. Practice»6.1. Локальный JetStream
Заголовок раздела «6.1. Локальный JetStream»docker run -p 4222:4222 -p 8222:8222 nats:2.10 -jsnc, _ := nats.Connect("nats://localhost:4222")js, _ := jetstream.New(nc)ctx := context.Background()js.CreateStream(ctx, jetstream.StreamConfig{ Name: "DEMO", Subjects: []string{"demo.>"},})js.Publish(ctx, "demo.hello", []byte("world"))cons, _ := js.CreateOrUpdateConsumer(ctx, "DEMO", jetstream.ConsumerConfig{ Durable: "c1", AckPolicy: jetstream.AckExplicitPolicy,})msgs, _ := cons.Fetch(10)for m := range msgs.Messages() { fmt.Println(string(m.Data())) m.Ack()}6.2. NATS KV
Заголовок раздела «6.2. NATS KV»kv, _ := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "cfg", History: 5})kv.Put(ctx, "flag.x", []byte("true"))e, _ := kv.Get(ctx, "flag.x")fmt.Println(string(e.Value()))w, _ := kv.Watch(ctx, "flag.>")go func() { for u := range w.Updates() { if u != nil { fmt.Println("UPDATE:", u.Key(), string(u.Value())) } }}()kv.Put(ctx, "flag.y", []byte("false"))6.3. Pulsar standalone
Заголовок раздела «6.3. Pulsar standalone»docker run -d --name pulsar -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2 \ bin/pulsar standaloneclient, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})prod, _ := client.CreateProducer(pulsar.ProducerOptions{Topic: "persistent://public/default/demo"})prod.Send(ctx, &pulsar.ProducerMessage{Payload: []byte("hi")})
cons, _ := client.Subscribe(pulsar.ConsumerOptions{ Topic: "persistent://public/default/demo", SubscriptionName: "sub", Type: pulsar.Shared,})for m := range cons.Chan() { fmt.Println(string(m.Payload())); cons.Ack(m) }6.4. Bench
Заголовок раздела «6.4. Bench»Сравни JetStream vs Pulsar vs Kafka на одном hardware:
- Throughput single producer / consumer.
- p99 latency.
- Storage footprint при retention 1h.
6.5. Leaf node
Заголовок раздела «6.5. Leaf node»Подними второй NATS server, настрой как leaf к первому:
leafnodes { remotes: [{url: "nats://main:7422"}] }Опубликуй на leaf — увидь на main.
6.6. Pulsar geo-replication
Заголовок раздела «6.6. Pulsar geo-replication»Подними два Pulsar standalone в Docker, настрой replicate namespace, проверь, что сообщения видны в обоих.
7. Источники
Заголовок раздела «7. Источники»- NATS Documentation. https://docs.nats.io/
- JetStream Documentation. https://docs.nats.io/nats-concepts/jetstream
- nats.go. https://github.com/nats-io/nats.go
- NATS by Example. https://natsbyexample.com/
- «NATS: Connecting Everything» — Derek Collison. Talks / Synadia blog.
- Apache Pulsar Documentation. https://pulsar.apache.org/docs/
- pulsar-client-go. https://github.com/apache/pulsar-client-go
- «Apache Pulsar in Action» — David Kjerrumgaard, Manning, 2021.
- Pulsar vs Kafka — StreamNative blog. https://streamnative.io/blog
- BookKeeper Internals. https://bookkeeper.apache.org/docs/getting-started/concepts
- Synadia / NATS production patterns. https://www.synadia.com/blog
- «Designing Data-Intensive Applications» — M. Kleppmann. Глава об messaging systems.
- YouTube: NATS / Pulsar Summit talks 2024-2025.
- Synadia Cloud (managed NATS). https://www.synadia.com/cloud
- StreamNative Cloud (managed Pulsar). https://streamnative.io/
Приложение A. NATS Operator (NATS accounts model)
Заголовок раздела «Приложение A. NATS Operator (NATS accounts model)»В production NATS используется operator/account/user иерархия (multi-tenant):
operator └─ account: app ├─ user: backend └─ user: worker └─ account: analytics └─ user: readerJWT-based auth, инструмент nsc для генерации:
nsc add operator -n prodnsc add account -n appnsc add user -a app -n backendnsc edit user -n backend --allow-pub "orders.>" --allow-sub "responses.>"В nats-server.conf:
operator: "/etc/nats/operator.jwt"resolver: { type: full dir: "/etc/nats/jwt"}system_account: "SYSADMIN"Это даёт строгую изоляцию: account A не видит subject’ы account B даже при subscribe >.
Приложение B. JetStream consumer best practices
Заголовок раздела «Приложение B. JetStream consumer best practices»consumerConfig := jetstream.ConsumerConfig{ Name: "payment-processor-v1", Durable: "payment-processor-v1", // persistent FilterSubject: "payments.created", AckPolicy: jetstream.AckExplicitPolicy, DeliverPolicy: jetstream.DeliverAllPolicy, MaxDeliver: 5, MaxAckPending: 500, AckWait: 30 * time.Second, Backoff: []time.Duration{1*time.Second, 5*time.Second, 30*time.Second, 5*time.Minute, 30*time.Minute}, MaxRequestBatch: 200,}Backoff (NATS 2.8+) — массив delay’ев для retry attempts. Replaces «всегда AckWait».
Приложение C. JetStream — Stream replay для тестов
Заголовок раздела «Приложение C. JetStream — Stream replay для тестов»nats stream view ORDERSnats stream backup ORDERS /tmp/orders.baknats stream restore ORDERS /tmp/orders.bakBackup’ы — обычный tar с file storage. Можно копировать между кластерами.
Приложение D. Pulsar — Schema evolution policies
Заголовок раздела «Приложение D. Pulsar — Schema evolution policies»bin/pulsar-admin schemas set-schema-validation-enforced my/ns --enablebin/pulsar-admin schemas set-compatibility-strategy --compatibility-strategy FORWARD_TRANSITIVE my/ns| Strategy | Что разрешено |
|---|---|
| BACKWARD | Старая схема может читать новые данные |
| FORWARD | Новая схема может читать старые данные |
| FULL | И то и другое |
| BACKWARD_TRANSITIVE | По всем предыдущим версиям |
| ALWAYS_COMPATIBLE | Принимаем всё (опасно) |
| ALWAYS_INCOMPATIBLE | Запрещаем любую эволюцию (для критичных таблиц) |
Приложение E. Pulsar — Functions runtime
Заголовок раздела «Приложение E. Pulsar — Functions runtime»bin/pulsar-admin functions create \ --name uppercaser \ --inputs persistent://my/ns/input \ --output persistent://my/ns/output \ --classname org.example.UppercaserFn \ --jar /path/to/uppercaser.jarPulsar Function deployment modes:
process— отдельный процесс на брокере.thread— поток внутри брокера (low overhead).kubernetes— отдельный pod (recommended для prod).
Приложение F. Bench-сценарии NATS vs Kafka vs Pulsar (типичные числа)
Заголовок раздела «Приложение F. Bench-сценарии NATS vs Kafka vs Pulsar (типичные числа)»| Сценарий | NATS JS | Kafka | Pulsar |
|---|---|---|---|
| Pub/sub без persistence (Core) | 5-10M/s | n/a | n/a |
| Persistent throughput single partition | 100-500k/s | 500k-1M/s | 200-500k/s |
| Persistent total cluster | 1-3M/s | 5-10M/s | 3-7M/s |
| End-to-end latency p99 (acks=all) | 1-5ms | 5-20ms | 5-15ms |
| Memory footprint per broker | 200-500MB | 4-32GB | 2-8GB |
| Time to bootstrap fresh node | < 30s | 1-5 min | 2-10 min |
⚠️ Числа индикативные. Реальные зависят от сообщения size, диска, сети.
Приложение G. Choosing matrix (быстро)
Заголовок раздела «Приложение G. Choosing matrix (быстро)»Спроси себя:
- Нужна ли infinite retention (события за годы)? → Pulsar (tiered) или Kafka + S3.
- Нужны ли transactions EoS между топиками? → Kafka.
- Нужен ли KV-store рядом с очередью и хотим минимум сервисов? → NATS JS.
- Edge/IoT с локальной агрегацией? → NATS leaf nodes.
- Multi-tenant SaaS с строгой изоляцией? → Pulsar.
- Low-latency RPC между микросервисами? → NATS Core (request-reply).
- Огромный поток логов, сжатие, real-time analytics через ClickHouse? → Kafka.
- Geo-replication с минимумом конфига? → Pulsar (built-in).
Приложение H. Tarantool как «message bus»
Заголовок раздела «Приложение H. Tarantool как «message bus»»Tarantool тоже умеет «очереди» через модуль queue:
local queue = require('queue')queue.create_tube('tasks', 'fifo')queue.tube.tasks:put('hello')local task = queue.tube.tasks:take(5) -- блокирующее getqueue.tube.tasks:ack(task[1])⚠️ Это не replacement для Kafka/NATS — Tarantool queue хорош для тысяч задач, не миллионов событий в секунду. Хорош, когда уже стоит Tarantool как DB + хочется простой очереди без отдельной системы.
Приложение I. Production monitoring NATS
Заголовок раздела «Приложение I. Production monitoring NATS»Endpoint /varz, /jsz, /connz — JSON метрики.
Key metrics:
jsz.streams.<name>.state.messages— глубина очереди.jsz.consumers.<name>.num_pending— backlog consumer’а.jsz.consumers.<name>.num_ack_pending— сколько висит в полёте.varz.slow_consumers— счётчик slow consumers (push mode).varz.total_connections— health.
Prometheus exporter: nats-prometheus-exporter (official).
Приложение J. Production monitoring Pulsar
Заголовок раздела «Приложение J. Production monitoring Pulsar»pulsar-admin broker-stats topics | jq .pulsar-admin topics stats persistent://my/ns/topicpulsar-admin topics stats-internal persistent://my/ns/topicKey metrics (Prometheus exporter native):
pulsar_subscription_msg_rate_out.pulsar_subscription_backlog.pulsar_producer_msg_rate_in.pulsar_storage_size.pulsar_ml_AddEntryLatency— disk write latency BookKeeper.bookkeeper_journal_JOURNAL_ADD_ENTRY— latency append.