Перейти к содержимому

Stream Processing и CDC: обработка неограниченных потоков данных

Зачем знать на Middle 3: Tech lead обязан проектировать системы real-time аналитики, fraud detection, рекомендаций. Stream processing — это не просто «consumer в цикле», а целая дисциплина с event time, watermarks, windowing и stateful processing. CDC (Change Data Capture) — мост между OLTP и аналитикой. Без понимания этих концепций ты будешь строить системы, которые «работают, пока не сломаются».

  1. Концепция: что такое stream processing
  2. Глубже: event time, watermarks, windowing, stateful processing
  3. Gotchas: late data, out-of-order events, exactly-once
  4. Real cases: fraud detection, real-time dashboards, CDC pipelines
  5. Вопросы (20+)
  6. Practice: построить stream pipeline на Go
  7. Источники

Batch processing:

  • Конечный объём данных (например, лог за день)
  • Read → transform → write
  • Латентность: минуты/часы
  • Tool: Hadoop MapReduce, Spark batch, Airflow

Stream processing:

  • Бесконечный (unbounded) поток событий
  • События приходят непрерывно
  • Латентность: миллисекунды/секунды
  • Tool: Kafka Streams, Flink, Materialize, Goka
Batch:
[========== bounded data ==========]
process all
result
Stream:
event1 → event2 → event3 → event4 → ... (бесконечно)
↓ ↓ ↓ ↓
process process process process
↓ ↓ ↓ ↓
state state state state (накопление)
СвойствоОписание
Low latency< 1 секунды от события до результата
High throughputМиллионы событий/сек
StatefulХранение промежуточного состояния (счётчики, окна)
Fault-tolerantВосстановление после сбоев без потери данных
ScalableГоризонтальное масштабирование (партиции)
Exactly-onceКаждое событие обработано ровно один раз

Processing time — момент, когда система получила/обработала событие. Event time — момент, когда событие реально произошло.

real world: t=10:00:00 user clicked button
mobile cache: t=10:00:00 → t=10:05:00 (offline, ждёт сеть)
backend: t=10:05:30 received event
processor: t=10:05:35 processed event
event_time = 10:00:00 (когда кликнули)
processing_time = 10:05:35 (когда обработали)

Почему критично:

  • Если считать «клики за окно 10:00–10:00:59 по processing time», то event попадёт в окно 10:05–10:05:59 — неверно.
  • Аналитика должна работать с event time.
ToolЯзыкОсобенности
Kafka StreamsJavaLibrary, не отдельный кластер, простой DSL
Apache FlinkJava/ScalaStateful, low-latency, продвинутые join’ы
Spark StreamingScala/PythonМикро-батчи (Structured Streaming), мощный для ETL
MaterializeRustPostgreSQL-compatible SQL → streaming materialized views
RisingWaveRustАналог Materialize, cloud-native
ksqlDBJavaSQL поверх Kafka Streams
Apache BeamMultiУнифицированная модель batch + stream
Goka (Go)GoLightweight, основан на Kafka, не настоящий «полноценный» framework
Benthos / Redpanda ConnectGoStream processor as a service

Плохая новость: в Go нет «зрелого» полноценного stream processing framework масштаба Flink.

Хорошая новость: для большинства задач хватает:

  1. Kafka consumer/producer (segmentio/kafka-go, twmb/franz-go)
  2. Локального state store (Redis, BadgerDB, embedded RocksDB)
  3. Goka (lovoo/goka) — для простых stateful-pipeline
  4. Benthos (теперь Redpanda Connect) — declarative

Когда хватает Go:

  • Простые фильтры/трансформации
  • Aggregation с external state (Redis)
  • Routing/enrichment events

Когда нужен Flink/Kafka Streams:

  • Stream-stream joins с большими окнами
  • Сложный stateful processing (миллионы ключей)
  • Exactly-once между несколькими топиками
  • Stateful миграции через savepoints

Watermark — метка времени, которая говорит «события с event_time < W уже получены (или мы решили их игнорировать)».

events: 1:00 → 1:05 → 1:03 → 1:08 → 1:07 → 1:12
↑ out-of-order!
watermark = max(event_time) - allowed_lateness
= 1:12 - 5min = 1:07
окно 1:00–1:04:59:
- закрывается когда watermark >= 1:05
- события с event_time < 1:05, пришедшие позже, считаются late

Стратегии для late events:

  • Drop (выбросить)
  • Update (пересчитать окно)
  • Side output (отдельный поток для late events)

Tumbling windows — фиксированные, не пересекающиеся:

|—— 10:00–10:05 ——|—— 10:05–10:10 ——|—— 10:10–10:15 ——|

Sliding windows — пересекающиеся, с интервалом меньше размера:

|———— 10:00–10:05 ————|
|———— 10:01–10:06 ————|
|———— 10:02–10:07 ————|

