Координационные примитивы: errgroup, semaphore, singleflight, backpressure, rate limiting
Зачем знать на Middle 2: реальные сервисы редко используют «голые» goroutines + channels — они построены на координационных примитивах:
errgroupдля параллельного выполнения с ошибками,semaphoreдля ограничения concurrency,singleflightпротив thundering herd, token-bucket rate limiter, backpressure паттерны. Без знания этих библиотек middle 2 не сможет реализовать production-ready параллельные пайплайны, защиту от cache stampede, корректное ограничение нагрузки. Это то, что отличает «знаю горутины» от «умею писать конкурентный backend».
Содержание
Заголовок раздела «Содержание»- errgroup: параллелизм с ошибками и контекстом
- semaphore: weighted ограничение concurrency
- singleflight: дедупликация и cache stampede
- Backpressure: давление обратно
- Rate limiting: token bucket, leaky bucket, distributed
- Структурированная concurrency в Go
- Тестирование concurrent кода
- Gotchas (15)
- Производительность
- Вопросы на собесе (30)
- Practice (8)
- Источники
1. errgroup
Заголовок раздела «1. errgroup»1.1. Что это
Заголовок раздела «1.1. Что это»golang.org/x/sync/errgroup — стандартный де-факто инструмент для параллельного выполнения с агрегацией первой ошибки и кооперативной отменой.
Базовый паттерн:
import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]byte, error) { g, ctx := errgroup.WithContext(ctx) results := make([][]byte, len(urls))
for i, url := range urls { i, url := i, url // capture (до Go 1.22) g.Go(func() error { body, err := fetch(ctx, url) if err != nil { return err // → cancel других через ctx } results[i] = body return nil }) }
if err := g.Wait(); err != nil { return nil, err } return bytes.Join(results, nil), nil}1.2. Внутреннее устройство
Заголовок раздела «1.2. Внутреннее устройство»type Group struct { cancel func(error) // CancelCauseFunc (Go 1.20+) wg sync.WaitGroup sem chan token // limit (Go 1.20+ SetLimit) errOnce sync.Once err error}wg— стандартныйWaitGroupдля синхронизации goroutines.errOnce— первая ошибка побеждает, остальные игнорируются.cancel— сохраняется изWithContext, вызывается при первой ошибке.sem— буферизованный канал для лимита concurrency (Go 1.20+).
func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} // block если лимит достигнут } g.wg.Add(1) go func() { defer g.wg.Done() defer func() { if g.sem != nil { <-g.sem } }()
if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(err) // cancel ctx } }) } }()}
func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } return g.err}1.3. WithContext: cancel on first error
Заголовок раздела «1.3. WithContext: cancel on first error»g, ctx := errgroup.WithContext(parent)Это создаёт derived context, который отменяется при первой ошибке в любой g.Go функции. Остальные goroutines, которые слушают ctx, должны корректно завершиться.
g.Go(func() error { select { case <-ctx.Done(): return ctx.Err() case res := <-fetchAsync(ctx, url): return process(res) }})1.4. SetLimit + TryGo (Go 1.20+)
Заголовок раздела «1.4. SetLimit + TryGo (Go 1.20+)»g, ctx := errgroup.WithContext(ctx)g.SetLimit(10) // максимум 10 параллельных goroutines
for _, url := range urls { url := url if !g.TryGo(func() error { return fetch(ctx, url) }) { // нет слота — пропустить или ждать log.Printf("dropped %s", url) }}g.Wait()SetLimit(n)— создаёт internal semaphore размера n.Go(f)— блокируется, если все слоты заняты.TryGo(f)— non-blocking, возвращает false если нет слота.
1.5. Пример: параллельная обработка с лимитом
Заголовок раздела «1.5. Пример: параллельная обработка с лимитом»func processItems(ctx context.Context, items []Item) error { g, ctx := errgroup.WithContext(ctx) g.SetLimit(runtime.NumCPU())
for _, item := range items { item := item g.Go(func() error { return processItem(ctx, item) }) }
return g.Wait()}1.6. Пример: параллельный DB fetch с агрегацией
Заголовок раздела «1.6. Пример: параллельный DB fetch с агрегацией»type Result struct { User *User Orders []Order History []Event}
func loadUserPage(ctx context.Context, db *sql.DB, uid int64) (*Result, error) { var r Result g, ctx := errgroup.WithContext(ctx)
g.Go(func() error { u, err := loadUser(ctx, db, uid) r.User = u return err }) g.Go(func() error { o, err := loadOrders(ctx, db, uid) r.Orders = o return err }) g.Go(func() error { h, err := loadHistory(ctx, db, uid) r.History = h return err })
if err := g.Wait(); err != nil { return nil, err } return &r, nil}⚠️ Внимание: запись в
r.User/r.Ordersиз разных goroutines на разных полях — формально race? Нет, разные поля структуры в Go memory model — отдельные адреса, race detector это пропускает. Но если бы это была map — был бы race.
1.7. Anti-pattern: errgroup без context
Заголовок раздела «1.7. Anti-pattern: errgroup без context»// ПЛОХОg := new(errgroup.Group)g.Go(func() error { return longOp() }) // не cancel'абельно
// ХОРОШОg, ctx := errgroup.WithContext(parent)g.Go(func() error { return longOpCtx(ctx) })2. semaphore
Заголовок раздела «2. semaphore»2.1. Что это
Заголовок раздела «2.1. Что это»golang.org/x/sync/semaphore — weighted counting semaphore. В отличие от буферизованного канала, каждый Acquire может требовать произвольный вес.
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(int64(100)) // 100 единиц ресурса
// Acquire 10 единицif err := sem.Acquire(ctx, 10); err != nil { return err // context cancelled}defer sem.Release(10)
// ... use 10 units of resource ...2.2. API
Заголовок раздела «2.2. API»NewWeighted(n int64) *Weighted(s *Weighted) Acquire(ctx context.Context, n int64) error(s *Weighted) TryAcquire(n int64) bool(s *Weighted) Release(n int64)2.3. Внутреннее устройство
Заголовок раздела «2.3. Внутреннее устройство»type Weighted struct { size int64 cur int64 mu sync.Mutex waiters list.List // FIFO очередь}Acquire:
- Lock.
- Если
cur + n <= sizeи очередь пуста →cur += n, Unlock, success. - Иначе — добавить в waiters, разлочить mutex, ждать на channel.
Release:
- Lock.
cur -= n.- Пройти waiters head→tail, разбудить тех, кто помещается.
- Unlock.
FIFO fairness: новые Acquire ждут позади очереди, не «прыгают».
2.4. semaphore vs buffered chan
Заголовок раздела «2.4. semaphore vs buffered chan»| buffered chan | semaphore | |
|---|---|---|
| Weighted | нет (один вес) | да |
| Context cancel | через select | напрямую в Acquire |
| Fairness | FIFO (Go runtime) | FIFO |
| API | ch <- struct{}{} / <-ch | Acquire/Release |
| Overhead | мьютекс + park | мьютекс + park |
| TryAcquire | select default | TryAcquire |
Для uniform concurrency лимита buffered chan проще:
sem := make(chan struct{}, 10)for _, item := range items { sem <- struct{}{} go func(i Item) { defer func() { <-sem }() process(i) }(item)}Для weighted (например, разные задачи берут разный объём памяти):
sem := semaphore.NewWeighted(int64(totalMemory))for _, task := range tasks { weight := task.MemorySize if err := sem.Acquire(ctx, weight); err != nil { return err } go func(t Task) { defer sem.Release(t.MemorySize) run(t) }(task)}2.5. Real case: GPU memory лимит
Заголовок раздела «2.5. Real case: GPU memory лимит»// 24 GB GPU, разные задачи требуют от 1 до 8 GBsem := semaphore.NewWeighted(24)
func runInference(ctx context.Context, model Model) (*Result, error) { weight := int64(model.MemoryGB) if err := sem.Acquire(ctx, weight); err != nil { return nil, err } defer sem.Release(weight)
return doInference(model)}2.6. Real case: ограничение параллельных subprocesses
Заголовок раздела «2.6. Real case: ограничение параллельных subprocesses»// max 5 ffmpeg subprocesses на машину (CPU-bound)sem := semaphore.NewWeighted(5)2.7. semaphore + errgroup combo
Заголовок раздела «2.7. semaphore + errgroup combo»g, ctx := errgroup.WithContext(ctx)sem := semaphore.NewWeighted(50)
for _, task := range tasks { task := task g.Go(func() error { if err := sem.Acquire(ctx, task.Weight); err != nil { return err } defer sem.Release(task.Weight) return process(ctx, task) })}return g.Wait()💡 Можно вместо semaphore использовать
g.SetLimit(...), но если веса разные — нужен semaphore.
3. singleflight
Заголовок раздела «3. singleflight»3.1. Проблема: thundering herd / cache stampede
Заголовок раздела «3.1. Проблема: thundering herd / cache stampede»Кеш: key=user:42, TTL=60s истёк в T=0.T=0.001: 1000 requests за user:42 одновременно.Каждая goroutine видит cache miss.1000 идентичных DB-запросов одновременно.DB падает.3.2. singleflight решение
Заголовок раздела «3.2. singleflight решение»golang.org/x/sync/singleflight:
import "golang.org/x/sync/singleflight"
var group singleflight.Group
func getUser(ctx context.Context, id int64) (*User, error) { key := fmt.Sprintf("user:%d", id) v, err, _ := group.Do(key, func() (any, error) { // только один goroutine выполнит это return loadFromDB(ctx, id) }) if err != nil { return nil, err } return v.(*User), nil}При 1000 одновременных вызовах getUser(ctx, 42):
- Один выполнит
loadFromDB. - 999 заблокируются и получат тот же результат.
3.3. Cache-aside + singleflight = stampede protection
Заголовок раздела «3.3. Cache-aside + singleflight = stampede protection»type CachedStore struct { cache *ristretto.Cache g singleflight.Group db *sql.DB}
func (s *CachedStore) Get(ctx context.Context, id int64) (*User, error) { key := fmt.Sprintf("user:%d", id) if v, ok := s.cache.Get(key); ok { return v.(*User), nil }
v, err, _ := s.g.Do(key, func() (any, error) { // двойная проверка (другая goroutine могла уже загрузить) if v, ok := s.cache.Get(key); ok { return v, nil } u, err := loadFromDB(ctx, id) if err != nil { return nil, err } s.cache.Set(key, u, 1) return u, nil }) if err != nil { return nil, err } return v.(*User), nil}3.4. DoChan: асинхронный вариант
Заголовок раздела «3.4. DoChan: асинхронный вариант»ch := group.DoChan(key, fn)select {case res := <-ch: return res.Val.(*User), res.Errcase <-ctx.Done(): return nil, ctx.Err()}⚠️ Даже если context cancelled у вас, fn продолжит выполняться — другие waiters ждут.
3.5. Forget: evict in-flight
Заголовок раздела «3.5. Forget: evict in-flight»group.Forget(key) // forces next Do() to re-execute fnИспользуется после получения ошибки, чтобы не кэшировать неудачу:
v, err, _ := g.Do(key, fn)if err != nil { g.Forget(key) // re-execute next time return nil, err}3.6. Shared флаг — третий return
Заголовок раздела «3.6. Shared флаг — третий return»v, err, shared := group.Do(key, fn)// shared == true, если этот результат разделён с другими waitersПолезно для метрик (count deduplicated requests).
3.7. Внутреннее устройство
Заголовок раздела «3.7. Внутреннее устройство»type Group struct { mu sync.Mutex m map[string]*call}
type call struct { wg sync.WaitGroup val any err error dups int // count waiters chans []chan<- Result}- При первом Do(key, fn) создаётся
*call, wg.Add(1), запускается fn. - При втором Do(key, …) — находит существующий call,
dups++,wg.Wait(). - Когда fn завершается —
wg.Done(), broadcasting через wg.
3.8. Pitfall: singleflight не понимает invalidation
Заголовок раздела «3.8. Pitfall: singleflight не понимает invalidation»Если данные изменились во время in-flight запроса, все waiters получат старый результат. Это особенность дедупликации — не корректность.
3.9. Pitfall: long-running fn блокирует всех
Заголовок раздела «3.9. Pitfall: long-running fn блокирует всех»Если fn выполняется 30 секунд, все waiters ждут 30 секунд (даже если их contexts cancelled). Используйте timeout внутри fn, не полагайтесь на caller context.
4. Backpressure
Заголовок раздела «4. Backpressure»4.1. Что это
Заголовок раздела «4.1. Что это»Backpressure — давление обратно от медленного consumer’а на producer’а. Без backpressure:
- Producer заполняет очередь быстрее, чем consumer разбирает.
- Очередь растёт → OOM.
- Latency бесконечно растёт.
4.2. Паттерн 1: Bounded queue (buffered chan)
Заголовок раздела «4.2. Паттерн 1: Bounded queue (buffered chan)»queue := make(chan Event, 1000)
// producerqueue <- event // блокируется, если queue full
// consumerfor ev := range queue { process(ev)}Producer блокируется → давление идёт обратно (callers producer’а замедляются).
4.3. Паттерн 2: Drop policy
Заголовок раздела «4.3. Паттерн 2: Drop policy»Если нельзя блокировать producer’а (например, real-time events), дроп:
// non-blocking send, drop on fullselect {case queue <- event: // sentdefault: droppedCounter.Inc()}FIFO drop (старое выбрасывается):
type FIFODropQueue struct { mu sync.Mutex q []Event cap int}
func (q *FIFODropQueue) Push(e Event) { q.mu.Lock() defer q.mu.Unlock() if len(q.q) >= q.cap { q.q = q.q[1:] // drop oldest } q.q = append(q.q, e)}LIFO drop (новое выбрасывается) — выше под select { default: }.
4.4. Паттерн 3: Reactive Streams (концепция)
Заголовок раздела «4.4. Паттерн 3: Reactive Streams (концепция)»Java-стиль: consumer запрашивает N элементов (request(N)), producer не превышает. В Go реализуется как chan T + сигнальный канал chan int:
type ReactiveQueue struct { out chan Event request chan int}
// consumergo func() { rq.request <- 10 // request 10 for i := 0; i < 10; i++ { ev := <-rq.out process(ev) } rq.request <- 10 // request more}()
// producergo func() { var credit int for { select { case n := <-rq.request: credit += n default: if credit > 0 { ev := generate() rq.out <- ev credit-- } } }}()4.5. Паттерн 4: Token-based flow control (gRPC, HTTP/2)
Заголовок раздела «4.5. Паттерн 4: Token-based flow control (gRPC, HTTP/2)»HTTP/2 имеет WINDOW_UPDATE: receiver send’ит, сколько байт он готов принять. Sender ограничен этим окном.
Receiver: window = 65535Sender: data 16384 → window = 49151Sender: data 16384 → window = 32767...Sender: data 16384 → window = 0 → blockReceiver: process 32768 bytes → WINDOW_UPDATE(+32768)Sender: window = 32768 → unblockЭто и есть token-based flow control: каждый байт «потребляет» токен, receiver выдаёт новые.
В Go gRPC client/server делают это автоматически через MaxRecvMsgSize, InitialWindowSize, etc.
4.6. Anti-pattern: unbounded chan
Заголовок раздела «4.6. Anti-pattern: unbounded chan»queue := make(chan Event) // нет capacity, но не unbounded — это **synchronous**
queue := make(chan Event, math.MaxInt) // unbounded — OOM bombНикогда не используйте «огромные» capacity в надежде, что хватит. Заведите Drop policy.
4.7. Anti-pattern: unbounded goroutines
Заголовок раздела «4.7. Anti-pattern: unbounded goroutines»for ev := range events { go process(ev) // ⚠️ каждое событие — новая goroutine}Если producer быстрее consumer’ов — миллионы goroutines, OOM.
Решение: worker pool с bounded queue:
workers := 10queue := make(chan Event, 100)for i := 0; i < workers; i++ { go func() { for ev := range queue { process(ev) } }()}4.8. Real case: log shipping
Заголовок раздела «4.8. Real case: log shipping»Logger пишет 100k events/sec, network upstream 50k/sec.
type Shipper struct { queue chan Event drops atomic.Uint64}
func (s *Shipper) Log(e Event) { select { case s.queue <- e: default: s.drops.Add(1) // overflow drop }}При перегрузе — drop, метрика drops экспортится в Prometheus, alerting.
5. Rate limiting
Заголовок раздела «5. Rate limiting»5.1. Token bucket
Заголовок раздела «5.1. Token bucket»Концепция:
- В ведре есть
burstмаксимум токенов. - Каждые
1/rateсекунд добавляется 1 токен. - Request «расходует» 1 токен.
- Если токенов нет — request rejected или ждёт.
rate = 10 req/sec, burst = 20: ─── 20 токенов сразу ─── burst: можно сделать 20 в одну миллисекунду ─── затем 1 токен / 100ms ─── steady state5.2. golang.org/x/time/rate
Заголовок раздела «5.2. golang.org/x/time/rate»import "golang.org/x/time/rate"
lim := rate.NewLimiter(rate.Limit(10), 20) // 10 req/sec, burst 20
// non-blocking checkif !lim.Allow() { http.Error(w, "Too Many Requests", 429) return}
// blocking wait (respects ctx)if err := lim.Wait(ctx); err != nil { return err}
// reserve (advanced)r := lim.Reserve()if !r.OK() { return ErrLimit}time.Sleep(r.Delay())5.3. Внутреннее устройство rate.Limiter
Заголовок раздела «5.3. Внутреннее устройство rate.Limiter»type Limiter struct { mu sync.Mutex limit Limit // req/sec burst int tokens float64 // accumulated last time.Time // ...}При каждом Allow/Reserve:
- Lock.
tokens += (now - last) * limit.- Cap at
burst. - Если
tokens >= 1→tokens--, return true. - Иначе вернуть false (Allow) или Delay (Reserve).
Это классический token bucket, не leaky bucket — он позволяет «burst», накопленные за период тишины.
5.4. Leaky bucket
Заголовок раздела «5.4. Leaky bucket»Альтернатива: вместо «копящегося» резервуара, leaky bucket выпускает по 1 токену в фиксированном темпе. Burst невозможен. В Go реже используется, потому что rate.Limiter уже хорош.
leaky: rate = 10 req/sec → 1 req / 100ms exactly no burst: 11-й request в первые 100ms — reject5.5. Sliding window rate limiter
Заголовок раздела «5.5. Sliding window rate limiter»Считаем requests в окне 1 секунды; если их > N — reject.
type SlidingWindow struct { mu sync.Mutex window time.Duration limit int times []time.Time}
func (s *SlidingWindow) Allow() bool { s.mu.Lock() defer s.mu.Unlock() now := time.Now() cutoff := now.Add(-s.window) // удалить устаревшие i := 0 for i < len(s.times) && s.times[i].Before(cutoff) { i++ } s.times = s.times[i:] if len(s.times) >= s.limit { return false } s.times = append(s.times, now) return true}Минусы: O(N) память на N requests за окно. Используется только при малых N.
5.6. Distributed rate limiting (Redis + Lua)
Заголовок раздела «5.6. Distributed rate limiting (Redis + Lua)»Один Limiter на N инстансов — нужно общее хранилище.
-- Redis Lua: token bucketlocal key = KEYS[1]local rate = tonumber(ARGV[1])local burst = tonumber(ARGV[2])local now = tonumber(ARGV[3])
local data = redis.call("HMGET", key, "tokens", "last")local tokens = tonumber(data[1] or burst)local last = tonumber(data[2] or now)
local elapsed = now - lasttokens = math.min(burst, tokens + elapsed * rate)local allowed = 0if tokens >= 1 then tokens = tokens - 1 allowed = 1end
redis.call("HMSET", key, "tokens", tokens, "last", now)redis.call("EXPIRE", key, 60)
return allowedВ Go:
result, err := redisClient.Eval(ctx, luaScript, []string{"rl:" + userID}, rate, burst, time.Now().Unix()).Result()5.7. Per-user vs global rate limiter
Заголовок раздела «5.7. Per-user vs global rate limiter»type MultiLimiter struct { mu sync.Mutex limiters map[string]*rate.Limiter rate rate.Limit burst int}
func (m *MultiLimiter) Get(key string) *rate.Limiter { m.mu.Lock() defer m.mu.Unlock() if lim, ok := m.limiters[key]; ok { return lim } lim := rate.NewLimiter(m.rate, m.burst) m.limiters[key] = lim return lim}⚠️ Без cleanup —
m.limitersбудет расти бесконечно. Используйте LRU/TTL.
5.8. Rate limiter middleware
Заголовок раздела «5.8. Rate limiter middleware»func RateLimitMiddleware(lim *rate.Limiter) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !lim.Allow() { w.Header().Set("Retry-After", "1") http.Error(w, "Too Many Requests", 429) return } next.ServeHTTP(w, r) }) }}6. Структурированная concurrency
Заголовок раздела «6. Структурированная concurrency»6.1. Концепция
Заголовок раздела «6.1. Концепция»Структурированная concurrency (Nathaniel J. Smith, “Notes on structured concurrency, or: Go statement considered harmful”, 2018) — идея, что каждая parallel задача должна иметь scope, и dispatcher не возвращается, пока все дети не закончились.
В Python это TaskGroup (3.11+), в Kotlin — coroutineScope, в Swift — async let, в Trio — nursery.
function run() { scope { spawn taskA() spawn taskB() } // scope не выходит, пока taskA и taskB не закончатся // ↑ единственная точка выхода}6.2. Чего НЕ хватает Go
Заголовок раздела «6.2. Чего НЕ хватает Go»Go go statement — unstructured: после go f() управление возвращается, а f живёт независимо. Можно создать дитя goroutine из любого места, и его scope не привязан к родителю.
func handle(req *Request) { go cleanup(req) // утечка: cleanup может выполняться вечно return}6.3. errgroup как «базовая» структурированная concurrency
Заголовок раздела «6.3. errgroup как «базовая» структурированная concurrency»func process(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) g.Go(taskA) g.Go(taskB) return g.Wait() // ← единственная точка выхода}errgroup навязывает scope: Wait() блокируется до завершения всех Go. Это де-факто structured concurrency в Go.
6.4. Best practices structured concurrency в Go
Заголовок раздела «6.4. Best practices structured concurrency в Go»- Всегда передавайте
ctxв goroutines. - Всегда ассоциируйте goroutines с группой (errgroup/WaitGroup).
- Никогда не делайте
go fn()без знания, когда fn завершится (fire-and-forget — антипаттерн). - Если нужен fire-and-forget — заведите supervisor с контекстом приложения.
6.5. Anti-pattern: fire-and-forget
Заголовок раздела «6.5. Anti-pattern: fire-and-forget»// ПЛОХОfunc handle(req *Request) { go log(req) // leak: что если log упадёт, panic'нет?}
// ХОРОШОfunc handle(req *Request) { select { case logQueue <- req: default: droppedLogs.Inc() }}// log goroutine управляется supervisor'ом, имеет defer recover, известен lifecycle6.6. Supervisor pattern
Заголовок раздела «6.6. Supervisor pattern»type App struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup}
func (a *App) Start(name string, fn func(context.Context) error) { a.wg.Add(1) go func() { defer a.wg.Done() defer func() { if r := recover(); r != nil { log.Printf("panic in %s: %v", name, r) } }() if err := fn(a.ctx); err != nil && !errors.Is(err, context.Canceled) { log.Printf("%s exited: %v", name, err) } }()}
func (a *App) Shutdown() { a.cancel() a.wg.Wait()}7. Тестирование
Заголовок раздела «7. Тестирование»7.1. testing/synctest (Go 1.24+)
Заголовок раздела «7.1. testing/synctest (Go 1.24+)»Go 1.24 представил testing/synctest (изначально как experimental) — пакет для тестирования concurrent кода с виртуальным временем.
import "testing/synctest"
func TestTimeout(t *testing.T) { synctest.Run(func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()
go func() { time.Sleep(5 * time.Second) // do something }()
// wait for all goroutines to be idle synctest.Wait()
// tick virtual time // (in Go 1.25 stable form) })}Виртуальное время позволяет:
- Тесты с
time.Sleep(1 * time.Hour)выполняются мгновенно. - Детерминированные scheduling decisions.
- Нет flaky tests из-за timing.
7.2. Race detector
Заголовок раздела «7.2. Race detector»go test -race ./...go build -raceRace detector использует ThreadSanitizer (TSan): инструментирует memory accesses, фиксирует happens-before. Если найден doat race — выводит stack trace.
Когда работает:
- Обнаруживает фактически произошедшие races.
- Не доказывает отсутствие races (может пропустить редкие).
Когда не работает:
- Production (~10× overhead).
- Race должен реально случиться в test run.
💡 Always run CI с
-race.
7.3. Deterministic tests с channels
Заголовок раздела «7.3. Deterministic tests с channels»Вместо time.Sleep(100 * time.Millisecond) для синхронизации — используйте channels:
// плохоfunc TestWorker(t *testing.T) { w := startWorker() w.Submit(task) time.Sleep(100 * time.Millisecond) // flaky! if !task.Done() { t.Fatal(...) }}
// хорошоfunc TestWorker(t *testing.T) { done := make(chan struct{}) w := startWorkerWith(func() { close(done) }) w.Submit(task) select { case <-done: case <-time.After(time.Second): t.Fatal("timeout") }}7.4. testify suite
Заголовок раздела «7.4. testify suite»testify/suite для setup/teardown:
type WorkerSuite struct { suite.Suite w *Worker}
func (s *WorkerSuite) SetupTest() { s.w = NewWorker()}
func (s *WorkerSuite) TearDownTest() { s.w.Shutdown()}
func (s *WorkerSuite) TestDoSomething() { s.w.Submit(task) s.Eventually(func() bool { return task.Done() }, time.Second, 10*time.Millisecond)}
func TestWorkerSuite(t *testing.T) { suite.Run(t, new(WorkerSuite))}7.5. Stress tests
Заголовок раздела «7.5. Stress tests»func TestConcurrentMapStress(t *testing.T) { if testing.Short() { t.Skip("stress test") } m := NewConcurrentMap() var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func(i int) { defer wg.Done() for j := 0; j < 1000; j++ { m.Set(fmt.Sprintf("k%d", i*j%100), i) m.Get(fmt.Sprintf("k%d", i*j%100)) } }(i) } wg.Wait()}Запуск:
go test -race -count=100 -run TestConcurrentMapStress8. Gotchas
Заголовок раздела «8. Gotchas»⚠️ 8.1. errgroup без WithContext
Заголовок раздела «⚠️ 8.1. errgroup без WithContext»new(errgroup.Group) — не отменяет goroutines при ошибке. Используйте errgroup.WithContext.
⚠️ 8.2. errgroup.Go captures loop variable
Заголовок раздела «⚠️ 8.2. errgroup.Go captures loop variable»В Go < 1.22 — нужно i := i внутри loop. С Go 1.22+ переменные loop ограничены iteration scope автоматически.
// Go 1.22+for _, url := range urls { g.Go(func() error { return fetch(url) }) // OK}
// до 1.22for _, url := range urls { url := url g.Go(func() error { return fetch(url) })}⚠️ 8.3. errgroup.Wait — единственный сигнал завершения
Заголовок раздела «⚠️ 8.3. errgroup.Wait — единственный сигнал завершения»Не вызывайте g.Wait() несколько раз — нет защиты. И не пытайтесь читать g.err напрямую (приватное).
⚠️ 8.4. semaphore.Release с неверным весом
Заголовок раздела «⚠️ 8.4. semaphore.Release с неверным весом»sem.Acquire(ctx, 5)defer sem.Release(3) // ⚠️ leak 2 единицыВсегда release тот же вес, что acquire.
⚠️ 8.5. semaphore.Acquire с весом > size — deadlock
Заголовок раздела «⚠️ 8.5. semaphore.Acquire с весом > size — deadlock»sem := semaphore.NewWeighted(10)sem.Acquire(ctx, 20) // никогда не вернётся⚠️ 8.6. singleflight long-running blocks all waiters
Заголовок раздела «⚠️ 8.6. singleflight long-running blocks all waiters»Если fn 30 сек, все waiters ждут 30 сек, даже если их ctx cancelled. Реализуйте timeout внутри fn.
⚠️ 8.7. singleflight.Do — игнорирует contexts waiters
Заголовок раздела «⚠️ 8.7. singleflight.Do — игнорирует contexts waiters»Только context первого caller’а проходит в fn. Все остальные ждут в обход своих ctx.
⚠️ 8.8. singleflight.Forget вне race-free
Заголовок раздела «⚠️ 8.8. singleflight.Forget вне race-free»group.Forget(key) — race с in-flight Do для того же key. После Forget новые Do запустят повторное выполнение.
⚠️ 8.9. rate.Limiter — Burst 0
Заголовок раздела «⚠️ 8.9. rate.Limiter — Burst 0»NewLimiter(10, 0) — лимит 10 rps, но burst 0 → ничего не пропускает (нет накопленных токенов). Минимум burst = 1.
⚠️ 8.10. rate.Limiter — Wait под нагрузкой
Заголовок раздела «⚠️ 8.10. rate.Limiter — Wait под нагрузкой»lim.Wait(ctx) блокируется. Если тысячи requests — тысячи goroutines park’нутся. Лучше Allow() + return 429.
⚠️ 8.11. Per-user rate limiter — memory leak
Заголовок раздела «⚠️ 8.11. Per-user rate limiter — memory leak»Map[user]*Limiter без cleanup → растёт бесконечно. Используйте LRU или TTL.
⚠️ 8.12. Backpressure — drop policy unsuitable for critical data
Заголовок раздела «⚠️ 8.12. Backpressure — drop policy unsuitable for critical data»Drop OK для metrics/logs. Не OK для финансовых событий — там нужна durable очередь (Kafka, NATS).
⚠️ 8.13. Unbounded chan — OOM bomb
Заголовок раздела «⚠️ 8.13. Unbounded chan — OOM bomb»ch := make(chan Event, math.MaxInt32) // 2 миллиарда событий в памяти⚠️ 8.14. errgroup и panic
Заголовок раздела «⚠️ 8.14. errgroup и panic»Если функция в g.Go() паникует — паника не ловится. Process крашится. Заверните в defer recover, если важно:
g.Go(func() (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("panic: %v", r) } }() return doSomething()})⚠️ 8.15. Тесты с time.Sleep — flaky
Заголовок раздела «⚠️ 8.15. Тесты с time.Sleep — flaky»Любой time.Sleep в тесте — потенциальный flake. Используйте synctest, channels, s.Eventually(...) (testify).
9. Производительность
Заголовок раздела «9. Производительность»9.1. errgroup overhead
Заголовок раздела «9.1. errgroup overhead»errgroup.Group{}: ~100ns на Go+WaitWaitGroup{}: ~50ns на Add+Done+WaitНакладные расходы errgroup мизерные. Используйте всегда.
9.2. semaphore vs buffered chan
Заголовок раздела «9.2. semaphore vs buffered chan»buffered chan (cap=100): 50 ns/opsemaphore.Acquire(1): 120 ns/opBuffered chan быстрее для uniform weight. semaphore — для weighted.
9.3. singleflight overhead
Заголовок раздела «9.3. singleflight overhead»Без singleflight: 100 concurrent Get → 100 DB queries → 100×50ms = 5 sec totalС singleflight: 100 concurrent Get → 1 DB query → 50ms9.4. rate.Limiter
Заголовок раздела «9.4. rate.Limiter»lim.Allow(): 30 ns/op (low contention) 100 ns/op (high contention, mutex)9.5. Real case: cache stampede metrics
Заголовок раздела «9.5. Real case: cache stampede metrics»До singleflight (e-commerce checkout):
- p99 latency: 800ms (cache miss → 100 concurrent DB queries)
- DB CPU: 80%
После:
- p99 latency: 80ms (cache miss → 1 DB query)
- DB CPU: 25%
9.6. Real case: GitHub-style API rate limit
Заголовок раздела «9.6. Real case: GitHub-style API rate limit»5000 req/hour per user — token bucket, burst 100.
type APIRateLimiter struct { mu sync.Mutex users map[string]*rate.Limiter}
func (a *APIRateLimiter) Allow(userID string) bool { a.mu.Lock() lim, ok := a.users[userID] if !ok { lim = rate.NewLimiter(rate.Limit(5000.0/3600.0), 100) a.users[userID] = lim } a.mu.Unlock() return lim.Allow()}9.7. Worker pool patterns
Заголовок раздела «9.7. Worker pool patterns»// 1: shared chan + N workers (simple, recommended)ch := make(chan Task, 100)for i := 0; i < N; i++ { go worker(ch)}
// 2: errgroup.SetLimit (recommended Go 1.20+)g, ctx := errgroup.WithContext(ctx)g.SetLimit(N)for _, t := range tasks { t := t g.Go(func() error { return process(ctx, t) })}g.Wait()Производительность одинакова, errgroup версия — меньше кода.
10. Вопросы на собесе
Заголовок раздела «10. Вопросы на собесе»10.1. Что такое errgroup и для чего?
Заголовок раздела «10.1. Что такое errgroup и для чего?»golang.org/x/sync/errgroup — Wait group + первая ошибка + cancellation. WithContext отменяет ctx при первой ошибке. g.Go() — стартует, g.Wait() — блокирует до завершения всех.
10.2. Чем errgroup лучше WaitGroup?
Заголовок раздела «10.2. Чем errgroup лучше WaitGroup?»WaitGroup не агрегирует ошибки. errgroup автоматизирует: первая ошибка, cancel context, опциональный лимит concurrency (Go 1.20+ SetLimit).
10.3. errgroup.SetLimit и TryGo?
Заголовок раздела «10.3. errgroup.SetLimit и TryGo?»SetLimit(n) — лимит concurrent goroutines через internal semaphore. TryGo(f) — non-blocking версия Go, возвращает false при отсутствии слота.
10.4. Что произойдёт, если g.Wait() вызвать после ошибки в g.Go?
Заголовок раздела «10.4. Что произойдёт, если g.Wait() вызвать после ошибки в g.Go?»Wait() дождётся завершения всех goroutines (даже после первой ошибки — мы ждём остальных). Вернёт первую ошибку. Остальные ошибки игнорируются.
10.5. errgroup с panic’ой в g.Go?
Заголовок раздела «10.5. errgroup с panic’ой в g.Go?»Паника не ловится, процесс крашится. Используйте defer recover внутри функции.
10.6. Что такое semaphore.Weighted?
Заголовок раздела «10.6. Что такое semaphore.Weighted?»Counting semaphore с произвольным весом. Acquire(ctx, n) ждёт n единиц. Release(n) возвращает. FIFO fairness.
10.7. semaphore vs buffered chan?
Заголовок раздела «10.7. semaphore vs buffered chan?»Buffered chan — uniform weight. semaphore — weighted (разные задачи берут разный объём). semaphore нативно поддерживает ctx в Acquire.
10.8. Когда использовать semaphore?
Заголовок раздела «10.8. Когда использовать semaphore?»GPU/memory limits с переменными requirements, лимит subprocess’ов, ограничение DB connections с разными priorities.
10.9. Что такое cache stampede / thundering herd?
Заголовок раздела «10.9. Что такое cache stampede / thundering herd?»Одновременный истечение кеша + N concurrent requests → N идентичных backend-запросов одновременно → перегрузка downstream.
10.10. Как singleflight защищает от stampede?
Заголовок раздела «10.10. Как singleflight защищает от stampede?»Дедуплицирует одинаковые в-полёте запросы по ключу. Только один выполняется, остальные ждут результат. group.Do(key, fn) — все 1000 waiters получат тот же результат.
10.11. singleflight.Do vs DoChan?
Заголовок раздела «10.11. singleflight.Do vs DoChan?»Do — synchronous (блокирует caller). DoChan — возвращает channel, можно использовать с select+ctx.
10.12. Что делает singleflight.Forget?
Заголовок раздела «10.12. Что делает singleflight.Forget?»Удаляет in-flight call из map. Следующий Do(key, fn) повторно выполнит fn. Используется после получения ошибки.
10.13. Какие проблемы у singleflight?
Заголовок раздела «10.13. Какие проблемы у singleflight?»Long-running fn блокирует waiters даже если их ctx cancelled. Все waiters получают тот же результат, даже если данные могли измениться.
10.14. Что такое backpressure?
Заголовок раздела «10.14. Что такое backpressure?»Давление обратно от медленного consumer’а на producer. Без backpressure — растущая очередь и OOM.
10.15. Паттерны backpressure?
Заголовок раздела «10.15. Паттерны backpressure?»- Bounded queue (буферизованный chan) — блок producer’а.
- Drop policy (LIFO/FIFO) — выкидывание событий.
- Token-based flow control (HTTP/2, gRPC).
- Reactive Streams (request-N).
10.16. Token bucket vs leaky bucket?
Заголовок раздела «10.16. Token bucket vs leaky bucket?»Token bucket: токены копятся, burst разрешён. Leaky bucket: постоянный rate, burst невозможен. token bucket — golang.org/x/time/rate.
10.17. golang.org/x/time/rate API?
Заголовок раздела «10.17. golang.org/x/time/rate API?»NewLimiter(rate, burst). Методы: Allow() (non-blocking bool), Wait(ctx) (block), Reserve() (advanced: получить Delay).
10.18. rate.Limiter internals?
Заголовок раздела «10.18. rate.Limiter internals?»Token bucket. При каждом Allow: добавляются tokens = (now - last) * rate (cap at burst). Если tokens >= 1 — tokens—, return true. Mutex защищает state.
10.19. Per-user rate limiter — memory issues?
Заголовок раздела «10.19. Per-user rate limiter — memory issues?»Map[user]*Limiter растёт. Используйте LRU/TTL. Или sliding window + Redis для distributed.
10.20. Distributed rate limiting?
Заголовок раздела «10.20. Distributed rate limiting?»Redis + Lua скрипт (атомарный token bucket update). Или centralized service. Локальный rate.Limiter не подходит для N инстансов.
10.21. Что такое структурированная concurrency?
Заголовок раздела «10.21. Что такое структурированная concurrency?»Каждая parallel задача имеет scope; точка выхода ждёт всех детей. Python TaskGroup, Kotlin coroutineScope. В Go нет встроенного, но errgroup + ctx — рабочий аналог.
10.22. Чем go fn() отличается от structured?
Заголовок раздела «10.22. Чем go fn() отличается от structured?»go — unstructured: fn живёт независимо от родителя. Можно «потерять» goroutine. Structured concurrency запрещает orphan’ы.
10.23. Fire-and-forget goroutines — почему антипаттерн?
Заголовок раздела «10.23. Fire-and-forget goroutines — почему антипаттерн?»- Никто не отслеживает завершение.
- Не отменяемы.
- Паники не пойманы.
- Утечки goroutine.
Используйте supervisor pattern с ctx и WaitGroup.
10.24. testing/synctest — что это?
Заголовок раздела «10.24. testing/synctest — что это?»Go 1.24+ пакет для тестов с виртуальным временем. synctest.Run(fn) запускает fn, synctest.Wait() ждёт пока все goroutines idle, time.Sleep работает с виртуальным временем.
10.25. Race detector в Go?
Заголовок раздела «10.25. Race detector в Go?»-race flag активирует ThreadSanitizer instrumentation. Находит data races во время выполнения. ~10× CPU overhead, не для production.
10.26. Как избежать flaky concurrent tests?
Заголовок раздела «10.26. Как избежать flaky concurrent tests?»- Channels вместо time.Sleep для синхронизации.
- testify Eventually для условий.
- testing/synctest для виртуального времени.
- go test -count=100 для проверки стабильности.
10.27. Worker pool pattern?
Заголовок раздела «10.27. Worker pool pattern?»ch := make(chan Task, queueSize)for i := 0; i < workers; i++ { go func() { for t := range ch { process(t) } }()}Или errgroup.SetLimit. Bounded queue даёт backpressure.
10.28. Когда нужно несколько rate limiters?
Заголовок раздела «10.28. Когда нужно несколько rate limiters?»- Per-user + global (защита от abuse + от total overload).
- Per-endpoint (write-heavy endpoints — отдельный лимит).
- Read vs write (DB write quota меньше read quota).
10.29. errgroup + semaphore — когда обоё?
Заголовок раздела «10.29. errgroup + semaphore — когда обоё?»errgroup — для агрегации ошибок и cancel. semaphore — для weighted concurrency. Если задачи весят по-разному — нужны обе.
10.30. Sliding window vs token bucket — что выбрать?
Заголовок раздела «10.30. Sliding window vs token bucket — что выбрать?»Token bucket — простой, дешёвый, поддерживает burst. Sliding window — точнее на коротких интервалах, но O(N) память. Token bucket в 95% случаев.
11. Practice
Заголовок раздела «11. Practice»11.1. Parallel fetcher с лимитом
Заголовок раздела «11.1. Parallel fetcher с лимитом»Реализовать FetchAll(ctx, urls, limit) — параллельная загрузка с лимитом concurrency через errgroup.SetLimit. Тесты на cancel и aggregate error.
11.2. Worker pool с semaphore
Заголовок раздела «11.2. Worker pool с semaphore»Worker pool, где каждая задача имеет вес (CPU cores). Через semaphore.Weighted распределить так, чтобы суммарный вес не превышал N.
11.3. Кеш с singleflight
Заголовок раздела «11.3. Кеш с singleflight»Cache-aside для DB-запросов с singleflight protection. Замерить latency и DB QPS до/после под нагрузкой 1000 RPS.
11.4. Token bucket rate limiter
Заголовок раздела «11.4. Token bucket rate limiter»Реализовать MultiUserLimiter с per-user rate.Limiter, LRU cleanup до 10k активных пользователей.
11.5. Backpressure log shipper
Заголовок раздела «11.5. Backpressure log shipper»Log shipper с bounded queue (drop policy) и Prometheus метрикой dropped_logs_total.
11.6. Distributed rate limiter
Заголовок раздела «11.6. Distributed rate limiter»Token bucket в Redis (Lua script). Go клиент с fallback на локальный rate.Limiter при недоступности Redis.
11.7. Supervisor pattern
Заголовок раздела «11.7. Supervisor pattern»Application supervisor: app.Start(name, fn) запускает goroutine с recover, log на падении. app.Shutdown() — graceful cancel всем.
11.8. synctest-тест с виртуальным временем
Заголовок раздела «11.8. synctest-тест с виртуальным временем»Написать тест для функции с time.After(1*time.Hour) через testing/synctest (Go 1.24+) — тест должен выполняться < 1ms.
12. Источники
Заголовок раздела «12. Источники»- golang.org/x/sync/errgroup — https://pkg.go.dev/golang.org/x/sync/errgroup.
- golang.org/x/sync/semaphore — https://pkg.go.dev/golang.org/x/sync/semaphore.
- golang.org/x/sync/singleflight — https://pkg.go.dev/golang.org/x/sync/singleflight.
- golang.org/x/time/rate — https://pkg.go.dev/golang.org/x/time/rate (token bucket).
- Nathaniel J. Smith, “Notes on structured concurrency” — https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/.
- Go 1.24 release notes — testing/synctest — https://go.dev/doc/go1.24 (раздел testing/synctest).
- Marc Brooker, “Exponential Backoff And Jitter” — AWS Architecture Blog, для rate limiting wisdom.
- gRPC flow control — https://grpc.io/docs/guides/flow-control/ (token-based flow control).
- Redis Rate Limiting patterns — https://redis.io/learn/operate/redis-at-scale/scalability/rate-limiting (distributed token bucket Lua).
- The Go Programming Language (Donovan, Kernighan) глава 8 — channels, select, structured patterns.
Итог Middle 2: errgroup — стандарт для параллельных задач с ошибками; semaphore — weighted concurrency; singleflight — обязателен против cache stampede; backpressure через bounded queue + drop policy; rate limiting через token bucket (
x/time/rate), distributed — Redis Lua. Структурированная concurrency в Go не встроена, но errgroup + ctx — рабочий шаблон. Тестируйте concurrent код черезtesting/synctest(Go 1.24+), channels, race detector в CI.