Stream Processing и CDC: обработка неограниченных потоков данных
Зачем знать на Middle 3: Tech lead обязан проектировать системы real-time аналитики, fraud detection, рекомендаций. Stream processing — это не просто «consumer в цикле», а целая дисциплина с event time, watermarks, windowing и stateful processing. CDC (Change Data Capture) — мост между OLTP и аналитикой. Без понимания этих концепций ты будешь строить системы, которые «работают, пока не сломаются».
Содержание
Заголовок раздела «Содержание»- Концепция: что такое stream processing
- Глубже: event time, watermarks, windowing, stateful processing
- Gotchas: late data, out-of-order events, exactly-once
- Real cases: fraud detection, real-time dashboards, CDC pipelines
- Вопросы (20+)
- Practice: построить stream pipeline на Go
- Источники
1. Концепция
Заголовок раздела «1. Концепция»1.1 Unbounded data streams vs batch
Заголовок раздела «1.1 Unbounded data streams vs batch»Batch processing:
- Конечный объём данных (например, лог за день)
- Read → transform → write
- Латентность: минуты/часы
- Tool: Hadoop MapReduce, Spark batch, Airflow
Stream processing:
- Бесконечный (unbounded) поток событий
- События приходят непрерывно
- Латентность: миллисекунды/секунды
- Tool: Kafka Streams, Flink, Materialize, Goka
Batch:[========== bounded data ==========] ↓ process all ↓ result
Stream:event1 → event2 → event3 → event4 → ... (бесконечно) ↓ ↓ ↓ ↓process process process process ↓ ↓ ↓ ↓state state state state (накопление)1.2 Ключевые свойства streaming systems
Заголовок раздела «1.2 Ключевые свойства streaming systems»| Свойство | Описание |
|---|---|
| Low latency | < 1 секунды от события до результата |
| High throughput | Миллионы событий/сек |
| Stateful | Хранение промежуточного состояния (счётчики, окна) |
| Fault-tolerant | Восстановление после сбоев без потери данных |
| Scalable | Горизонтальное масштабирование (партиции) |
| Exactly-once | Каждое событие обработано ровно один раз |
1.3 Event time vs Processing time
Заголовок раздела «1.3 Event time vs Processing time»Processing time — момент, когда система получила/обработала событие. Event time — момент, когда событие реально произошло.
real world: t=10:00:00 user clicked button ↓mobile cache: t=10:00:00 → t=10:05:00 (offline, ждёт сеть) ↓backend: t=10:05:30 received event ↓processor: t=10:05:35 processed event
event_time = 10:00:00 (когда кликнули)processing_time = 10:05:35 (когда обработали)Почему критично:
- Если считать «клики за окно 10:00–10:00:59 по processing time», то event попадёт в окно 10:05–10:05:59 — неверно.
- Аналитика должна работать с event time.
1.4 Tools для stream processing
Заголовок раздела «1.4 Tools для stream processing»| Tool | Язык | Особенности |
|---|---|---|
| Kafka Streams | Java | Library, не отдельный кластер, простой DSL |
| Apache Flink | Java/Scala | Stateful, low-latency, продвинутые join’ы |
| Spark Streaming | Scala/Python | Микро-батчи (Structured Streaming), мощный для ETL |
| Materialize | Rust | PostgreSQL-compatible SQL → streaming materialized views |
| RisingWave | Rust | Аналог Materialize, cloud-native |
| ksqlDB | Java | SQL поверх Kafka Streams |
| Apache Beam | Multi | Унифицированная модель batch + stream |
| Goka (Go) | Go | Lightweight, основан на Kafka, не настоящий «полноценный» framework |
| Benthos / Redpanda Connect | Go | Stream processor as a service |
1.5 Stream processing в Go: реальность 2026
Заголовок раздела «1.5 Stream processing в Go: реальность 2026»Плохая новость: в Go нет «зрелого» полноценного stream processing framework масштаба Flink.
Хорошая новость: для большинства задач хватает:
- Kafka consumer/producer (
segmentio/kafka-go,twmb/franz-go) - Локального state store (Redis, BadgerDB, embedded RocksDB)
- Goka (lovoo/goka) — для простых stateful-pipeline
- Benthos (теперь Redpanda Connect) — declarative
Когда хватает Go:
- Простые фильтры/трансформации
- Aggregation с external state (Redis)
- Routing/enrichment events
Когда нужен Flink/Kafka Streams:
- Stream-stream joins с большими окнами
- Сложный stateful processing (миллионы ключей)
- Exactly-once между несколькими топиками
- Stateful миграции через savepoints
2. Глубже: ключевые концепции
Заголовок раздела «2. Глубже: ключевые концепции»2.1 Watermarks
Заголовок раздела «2.1 Watermarks»Watermark — метка времени, которая говорит «события с event_time < W уже получены (или мы решили их игнорировать)».
events: 1:00 → 1:05 → 1:03 → 1:08 → 1:07 → 1:12 ↑ out-of-order!
watermark = max(event_time) - allowed_lateness = 1:12 - 5min = 1:07
окно 1:00–1:04:59: - закрывается когда watermark >= 1:05 - события с event_time < 1:05, пришедшие позже, считаются lateСтратегии для late events:
- Drop (выбросить)
- Update (пересчитать окно)
- Side output (отдельный поток для late events)
2.2 Windowing
Заголовок раздела «2.2 Windowing»Tumbling windows — фиксированные, не пересекающиеся:
|—— 10:00–10:05 ——|—— 10:05–10:10 ——|—— 10:10–10:15 ——|Sliding windows — пересекающиеся, с интервалом меньше размера:
|———— 10:00–10:05 ————| |———— 10:01–10:06 ————| |———— 10:02–10:07 ————|Session windows — динамические, на основе gap:
events: e1 e2 e3 gap > 5min e4 e5 |session 1| |session 2|Global window — одна общая, без границ (custom trigger).
Go pseudo-code (через Kafka Streams концептуально, но Go-style):
// Tumbling window 5 минутtype Window struct { Start, End time.Time Counts map[string]int // userID → click count}
func (p *Processor) Process(event Event) { bucket := event.EventTime.Truncate(5 * time.Minute) win := p.windows[bucket] if win == nil { win = &Window{Start: bucket, End: bucket.Add(5 * time.Minute), Counts: map[string]int{}} p.windows[bucket] = win } win.Counts[event.UserID]++
// Закрываем окна, когда watermark > window.End p.maybeCloseWindows(event.EventTime)}2.3 Stateful processing
Заголовок раздела «2.3 Stateful processing»Stateless: filter, map, projection — не нужен state.
Stateful:
- aggregation (count, sum, avg)
- joins (соединение streams)
- deduplication (idempotency)
- enrichment (lookup из таблицы)
Где хранить state:
| Store | Pros | Cons |
|---|---|---|
| In-memory map | Быстро | Потеря при рестарте |
| Redis | External, быстро | Network latency, отдельный сервис |
| BadgerDB / Pebble | Embedded, на диске | Per-instance, нужна репликация |
| RocksDB (через CGO) | Production-grade, как в Kafka Streams | CGO, сложность |
| Kafka changelog topic | Failover, replication | Зависимость от Kafka |
Pattern «changelog topic»:
- Каждое изменение state публикуется в compacted Kafka topic.
- При рестарте processor читает топик и восстанавливает state.
state update → write to local KV → emit to changelog topic ↓ (recovery on restart)2.4 Exactly-once semantics
Заголовок раздела «2.4 Exactly-once semantics»At-most-once: возможна потеря (no retry). At-least-once: дубли возможны (retry). Exactly-once: ровно один раз (идемпотентно или через транзакции).
Kafka transactions (с 0.11+):
- Consumer читает, processor обрабатывает, producer пишет — всё в одной transaction.
isolation.level=read_committedу downstream consumers.
Go (franz-go) пример:
client, _ := kgo.NewClient( kgo.SeedBrokers("localhost:9092"), kgo.TransactionalID("my-processor-1"), kgo.RequireStableFetchOffsets(),)
err := client.BeginTransaction()// process input records...// produce output records to another topicerr = client.EndTransaction(ctx, kgo.TryCommit)Альтернатива: idempotency keys (event ID) + dedup store.
2.5 Joins в stream processing
Заголовок раздела «2.5 Joins в stream processing»Stream-stream join — соединение двух streams по ключу за период:
clicks (stream): userID, articleID, timestamppurchases (stream): userID, productID, timestamp
JOIN ON userID WITHIN 10min → did click lead to purchase?Stream-table join — обогащение stream данными из таблицы:
clicks (stream): userID, articleIDusers (table): userID → email, name
JOIN → clicks обогащены email/nameMaterialized table — projection из stream:
orders (stream) → table(userID → total_spent)2.6 CDC (Change Data Capture)
Заголовок раздела «2.6 CDC (Change Data Capture)»Что: механизм отслеживания изменений в БД и публикация их как поток событий.
Подходы:
-
Trigger-based — триггеры пишут в таблицу-журнал.
- Pros: работает везде. Cons: нагрузка на DB.
-
Polling —
SELECT WHERE updated_at > last_check.- Pros: просто. Cons: нагрузка, миссинг deletes.
-
Log-based (WAL/binlog) — читаем WAL.
- Pros: low overhead, captures all changes including DELETE.
- Cons: сложность, version-dependent.
Инструменты CDC:
- Debezium — самый популярный, читает WAL PG/MySQL/Mongo и публикует в Kafka.
- Maxwell — для MySQL.
- Kafka Connect — фреймворк, в нём Debezium как connector.
- PgOutput + pgx logical replication — нативный Go-подход.
Поток с Debezium:
PostgreSQL WAL ↓Debezium connector (Kafka Connect) ↓Kafka topic (db.public.orders) ↓Stream processor (Flink / Goka / your Go service) ↓Real-time analytics / search index / cache2.7 CDC в Go: pgx + logical replication
Заголовок раздела «2.7 CDC в Go: pgx + logical replication»import "github.com/jackc/pglogrepl"
conn, _ := pgconn.Connect(ctx, "postgres://user:pass@host/db?replication=database")
// Create publicationconn.Exec(ctx, "CREATE PUBLICATION my_pub FOR TABLE orders, users")
// Create replication slotpglogrepl.CreateReplicationSlot(ctx, conn, "my_slot", "pgoutput", pglogrepl.CreateReplicationSlotOptions{Temporary: false})
// Start replicationpglogrepl.StartReplication(ctx, conn, "my_slot", pglogrepl.LSN(0), pglogrepl.StartReplicationOptions{PluginArgs: []string{ "proto_version '1'", "publication_names 'my_pub'", }})
// Consume messagesfor { msg, _ := conn.ReceiveMessage(ctx) // Parse pgoutput message // Publish to Kafka}2.8 Goka в Go (если нужен «настоящий» stream processor)
Заголовок раздела «2.8 Goka в Go (если нужен «настоящий» stream processor)»import "github.com/lovoo/goka"
// Define processorg := goka.DefineGroup("click-counter", goka.Input("clicks", new(ClickCodec), func(ctx goka.Context, msg interface{}) { // ctx.Value() — текущий state по ключу var count int64 if v := ctx.Value(); v != nil { count = v.(int64) } count++ ctx.SetValue(count) // обновить state (хранится в Kafka changelog topic) }), goka.Persist(new(Int64Codec)), // state codec)
p, _ := goka.NewProcessor([]string{"localhost:9092"}, g)p.Run(ctx)Goka достоинства:
- Простой DSL
- Использует Kafka для changelog (recovery built-in)
- Hot/cold replicas
Goka ограничения:
- Нет polished windowing
- Нет stream-stream joins (только stream-table)
- Маленькое community
2.9 Real-time dashboards (классический pipeline)
Заголовок раздела «2.9 Real-time dashboards (классический pipeline)»Apps → Kafka (clicks topic) ↓ Stream processor (Goka / Flink) ↓ Aggregation per 1min window ↓ ClickHouse / Druid / TimescaleDB ↓ Grafana / Superset2.10 Materialize / RisingWave — SQL stream processing
Заголовок раздела «2.10 Materialize / RisingWave — SQL stream processing»-- Materialize / RisingWave (CDC + streaming SQL)CREATE SOURCE clicks FROM KAFKA BROKER 'kafka:9092' TOPIC 'clicks';
-- Materialized view = continuously updated stream aggregateCREATE MATERIALIZED VIEW clicks_per_minute ASSELECT DATE_TRUNC('minute', event_time) AS minute, article_id, COUNT(*) AS click_countFROM clicksGROUP BY 1, 2;Из Go это просто SELECT * FROM clicks_per_minute через pgx — выглядит как обычная PG-таблица, но всегда актуальна.
3. Gotchas
Заголовок раздела «3. Gotchas»3.1 ⚠️ Smoothie out-of-order events
Заголовок раздела «3.1 ⚠️ Smoothie out-of-order events»События приходят не в порядке event_time. Watermarks помогают, но late events нужно либо дропать (лосс данных), либо переоткрывать окна (сложность).
3.2 ⚠️ State unbounded growth
Заголовок раздела «3.2 ⚠️ State unbounded growth»Если ключей много (например, session windows per user), state может расти бесконечно. Нужно TTL и compaction.
3.3 ⚠️ Exactly-once — не «бесплатно»
Заголовок раздела «3.3 ⚠️ Exactly-once — не «бесплатно»»EOS требует Kafka transactions + idempotent producer + read_committed consumers. Throughput падает на ~20–30%. Иногда дешевле дедуп по ID на стороне consumer.
3.4 ⚠️ Watermark heuristic, не truth
Заголовок раздела «3.4 ⚠️ Watermark heuristic, не truth»Watermark = эвристика. Если выберешь слишком ранний — много late events. Слишком поздний — высокая latency. Tune под workload.
3.5 ⚠️ Rebalance в Kafka теряет local state
Заголовок раздела «3.5 ⚠️ Rebalance в Kafka теряет local state»Когда партиция переходит к другому consumer, локальный state «остаётся» на старой ноде. Нужен changelog topic для восстановления.
3.6 ⚠️ CDC и schema changes
Заголовок раздела «3.6 ⚠️ CDC и schema changes»ALTER TABLE ломает Debezium pipeline, если downstream не учитывает. Используй Schema Registry с compatibility checks.
3.7 ⚠️ Backpressure
Заголовок раздела «3.7 ⚠️ Backpressure»Если downstream медленный, stream processor должен либо буферизировать (память), либо замедлять чтение из Kafka (offset не коммитим). Без этого OOM.
3.8 ⚠️ Time skew между нодами
Заголовок раздела «3.8 ⚠️ Time skew между нодами»Если event_time проставляется на клиентах с разным временем — chaos. Решение: проставлять server-side timestamp + клиентский в payload для аналитики.
3.9 ⚠️ Reprocessing — не всегда возможно
Заголовок раздела «3.9 ⚠️ Reprocessing — не всегда возможно»Replay данных из Kafka работает, если retention длинная. Иначе нужен Kafka tiered storage (S3) или отдельный архив.
3.10 ⚠️ Goka — не Flink
Заголовок раздела «3.10 ⚠️ Goka — не Flink»Goka хорош для простых случаев, но если нужны сложные joins/windows — он будет тебя ограничивать. Не строй на нём enterprise pipeline без понимания.
3.11 ⚠️ CDC и initial snapshot
Заголовок раздела «3.11 ⚠️ CDC и initial snapshot»При первом запуске Debezium делает full snapshot таблицы. Для больших таблиц — часы, lock issues. Используй incremental snapshots (Debezium 1.6+).
3.12 ⚠️ Stream-stream join — окно нельзя сделать слишком большим
Заголовок раздела «3.12 ⚠️ Stream-stream join — окно нельзя сделать слишком большим»Если окно join = 1 час, state хранит часовой объём событий. При high QPS — десятки ГБ.
4. Real cases
Заголовок раздела «4. Real cases»4.1 Fraud detection (банкинг)
Заголовок раздела «4.1 Fraud detection (банкинг)»Transactions (Kafka) ↓Stream processor: - Stream-table join с user profile (history) - Sliding window 1h: count tx > 5 → flag - Geo anomaly: tx_loc != user.usual_country → flag - ML score (call external model) ↓If flagged → publish to "fraud-alerts" topic ↓Block transaction (sync API) или async ticketТехнологии: Flink (часто) + Kafka + Redis/ScyllaDB для feature store.
4.2 Real-time recommendations (e-commerce)
Заголовок раздела «4.2 Real-time recommendations (e-commerce)»User events: view, click, add_to_cart, purchase ↓Aggregations per user (last 24h, last session) ↓Online feature store (Redis/Feast) ↓ML scoring (TensorFlow Serving / KServe) ↓API returns ranked recommendationsTech leads строят это с командой ML — твоя зона: ingestion, feature pipeline, low-latency lookup.
4.3 CDC для search indexing
Заголовок раздела «4.3 CDC для search indexing»PostgreSQL (orders, products) ↓Debezium → Kafka ↓Go consumer: - Парсит изменения - Обновляет Elasticsearch/OpenSearch index ↓Search всегда актуален (delay ~секунды)Альтернатива: dual write — не делай так (рассинхронизация неизбежна).
4.4 Avito-style real-time analytics
Заголовок раздела «4.4 Avito-style real-time analytics»- Все user actions → Kafka.
- ClickHouse как stream sink (Kafka engine in ClickHouse).
- Дашборды в Grafana / Superset.
- Денежные метрики — отдельный pipeline с EOS.
4.5 Log analytics (ELK alternative)
Заголовок раздела «4.5 Log analytics (ELK alternative)»App logs → Vector / Filebeat → Kafka → stream processor - Парсинг JSON - Маскирование PII - Enrichment с user_id ↓ ClickHouse / Loki / OpenSearch ↓ Alerts (Slack/PagerDuty) при error spikeВ Go: Vector написан на Rust, но Benthos/Redpanda Connect на Go.
4.6 Audit log pipeline (compliance)
Заголовок раздела «4.6 Audit log pipeline (compliance)»DB CDC → Kafka → S3 (Parquet, partitioned by day) ↓ Trino/Athena queries для аудитаTech lead роль: choose between immutable append-only vs mutable replicas, retention strategy, GDPR right-to-delete.
5. Вопросы (20+)
Заголовок раздела «5. Вопросы (20+)»- Чем event time отличается от processing time?
- Что такое watermark, как влияет на latency и точность?
- Опиши tumbling, sliding и session windows. Когда какое?
- Как организовать stateful processing в Go? Где хранить state?
- Что такое exactly-once в Kafka? Как реализуется?
- Чем stream-stream join отличается от stream-table?
- Что такое CDC? Чем log-based лучше polling-based?
- Опиши pipeline Debezium → Kafka → consumer.
- Какие проблемы решает Schema Registry в streaming?
- Как обрабатывать late events?
- Что делать при rebalance Kafka, если у тебя локальный state?
- Чем Goka отличается от Flink? Когда использовать каждый?
- Что такое changelog topic в Kafka Streams / Goka?
- Как избежать unbounded state growth?
- Сравни Materialize/RisingWave с Flink. В чём преимущества SQL-подхода?
- Опиши backpressure. Как его реализовать в Go-консьюмере?
- Как реплицировать stream processor для HA?
- Что такое initial snapshot в Debezium и какие у него проблемы?
- Как мониторить stream pipeline? Какие ключевые метрики?
- Опиши real-time fraud detection pipeline.
- Как сделать reprocessing данных за прошлый месяц?
- Зачем нужны idempotency keys в потоковой обработке?
- Как обрабатывать schema evolution в Kafka topic с тысячами consumers?
- Какие компромиссы между throughput и latency?
- Когда стоит выбрать batch вместо stream?
6. Practice
Заголовок раздела «6. Practice»Задача 1: построить CDC pipeline
Заголовок раздела «Задача 1: построить CDC pipeline»- Поднять PostgreSQL + Debezium + Kafka в Docker.
- Создать таблицу
orders, настроить CDC. - На Go написать consumer, который пишет события в ClickHouse.
- Замерить latency end-to-end.
Задача 2: streaming aggregation
Заголовок раздела «Задача 2: streaming aggregation»- Kafka topic с событиями
click {userID, articleID, ts}. - На Go (через Goka или вручную):
- Окно 1 минута (tumbling).
- Сохранять count в Redis с TTL 1 час.
- HTTP API для чтения
/clicks?article=X&window=last_5min.
Задача 3: dedup pipeline
Заголовок раздела «Задача 3: dedup pipeline»- На Go: consumer Kafka.
- Каждое сообщение имеет
event_id. - Проверять в Redis (SETNX с TTL) — если уже обработано, скипнуть.
- Иначе — обработать + записать в Redis.
- Замерить, как меняется throughput при duplicate rate 0%, 10%, 50%.
Задача 4: stream-table join
Заголовок раздела «Задача 4: stream-table join»- Kafka topic
clicks {userID, articleID}. - Postgres
users {id, country}. - Goroutine кеширует users в memory + слушает CDC от Debezium.
- Consumer обогащает clicks полем country.
- Метрика: % обогащённых событий, latency lookup.
Задача 5: fraud detection MVP
Заголовок раздела «Задача 5: fraud detection MVP»- Транзакции в Kafka.
- На Go:
- Sliding window 1h: count tx по user_id.
- Если > 10 → publish в topic
fraud-alerts.
- Persist state в Redis (sorted set с expiration).
- Тест: при рестарте процесса state восстанавливается.
7. Источники
Заголовок раздела «7. Источники»- Tyler Akidau — Streaming 101 & 102 (статьи O’Reilly): https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
- Tyler Akidau, Slava Chernyak, Reuven Lax — Streaming Systems (книга, O’Reilly, 2018)
- Confluent docs — Kafka Streams: https://kafka.apache.org/documentation/streams/
- Apache Flink documentation: https://nightlies.apache.org/flink/flink-docs-stable/
- Debezium — CDC for databases: https://debezium.io/documentation/
- lovoo/goka — Go stream processing: https://github.com/lovoo/goka
- Materialize blog — Streaming SQL fundamentals: https://materialize.com/blog/
- RisingWave docs: https://docs.risingwave.com/
- Confluent — Exactly-once semantics: https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
- Martin Kleppmann — Designing Data-Intensive Applications (главы про streams)
- Redpanda Connect (бывший Benthos): https://docs.redpanda.com/redpanda-connect/
- pglogrepl Go library: https://github.com/jackc/pglogrepl
- Trino streaming SQL: https://trino.io/docs/current/
- ClickHouse Kafka engine: https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka
Резюме. Stream processing — это сдвиг ментальной модели: от batch к unbounded streams, от processing time к event time, от stateless к stateful. В Go нет «Flink», но есть Kafka + Goka/franz-go + Redis/embedded KV, чего хватает для 80% задач. CDC через Debezium даёт мост между OLTP и streaming. Tech lead не «пишет вручную windowing» — он выбирает правильные инструменты под latency/throughput/state requirements и понимает trade-offs.