Session windows — динамические, на основе gap:

events: e1 e2 e3 gap > 5min e4 e5
|session 1| |session 2|

Global window — одна общая, без границ (custom trigger).

Go pseudo-code (через Kafka Streams концептуально, но Go-style):

// Tumbling window 5 минут
type Window struct {
Start, End time.Time
Counts map[string]int // userID → click count
}
func (p *Processor) Process(event Event) {
bucket := event.EventTime.Truncate(5 * time.Minute)
win := p.windows[bucket]
if win == nil {
win = &Window{Start: bucket, End: bucket.Add(5 * time.Minute), Counts: map[string]int{}}
p.windows[bucket] = win
}
win.Counts[event.UserID]++
// Закрываем окна, когда watermark > window.End
p.maybeCloseWindows(event.EventTime)
}

Stateless: filter, map, projection — не нужен state.

Stateful:

  • aggregation (count, sum, avg)
  • joins (соединение streams)
  • deduplication (idempotency)
  • enrichment (lookup из таблицы)

Где хранить state:

StoreProsCons
In-memory mapБыстроПотеря при рестарте
RedisExternal, быстроNetwork latency, отдельный сервис
BadgerDB / PebbleEmbedded, на дискеPer-instance, нужна репликация
RocksDB (через CGO)Production-grade, как в Kafka StreamsCGO, сложность
Kafka changelog topicFailover, replicationЗависимость от Kafka

Pattern «changelog topic»:

  • Каждое изменение state публикуется в compacted Kafka topic.
  • При рестарте processor читает топик и восстанавливает state.
state update → write to local KV → emit to changelog topic
(recovery on restart)

At-most-once: возможна потеря (no retry). At-least-once: дубли возможны (retry). Exactly-once: ровно один раз (идемпотентно или через транзакции).

Kafka transactions (с 0.11+):

  • Consumer читает, processor обрабатывает, producer пишет — всё в одной transaction.
  • isolation.level=read_committed у downstream consumers.

Go (franz-go) пример:

client, _ := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.TransactionalID("my-processor-1"),
kgo.RequireStableFetchOffsets(),
)
err := client.BeginTransaction()
// process input records...
// produce output records to another topic
err = client.EndTransaction(ctx, kgo.TryCommit)

Альтернатива: idempotency keys (event ID) + dedup store.

Stream-stream join — соединение двух streams по ключу за период:

clicks (stream): userID, articleID, timestamp
purchases (stream): userID, productID, timestamp
JOIN ON userID WITHIN 10min → did click lead to purchase?

Stream-table join — обогащение stream данными из таблицы:

clicks (stream): userID, articleID
users (table): userID → email, name
JOIN → clicks обогащены email/name

Materialized table — projection из stream:

orders (stream) → table(userID → total_spent)

Что: механизм отслеживания изменений в БД и публикация их как поток событий.

Подходы:

  1. Trigger-based — триггеры пишут в таблицу-журнал.

    • Pros: работает везде. Cons: нагрузка на DB.
  2. PollingSELECT WHERE updated_at > last_check.

    • Pros: просто. Cons: нагрузка, миссинг deletes.
  3. Log-based (WAL/binlog) — читаем WAL.

    • Pros: low overhead, captures all changes including DELETE.
    • Cons: сложность, version-dependent.

Инструменты CDC:

  • Debezium — самый популярный, читает WAL PG/MySQL/Mongo и публикует в Kafka.
  • Maxwell — для MySQL.
  • Kafka Connect — фреймворк, в нём Debezium как connector.
  • PgOutput + pgx logical replication — нативный Go-подход.

Поток с Debezium:

PostgreSQL WAL
Debezium connector (Kafka Connect)
Kafka topic (db.public.orders)
Stream processor (Flink / Goka / your Go service)
Real-time analytics / search index / cache
import "github.com/jackc/pglogrepl"
conn, _ := pgconn.Connect(ctx, "postgres://user:pass@host/db?replication=database")
// Create publication
conn.Exec(ctx, "CREATE PUBLICATION my_pub FOR TABLE orders, users")
// Create replication slot
pglogrepl.CreateReplicationSlot(ctx, conn, "my_slot", "pgoutput",
pglogrepl.CreateReplicationSlotOptions{Temporary: false})
// Start replication
pglogrepl.StartReplication(ctx, conn, "my_slot", pglogrepl.LSN(0),
pglogrepl.StartReplicationOptions{PluginArgs: []string{
"proto_version '1'", "publication_names 'my_pub'",
}})
// Consume messages
for {
msg, _ := conn.ReceiveMessage(ctx)
// Parse pgoutput message
// Publish to Kafka
}

2.8 Goka в Go (если нужен «настоящий» stream processor)

Заголовок раздела «2.8 Goka в Go (если нужен «настоящий» stream processor)»
import "github.com/lovoo/goka"
// Define processor
g := goka.DefineGroup("click-counter",
goka.Input("clicks", new(ClickCodec), func(ctx goka.Context, msg interface{}) {
// ctx.Value() — текущий state по ключу
var count int64
if v := ctx.Value(); v != nil {
count = v.(int64)
}
count++
ctx.SetValue(count) // обновить state (хранится в Kafka changelog topic)
}),
goka.Persist(new(Int64Codec)), // state codec
)
p, _ := goka.NewProcessor([]string{"localhost:9092"}, g)
p.Run(ctx)

Goka достоинства:

  • Простой DSL
  • Использует Kafka для changelog (recovery built-in)
  • Hot/cold replicas

Goka ограничения:

  • Нет polished windowing
  • Нет stream-stream joins (только stream-table)
  • Маленькое community
Apps → Kafka (clicks topic)
Stream processor (Goka / Flink)
Aggregation per 1min window
ClickHouse / Druid / TimescaleDB
Grafana / Superset
-- Materialize / RisingWave (CDC + streaming SQL)
CREATE SOURCE clicks FROM KAFKA BROKER 'kafka:9092' TOPIC 'clicks';
-- Materialized view = continuously updated stream aggregate
CREATE MATERIALIZED VIEW clicks_per_minute AS
SELECT
DATE_TRUNC('minute', event_time) AS minute,
article_id,
COUNT(*) AS click_count
FROM clicks
GROUP BY 1, 2;

Из Go это просто SELECT * FROM clicks_per_minute через pgx — выглядит как обычная PG-таблица, но всегда актуальна.


События приходят не в порядке event_time. Watermarks помогают, но late events нужно либо дропать (лосс данных), либо переоткрывать окна (сложность).

Если ключей много (например, session windows per user), state может расти бесконечно. Нужно TTL и compaction.

EOS требует Kafka transactions + idempotent producer + read_committed consumers. Throughput падает на ~20–30%. Иногда дешевле дедуп по ID на стороне consumer.

Watermark = эвристика. Если выберешь слишком ранний — много late events. Слишком поздний — высокая latency. Tune под workload.

Когда партиция переходит к другому consumer, локальный state «остаётся» на старой ноде. Нужен changelog topic для восстановления.

ALTER TABLE ломает Debezium pipeline, если downstream не учитывает. Используй Schema Registry с compatibility checks.

Если downstream медленный, stream processor должен либо буферизировать (память), либо замедлять чтение из Kafka (offset не коммитим). Без этого OOM.

Если event_time проставляется на клиентах с разным временем — chaos. Решение: проставлять server-side timestamp + клиентский в payload для аналитики.

Replay данных из Kafka работает, если retention длинная. Иначе нужен Kafka tiered storage (S3) или отдельный архив.

Goka хорош для простых случаев, но если нужны сложные joins/windows — он будет тебя ограничивать. Не строй на нём enterprise pipeline без понимания.

При первом запуске Debezium делает full snapshot таблицы. Для больших таблиц — часы, lock issues. Используй incremental snapshots (Debezium 1.6+).

3.12 ⚠️ Stream-stream join — окно нельзя сделать слишком большим

Заголовок раздела «3.12 ⚠️ Stream-stream join — окно нельзя сделать слишком большим»

Если окно join = 1 час, state хранит часовой объём событий. При high QPS — десятки ГБ.


Transactions (Kafka)
Stream processor:
- Stream-table join с user profile (history)
- Sliding window 1h: count tx > 5 → flag
- Geo anomaly: tx_loc != user.usual_country → flag
- ML score (call external model)
If flagged → publish to "fraud-alerts" topic
Block transaction (sync API) или async ticket

Технологии: Flink (часто) + Kafka + Redis/ScyllaDB для feature store.

User events: view, click, add_to_cart, purchase
Aggregations per user (last 24h, last session)
Online feature store (Redis/Feast)
ML scoring (TensorFlow Serving / KServe)
API returns ranked recommendations

Tech leads строят это с командой ML — твоя зона: ingestion, feature pipeline, low-latency lookup.

PostgreSQL (orders, products)
Debezium → Kafka
Go consumer:
- Парсит изменения
- Обновляет Elasticsearch/OpenSearch index
Search всегда актуален (delay ~секунды)

Альтернатива: dual write — не делай так (рассинхронизация неизбежна).

  • Все user actions → Kafka.
  • ClickHouse как stream sink (Kafka engine in ClickHouse).
  • Дашборды в Grafana / Superset.
  • Денежные метрики — отдельный pipeline с EOS.
App logs → Vector / Filebeat → Kafka → stream processor
- Парсинг JSON
- Маскирование PII
- Enrichment с user_id
ClickHouse / Loki / OpenSearch
Alerts (Slack/PagerDuty) при error spike

В Go: Vector написан на Rust, но Benthos/Redpanda Connect на Go.

DB CDC → Kafka → S3 (Parquet, partitioned by day)
Trino/Athena queries для аудита

Tech lead роль: choose between immutable append-only vs mutable replicas, retention strategy, GDPR right-to-delete.


  1. Чем event time отличается от processing time?
  2. Что такое watermark, как влияет на latency и точность?
  3. Опиши tumbling, sliding и session windows. Когда какое?
  4. Как организовать stateful processing в Go? Где хранить state?
  5. Что такое exactly-once в Kafka? Как реализуется?
  6. Чем stream-stream join отличается от stream-table?
  7. Что такое CDC? Чем log-based лучше polling-based?
  8. Опиши pipeline Debezium → Kafka → consumer.
  9. Какие проблемы решает Schema Registry в streaming?
  10. Как обрабатывать late events?
  11. Что делать при rebalance Kafka, если у тебя локальный state?
  12. Чем Goka отличается от Flink? Когда использовать каждый?
  13. Что такое changelog topic в Kafka Streams / Goka?
  14. Как избежать unbounded state growth?
  15. Сравни Materialize/RisingWave с Flink. В чём преимущества SQL-подхода?
  16. Опиши backpressure. Как его реализовать в Go-консьюмере?
  17. Как реплицировать stream processor для HA?
  18. Что такое initial snapshot в Debezium и какие у него проблемы?
  19. Как мониторить stream pipeline? Какие ключевые метрики?
  20. Опиши real-time fraud detection pipeline.
  21. Как сделать reprocessing данных за прошлый месяц?
  22. Зачем нужны idempotency keys в потоковой обработке?
  23. Как обрабатывать schema evolution в Kafka topic с тысячами consumers?
  24. Какие компромиссы между throughput и latency?
  25. Когда стоит выбрать batch вместо stream?

  1. Поднять PostgreSQL + Debezium + Kafka в Docker.
  2. Создать таблицу orders, настроить CDC.
  3. На Go написать consumer, который пишет события в ClickHouse.
  4. Замерить latency end-to-end.
  1. Kafka topic с событиями click {userID, articleID, ts}.
  2. На Go (через Goka или вручную):
    • Окно 1 минута (tumbling).
    • Сохранять count в Redis с TTL 1 час.
  3. HTTP API для чтения /clicks?article=X&window=last_5min.
  1. На Go: consumer Kafka.
  2. Каждое сообщение имеет event_id.
  3. Проверять в Redis (SETNX с TTL) — если уже обработано, скипнуть.
  4. Иначе — обработать + записать в Redis.
  5. Замерить, как меняется throughput при duplicate rate 0%, 10%, 50%.
  1. Kafka topic clicks {userID, articleID}.
  2. Postgres users {id, country}.
  3. Goroutine кеширует users в memory + слушает CDC от Debezium.
  4. Consumer обогащает clicks полем country.
  5. Метрика: % обогащённых событий, latency lookup.
  1. Транзакции в Kafka.
  2. На Go:
    • Sliding window 1h: count tx по user_id.
    • Если > 10 → publish в topic fraud-alerts.
  3. Persist state в Redis (sorted set с expiration).
  4. Тест: при рестарте процесса state восстанавливается.

  1. Tyler Akidau — Streaming 101 & 102 (статьи O’Reilly): https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/
  2. Tyler Akidau, Slava Chernyak, Reuven Lax — Streaming Systems (книга, O’Reilly, 2018)
  3. Confluent docs — Kafka Streams: https://kafka.apache.org/documentation/streams/
  4. Apache Flink documentation: https://nightlies.apache.org/flink/flink-docs-stable/
  5. Debezium — CDC for databases: https://debezium.io/documentation/
  6. lovoo/goka — Go stream processing: https://github.com/lovoo/goka
  7. Materialize blog — Streaming SQL fundamentals: https://materialize.com/blog/
  8. RisingWave docs: https://docs.risingwave.com/
  9. Confluent — Exactly-once semantics: https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  10. Martin Kleppmann — Designing Data-Intensive Applications (главы про streams)
  11. Redpanda Connect (бывший Benthos): https://docs.redpanda.com/redpanda-connect/
  12. pglogrepl Go library: https://github.com/jackc/pglogrepl
  13. Trino streaming SQL: https://trino.io/docs/current/
  14. ClickHouse Kafka engine: https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka

Резюме. Stream processing — это сдвиг ментальной модели: от batch к unbounded streams, от processing time к event time, от stateless к stateful. В Go нет «Flink», но есть Kafka + Goka/franz-go + Redis/embedded KV, чего хватает для 80% задач. CDC через Debezium даёт мост между OLTP и streaming. Tech lead не «пишет вручную windowing» — он выбирает правильные инструменты под latency/throughput/state requirements и понимает trade-offs.