Перейти к содержимому

Координационные примитивы: errgroup, semaphore, singleflight, backpressure, rate limiting

Зачем знать на Middle 2: реальные сервисы редко используют «голые» goroutines + channels — они построены на координационных примитивах: errgroup для параллельного выполнения с ошибками, semaphore для ограничения concurrency, singleflight против thundering herd, token-bucket rate limiter, backpressure паттерны. Без знания этих библиотек middle 2 не сможет реализовать production-ready параллельные пайплайны, защиту от cache stampede, корректное ограничение нагрузки. Это то, что отличает «знаю горутины» от «умею писать конкурентный backend».

  1. errgroup: параллелизм с ошибками и контекстом
  2. semaphore: weighted ограничение concurrency
  3. singleflight: дедупликация и cache stampede
  4. Backpressure: давление обратно
  5. Rate limiting: token bucket, leaky bucket, distributed
  6. Структурированная concurrency в Go
  7. Тестирование concurrent кода
  8. Gotchas (15)
  9. Производительность
  10. Вопросы на собесе (30)
  11. Practice (8)
  12. Источники

golang.org/x/sync/errgroup — стандартный де-факто инструмент для параллельного выполнения с агрегацией первой ошибки и кооперативной отменой.

Базовый паттерн:

import "golang.org/x/sync/errgroup"
func fetchAll(ctx context.Context, urls []string) ([]byte, error) {
g, ctx := errgroup.WithContext(ctx)
results := make([][]byte, len(urls))
for i, url := range urls {
i, url := i, url // capture (до Go 1.22)
g.Go(func() error {
body, err := fetch(ctx, url)
if err != nil {
return err // → cancel других через ctx
}
results[i] = body
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return bytes.Join(results, nil), nil
}
type Group struct {
cancel func(error) // CancelCauseFunc (Go 1.20+)
wg sync.WaitGroup
sem chan token // limit (Go 1.20+ SetLimit)
errOnce sync.Once
err error
}
  • wg — стандартный WaitGroup для синхронизации goroutines.
  • errOnce — первая ошибка побеждает, остальные игнорируются.
  • cancel — сохраняется из WithContext, вызывается при первой ошибке.
  • sem — буферизованный канал для лимита concurrency (Go 1.20+).
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{} // block если лимит достигнут
}
g.wg.Add(1)
go func() {
defer g.wg.Done()
defer func() { if g.sem != nil { <-g.sem } }()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil {
g.cancel(err) // cancel ctx
}
})
}
}()
}
func (g *Group) Wait() error {
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
return g.err
}
g, ctx := errgroup.WithContext(parent)

Это создаёт derived context, который отменяется при первой ошибке в любой g.Go функции. Остальные goroutines, которые слушают ctx, должны корректно завершиться.

g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case res := <-fetchAsync(ctx, url):
return process(res)
}
})
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // максимум 10 параллельных goroutines
for _, url := range urls {
url := url
if !g.TryGo(func() error { return fetch(ctx, url) }) {
// нет слота — пропустить или ждать
log.Printf("dropped %s", url)
}
}
g.Wait()
  • SetLimit(n) — создаёт internal semaphore размера n.
  • Go(f) — блокируется, если все слоты заняты.
  • TryGo(f) — non-blocking, возвращает false если нет слота.

1.5. Пример: параллельная обработка с лимитом

Заголовок раздела «1.5. Пример: параллельная обработка с лимитом»
func processItems(ctx context.Context, items []Item) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(runtime.NumCPU())
for _, item := range items {
item := item
g.Go(func() error {
return processItem(ctx, item)
})
}
return g.Wait()
}

1.6. Пример: параллельный DB fetch с агрегацией

Заголовок раздела «1.6. Пример: параллельный DB fetch с агрегацией»
type Result struct {
User *User
Orders []Order
History []Event
}
func loadUserPage(ctx context.Context, db *sql.DB, uid int64) (*Result, error) {
var r Result
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
u, err := loadUser(ctx, db, uid)
r.User = u
return err
})
g.Go(func() error {
o, err := loadOrders(ctx, db, uid)
r.Orders = o
return err
})
g.Go(func() error {
h, err := loadHistory(ctx, db, uid)
r.History = h
return err
})
if err := g.Wait(); err != nil {
return nil, err
}
return &r, nil
}

⚠️ Внимание: запись в r.User/r.Orders из разных goroutines на разных полях — формально race? Нет, разные поля структуры в Go memory model — отдельные адреса, race detector это пропускает. Но если бы это была map — был бы race.

// ПЛОХО
g := new(errgroup.Group)
g.Go(func() error { return longOp() }) // не cancel'абельно
// ХОРОШО
g, ctx := errgroup.WithContext(parent)
g.Go(func() error { return longOpCtx(ctx) })

golang.org/x/sync/semaphoreweighted counting semaphore. В отличие от буферизованного канала, каждый Acquire может требовать произвольный вес.

import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(int64(100)) // 100 единиц ресурса
// Acquire 10 единиц
if err := sem.Acquire(ctx, 10); err != nil {
return err // context cancelled
}
defer sem.Release(10)
// ... use 10 units of resource ...
NewWeighted(n int64) *Weighted
(s *Weighted) Acquire(ctx context.Context, n int64) error
(s *Weighted) TryAcquire(n int64) bool
(s *Weighted) Release(n int64)
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List // FIFO очередь
}

Acquire:

  1. Lock.
  2. Если cur + n <= size и очередь пуста → cur += n, Unlock, success.
  3. Иначе — добавить в waiters, разлочить mutex, ждать на channel.

Release:

  1. Lock.
  2. cur -= n.
  3. Пройти waiters head→tail, разбудить тех, кто помещается.
  4. Unlock.

FIFO fairness: новые Acquire ждут позади очереди, не «прыгают».

buffered chansemaphore
Weightedнет (один вес)да
Context cancelчерез selectнапрямую в Acquire
FairnessFIFO (Go runtime)FIFO
APIch <- struct{}{} / <-chAcquire/Release
Overheadмьютекс + parkмьютекс + park
TryAcquireselect defaultTryAcquire

Для uniform concurrency лимита buffered chan проще:

sem := make(chan struct{}, 10)
for _, item := range items {
sem <- struct{}{}
go func(i Item) {
defer func() { <-sem }()
process(i)
}(item)
}

Для weighted (например, разные задачи берут разный объём памяти):

sem := semaphore.NewWeighted(int64(totalMemory))
for _, task := range tasks {
weight := task.MemorySize
if err := sem.Acquire(ctx, weight); err != nil {
return err
}
go func(t Task) {
defer sem.Release(t.MemorySize)
run(t)
}(task)
}
// 24 GB GPU, разные задачи требуют от 1 до 8 GB
sem := semaphore.NewWeighted(24)
func runInference(ctx context.Context, model Model) (*Result, error) {
weight := int64(model.MemoryGB)
if err := sem.Acquire(ctx, weight); err != nil {
return nil, err
}
defer sem.Release(weight)
return doInference(model)
}

2.6. Real case: ограничение параллельных subprocesses

Заголовок раздела «2.6. Real case: ограничение параллельных subprocesses»
// max 5 ffmpeg subprocesses на машину (CPU-bound)
sem := semaphore.NewWeighted(5)
g, ctx := errgroup.WithContext(ctx)
sem := semaphore.NewWeighted(50)
for _, task := range tasks {
task := task
g.Go(func() error {
if err := sem.Acquire(ctx, task.Weight); err != nil {
return err
}
defer sem.Release(task.Weight)
return process(ctx, task)
})
}
return g.Wait()

💡 Можно вместо semaphore использовать g.SetLimit(...), но если веса разные — нужен semaphore.


Кеш: key=user:42, TTL=60s истёк в T=0.
T=0.001: 1000 requests за user:42 одновременно.
Каждая goroutine видит cache miss.
1000 идентичных DB-запросов одновременно.
DB падает.

golang.org/x/sync/singleflight:

import "golang.org/x/sync/singleflight"
var group singleflight.Group
func getUser(ctx context.Context, id int64) (*User, error) {
key := fmt.Sprintf("user:%d", id)
v, err, _ := group.Do(key, func() (any, error) {
// только один goroutine выполнит это
return loadFromDB(ctx, id)
})
if err != nil {
return nil, err
}
return v.(*User), nil
}

При 1000 одновременных вызовах getUser(ctx, 42):

  • Один выполнит loadFromDB.
  • 999 заблокируются и получат тот же результат.
type CachedStore struct {
cache *ristretto.Cache
g singleflight.Group
db *sql.DB
}
func (s *CachedStore) Get(ctx context.Context, id int64) (*User, error) {
key := fmt.Sprintf("user:%d", id)
if v, ok := s.cache.Get(key); ok {
return v.(*User), nil
}
v, err, _ := s.g.Do(key, func() (any, error) {
// двойная проверка (другая goroutine могла уже загрузить)
if v, ok := s.cache.Get(key); ok {
return v, nil
}
u, err := loadFromDB(ctx, id)
if err != nil {
return nil, err
}
s.cache.Set(key, u, 1)
return u, nil
})
if err != nil {
return nil, err
}
return v.(*User), nil
}
ch := group.DoChan(key, fn)
select {
case res := <-ch:
return res.Val.(*User), res.Err
case <-ctx.Done():
return nil, ctx.Err()
}

⚠️ Даже если context cancelled у вас, fn продолжит выполняться — другие waiters ждут.

group.Forget(key) // forces next Do() to re-execute fn

Используется после получения ошибки, чтобы не кэшировать неудачу:

v, err, _ := g.Do(key, fn)
if err != nil {
g.Forget(key) // re-execute next time
return nil, err
}
v, err, shared := group.Do(key, fn)
// shared == true, если этот результат разделён с другими waiters

Полезно для метрик (count deduplicated requests).

type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val any
err error
dups int // count waiters
chans []chan<- Result
}
  • При первом Do(key, fn) создаётся *call, wg.Add(1), запускается fn.
  • При втором Do(key, …) — находит существующий call, dups++, wg.Wait().
  • Когда fn завершается — wg.Done(), broadcasting через wg.

Если данные изменились во время in-flight запроса, все waiters получат старый результат. Это особенность дедупликации — не корректность.

Если fn выполняется 30 секунд, все waiters ждут 30 секунд (даже если их contexts cancelled). Используйте timeout внутри fn, не полагайтесь на caller context.


Backpressure — давление обратно от медленного consumer’а на producer’а. Без backpressure:

  • Producer заполняет очередь быстрее, чем consumer разбирает.
  • Очередь растёт → OOM.
  • Latency бесконечно растёт.
queue := make(chan Event, 1000)
// producer
queue <- event // блокируется, если queue full
// consumer
for ev := range queue {
process(ev)
}

Producer блокируется → давление идёт обратно (callers producer’а замедляются).

Если нельзя блокировать producer’а (например, real-time events), дроп:

// non-blocking send, drop on full
select {
case queue <- event:
// sent
default:
droppedCounter.Inc()
}

FIFO drop (старое выбрасывается):

type FIFODropQueue struct {
mu sync.Mutex
q []Event
cap int
}
func (q *FIFODropQueue) Push(e Event) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.q) >= q.cap {
q.q = q.q[1:] // drop oldest
}
q.q = append(q.q, e)
}

LIFO drop (новое выбрасывается) — выше под select { default: }.

Java-стиль: consumer запрашивает N элементов (request(N)), producer не превышает. В Go реализуется как chan T + сигнальный канал chan int:

type ReactiveQueue struct {
out chan Event
request chan int
}
// consumer
go func() {
rq.request <- 10 // request 10
for i := 0; i < 10; i++ {
ev := <-rq.out
process(ev)
}
rq.request <- 10 // request more
}()
// producer
go func() {
var credit int
for {
select {
case n := <-rq.request:
credit += n
default:
if credit > 0 {
ev := generate()
rq.out <- ev
credit--
}
}
}
}()

HTTP/2 имеет WINDOW_UPDATE: receiver send’ит, сколько байт он готов принять. Sender ограничен этим окном.

Receiver: window = 65535
Sender: data 16384 → window = 49151
Sender: data 16384 → window = 32767
...
Sender: data 16384 → window = 0 → block
Receiver: process 32768 bytes → WINDOW_UPDATE(+32768)
Sender: window = 32768 → unblock

Это и есть token-based flow control: каждый байт «потребляет» токен, receiver выдаёт новые.

В Go gRPC client/server делают это автоматически через MaxRecvMsgSize, InitialWindowSize, etc.

queue := make(chan Event) // нет capacity, но не unbounded — это **synchronous**
queue := make(chan Event, math.MaxInt) // unbounded — OOM bomb

Никогда не используйте «огромные» capacity в надежде, что хватит. Заведите Drop policy.

for ev := range events {
go process(ev) // ⚠️ каждое событие — новая goroutine
}

Если producer быстрее consumer’ов — миллионы goroutines, OOM.

Решение: worker pool с bounded queue:

workers := 10
queue := make(chan Event, 100)
for i := 0; i < workers; i++ {
go func() {
for ev := range queue {
process(ev)
}
}()
}

Logger пишет 100k events/sec, network upstream 50k/sec.

type Shipper struct {
queue chan Event
drops atomic.Uint64
}
func (s *Shipper) Log(e Event) {
select {
case s.queue <- e:
default:
s.drops.Add(1) // overflow drop
}
}

При перегрузе — drop, метрика drops экспортится в Prometheus, alerting.


Концепция:

  • В ведре есть burst максимум токенов.
  • Каждые 1/rate секунд добавляется 1 токен.
  • Request «расходует» 1 токен.
  • Если токенов нет — request rejected или ждёт.
rate = 10 req/sec, burst = 20:
─── 20 токенов сразу ─── burst: можно сделать 20 в одну миллисекунду
─── затем 1 токен / 100ms ─── steady state
import "golang.org/x/time/rate"
lim := rate.NewLimiter(rate.Limit(10), 20) // 10 req/sec, burst 20
// non-blocking check
if !lim.Allow() {
http.Error(w, "Too Many Requests", 429)
return
}
// blocking wait (respects ctx)
if err := lim.Wait(ctx); err != nil {
return err
}
// reserve (advanced)
r := lim.Reserve()
if !r.OK() {
return ErrLimit
}
time.Sleep(r.Delay())
type Limiter struct {
mu sync.Mutex
limit Limit // req/sec
burst int
tokens float64 // accumulated
last time.Time
// ...
}

При каждом Allow/Reserve:

  1. Lock.
  2. tokens += (now - last) * limit.
  3. Cap at burst.
  4. Если tokens >= 1tokens--, return true.
  5. Иначе вернуть false (Allow) или Delay (Reserve).

Это классический token bucket, не leaky bucket — он позволяет «burst», накопленные за период тишины.

Альтернатива: вместо «копящегося» резервуара, leaky bucket выпускает по 1 токену в фиксированном темпе. Burst невозможен. В Go реже используется, потому что rate.Limiter уже хорош.

leaky:
rate = 10 req/sec → 1 req / 100ms exactly
no burst: 11-й request в первые 100ms — reject

Считаем requests в окне 1 секунды; если их > N — reject.

type SlidingWindow struct {
mu sync.Mutex
window time.Duration
limit int
times []time.Time
}
func (s *SlidingWindow) Allow() bool {
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
cutoff := now.Add(-s.window)
// удалить устаревшие
i := 0
for i < len(s.times) && s.times[i].Before(cutoff) {
i++
}
s.times = s.times[i:]
if len(s.times) >= s.limit {
return false
}
s.times = append(s.times, now)
return true
}

Минусы: O(N) память на N requests за окно. Используется только при малых N.

Один Limiter на N инстансов — нужно общее хранилище.

-- Redis Lua: token bucket
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local data = redis.call("HMGET", key, "tokens", "last")
local tokens = tonumber(data[1] or burst)
local last = tonumber(data[2] or now)
local elapsed = now - last
tokens = math.min(burst, tokens + elapsed * rate)
local allowed = 0
if tokens >= 1 then
tokens = tokens - 1
allowed = 1
end
redis.call("HMSET", key, "tokens", tokens, "last", now)
redis.call("EXPIRE", key, 60)
return allowed

В Go:

result, err := redisClient.Eval(ctx, luaScript, []string{"rl:" + userID},
rate, burst, time.Now().Unix()).Result()
type MultiLimiter struct {
mu sync.Mutex
limiters map[string]*rate.Limiter
rate rate.Limit
burst int
}
func (m *MultiLimiter) Get(key string) *rate.Limiter {
m.mu.Lock()
defer m.mu.Unlock()
if lim, ok := m.limiters[key]; ok {
return lim
}
lim := rate.NewLimiter(m.rate, m.burst)
m.limiters[key] = lim
return lim
}

⚠️ Без cleanup — m.limiters будет расти бесконечно. Используйте LRU/TTL.

func RateLimitMiddleware(lim *rate.Limiter) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !lim.Allow() {
w.Header().Set("Retry-After", "1")
http.Error(w, "Too Many Requests", 429)
return
}
next.ServeHTTP(w, r)
})
}
}

Структурированная concurrency (Nathaniel J. Smith, “Notes on structured concurrency, or: Go statement considered harmful”, 2018) — идея, что каждая parallel задача должна иметь scope, и dispatcher не возвращается, пока все дети не закончились.

В Python это TaskGroup (3.11+), в Kotlin — coroutineScope, в Swift — async let, в Trio — nursery.

function run() {
scope {
spawn taskA()
spawn taskB()
} // scope не выходит, пока taskA и taskB не закончатся
// ↑ единственная точка выхода
}

Go go statement — unstructured: после go f() управление возвращается, а f живёт независимо. Можно создать дитя goroutine из любого места, и его scope не привязан к родителю.

func handle(req *Request) {
go cleanup(req) // утечка: cleanup может выполняться вечно
return
}

6.3. errgroup как «базовая» структурированная concurrency

Заголовок раздела «6.3. errgroup как «базовая» структурированная concurrency»
func process(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
g.Go(taskA)
g.Go(taskB)
return g.Wait() // ← единственная точка выхода
}

errgroup навязывает scope: Wait() блокируется до завершения всех Go. Это де-факто structured concurrency в Go.

  1. Всегда передавайте ctx в goroutines.
  2. Всегда ассоциируйте goroutines с группой (errgroup/WaitGroup).
  3. Никогда не делайте go fn() без знания, когда fn завершится (fire-and-forget — антипаттерн).
  4. Если нужен fire-and-forget — заведите supervisor с контекстом приложения.
// ПЛОХО
func handle(req *Request) {
go log(req) // leak: что если log упадёт, panic'нет?
}
// ХОРОШО
func handle(req *Request) {
select {
case logQueue <- req:
default:
droppedLogs.Inc()
}
}
// log goroutine управляется supervisor'ом, имеет defer recover, известен lifecycle
type App struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func (a *App) Start(name string, fn func(context.Context) error) {
a.wg.Add(1)
go func() {
defer a.wg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("panic in %s: %v", name, r)
}
}()
if err := fn(a.ctx); err != nil && !errors.Is(err, context.Canceled) {
log.Printf("%s exited: %v", name, err)
}
}()
}
func (a *App) Shutdown() {
a.cancel()
a.wg.Wait()
}

Go 1.24 представил testing/synctest (изначально как experimental) — пакет для тестирования concurrent кода с виртуальным временем.

import "testing/synctest"
func TestTimeout(t *testing.T) {
synctest.Run(func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
time.Sleep(5 * time.Second)
// do something
}()
// wait for all goroutines to be idle
synctest.Wait()
// tick virtual time
// (in Go 1.25 stable form)
})
}

Виртуальное время позволяет:

  • Тесты с time.Sleep(1 * time.Hour) выполняются мгновенно.
  • Детерминированные scheduling decisions.
  • Нет flaky tests из-за timing.
Окно терминала
go test -race ./...
go build -race

Race detector использует ThreadSanitizer (TSan): инструментирует memory accesses, фиксирует happens-before. Если найден doat race — выводит stack trace.

Когда работает:

  • Обнаруживает фактически произошедшие races.
  • Не доказывает отсутствие races (может пропустить редкие).

Когда не работает:

  • Production (~10× overhead).
  • Race должен реально случиться в test run.

💡 Always run CI с -race.

Вместо time.Sleep(100 * time.Millisecond) для синхронизации — используйте channels:

// плохо
func TestWorker(t *testing.T) {
w := startWorker()
w.Submit(task)
time.Sleep(100 * time.Millisecond) // flaky!
if !task.Done() { t.Fatal(...) }
}
// хорошо
func TestWorker(t *testing.T) {
done := make(chan struct{})
w := startWorkerWith(func() { close(done) })
w.Submit(task)
select {
case <-done:
case <-time.After(time.Second):
t.Fatal("timeout")
}
}

testify/suite для setup/teardown:

type WorkerSuite struct {
suite.Suite
w *Worker
}
func (s *WorkerSuite) SetupTest() {
s.w = NewWorker()
}
func (s *WorkerSuite) TearDownTest() {
s.w.Shutdown()
}
func (s *WorkerSuite) TestDoSomething() {
s.w.Submit(task)
s.Eventually(func() bool { return task.Done() }, time.Second, 10*time.Millisecond)
}
func TestWorkerSuite(t *testing.T) {
suite.Run(t, new(WorkerSuite))
}
func TestConcurrentMapStress(t *testing.T) {
if testing.Short() {
t.Skip("stress test")
}
m := NewConcurrentMap()
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
for j := 0; j < 1000; j++ {
m.Set(fmt.Sprintf("k%d", i*j%100), i)
m.Get(fmt.Sprintf("k%d", i*j%100))
}
}(i)
}
wg.Wait()
}

Запуск:

Окно терминала
go test -race -count=100 -run TestConcurrentMapStress

new(errgroup.Group) — не отменяет goroutines при ошибке. Используйте errgroup.WithContext.

В Go < 1.22 — нужно i := i внутри loop. С Go 1.22+ переменные loop ограничены iteration scope автоматически.

// Go 1.22+
for _, url := range urls {
g.Go(func() error { return fetch(url) }) // OK
}
// до 1.22
for _, url := range urls {
url := url
g.Go(func() error { return fetch(url) })
}

⚠️ 8.3. errgroup.Wait — единственный сигнал завершения

Заголовок раздела «⚠️ 8.3. errgroup.Wait — единственный сигнал завершения»

Не вызывайте g.Wait() несколько раз — нет защиты. И не пытайтесь читать g.err напрямую (приватное).

sem.Acquire(ctx, 5)
defer sem.Release(3) // ⚠️ leak 2 единицы

Всегда release тот же вес, что acquire.

sem := semaphore.NewWeighted(10)
sem.Acquire(ctx, 20) // никогда не вернётся

Если fn 30 сек, все waiters ждут 30 сек, даже если их ctx cancelled. Реализуйте timeout внутри fn.

Только context первого caller’а проходит в fn. Все остальные ждут в обход своих ctx.

group.Forget(key) — race с in-flight Do для того же key. После Forget новые Do запустят повторное выполнение.

NewLimiter(10, 0) — лимит 10 rps, но burst 0 → ничего не пропускает (нет накопленных токенов). Минимум burst = 1.

lim.Wait(ctx) блокируется. Если тысячи requests — тысячи goroutines park’нутся. Лучше Allow() + return 429.

Map[user]*Limiter без cleanup → растёт бесконечно. Используйте LRU или TTL.

Drop OK для metrics/logs. Не OK для финансовых событий — там нужна durable очередь (Kafka, NATS).

ch := make(chan Event, math.MaxInt32) // 2 миллиарда событий в памяти

Если функция в g.Go() паникует — паника не ловится. Process крашится. Заверните в defer recover, если важно:

g.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic: %v", r)
}
}()
return doSomething()
})

Любой time.Sleep в тесте — потенциальный flake. Используйте synctest, channels, s.Eventually(...) (testify).


errgroup.Group{}: ~100ns на Go+Wait
WaitGroup{}: ~50ns на Add+Done+Wait

Накладные расходы errgroup мизерные. Используйте всегда.

buffered chan (cap=100): 50 ns/op
semaphore.Acquire(1): 120 ns/op

Buffered chan быстрее для uniform weight. semaphore — для weighted.

Без singleflight:
100 concurrent Get → 100 DB queries → 100×50ms = 5 sec total
С singleflight:
100 concurrent Get → 1 DB query → 50ms
lim.Allow(): 30 ns/op (low contention)
100 ns/op (high contention, mutex)

До singleflight (e-commerce checkout):

  • p99 latency: 800ms (cache miss → 100 concurrent DB queries)
  • DB CPU: 80%

После:

  • p99 latency: 80ms (cache miss → 1 DB query)
  • DB CPU: 25%

5000 req/hour per user — token bucket, burst 100.

type APIRateLimiter struct {
mu sync.Mutex
users map[string]*rate.Limiter
}
func (a *APIRateLimiter) Allow(userID string) bool {
a.mu.Lock()
lim, ok := a.users[userID]
if !ok {
lim = rate.NewLimiter(rate.Limit(5000.0/3600.0), 100)
a.users[userID] = lim
}
a.mu.Unlock()
return lim.Allow()
}
// 1: shared chan + N workers (simple, recommended)
ch := make(chan Task, 100)
for i := 0; i < N; i++ {
go worker(ch)
}
// 2: errgroup.SetLimit (recommended Go 1.20+)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(N)
for _, t := range tasks {
t := t
g.Go(func() error { return process(ctx, t) })
}
g.Wait()

Производительность одинакова, errgroup версия — меньше кода.


golang.org/x/sync/errgroup — Wait group + первая ошибка + cancellation. WithContext отменяет ctx при первой ошибке. g.Go() — стартует, g.Wait() — блокирует до завершения всех.

WaitGroup не агрегирует ошибки. errgroup автоматизирует: первая ошибка, cancel context, опциональный лимит concurrency (Go 1.20+ SetLimit).

SetLimit(n) — лимит concurrent goroutines через internal semaphore. TryGo(f) — non-blocking версия Go, возвращает false при отсутствии слота.

10.4. Что произойдёт, если g.Wait() вызвать после ошибки в g.Go?

Заголовок раздела «10.4. Что произойдёт, если g.Wait() вызвать после ошибки в g.Go?»

Wait() дождётся завершения всех goroutines (даже после первой ошибки — мы ждём остальных). Вернёт первую ошибку. Остальные ошибки игнорируются.

Паника не ловится, процесс крашится. Используйте defer recover внутри функции.

Counting semaphore с произвольным весом. Acquire(ctx, n) ждёт n единиц. Release(n) возвращает. FIFO fairness.

Buffered chan — uniform weight. semaphore — weighted (разные задачи берут разный объём). semaphore нативно поддерживает ctx в Acquire.

GPU/memory limits с переменными requirements, лимит subprocess’ов, ограничение DB connections с разными priorities.

Одновременный истечение кеша + N concurrent requests → N идентичных backend-запросов одновременно → перегрузка downstream.

Дедуплицирует одинаковые в-полёте запросы по ключу. Только один выполняется, остальные ждут результат. group.Do(key, fn) — все 1000 waiters получат тот же результат.

Do — synchronous (блокирует caller). DoChan — возвращает channel, можно использовать с select+ctx.

Удаляет in-flight call из map. Следующий Do(key, fn) повторно выполнит fn. Используется после получения ошибки.

Long-running fn блокирует waiters даже если их ctx cancelled. Все waiters получают тот же результат, даже если данные могли измениться.

Давление обратно от медленного consumer’а на producer. Без backpressure — растущая очередь и OOM.

  1. Bounded queue (буферизованный chan) — блок producer’а.
  2. Drop policy (LIFO/FIFO) — выкидывание событий.
  3. Token-based flow control (HTTP/2, gRPC).
  4. Reactive Streams (request-N).

Token bucket: токены копятся, burst разрешён. Leaky bucket: постоянный rate, burst невозможен. token bucket — golang.org/x/time/rate.

NewLimiter(rate, burst). Методы: Allow() (non-blocking bool), Wait(ctx) (block), Reserve() (advanced: получить Delay).

Token bucket. При каждом Allow: добавляются tokens = (now - last) * rate (cap at burst). Если tokens >= 1 — tokens—, return true. Mutex защищает state.

Map[user]*Limiter растёт. Используйте LRU/TTL. Или sliding window + Redis для distributed.

Redis + Lua скрипт (атомарный token bucket update). Или centralized service. Локальный rate.Limiter не подходит для N инстансов.

Каждая parallel задача имеет scope; точка выхода ждёт всех детей. Python TaskGroup, Kotlin coroutineScope. В Go нет встроенного, но errgroup + ctx — рабочий аналог.

go — unstructured: fn живёт независимо от родителя. Можно «потерять» goroutine. Structured concurrency запрещает orphan’ы.

  • Никто не отслеживает завершение.
  • Не отменяемы.
  • Паники не пойманы.
  • Утечки goroutine.

Используйте supervisor pattern с ctx и WaitGroup.

Go 1.24+ пакет для тестов с виртуальным временем. synctest.Run(fn) запускает fn, synctest.Wait() ждёт пока все goroutines idle, time.Sleep работает с виртуальным временем.

-race flag активирует ThreadSanitizer instrumentation. Находит data races во время выполнения. ~10× CPU overhead, не для production.

  • Channels вместо time.Sleep для синхронизации.
  • testify Eventually для условий.
  • testing/synctest для виртуального времени.
  • go test -count=100 для проверки стабильности.
ch := make(chan Task, queueSize)
for i := 0; i < workers; i++ {
go func() { for t := range ch { process(t) } }()
}

Или errgroup.SetLimit. Bounded queue даёт backpressure.

  • Per-user + global (защита от abuse + от total overload).
  • Per-endpoint (write-heavy endpoints — отдельный лимит).
  • Read vs write (DB write quota меньше read quota).

errgroup — для агрегации ошибок и cancel. semaphore — для weighted concurrency. Если задачи весят по-разному — нужны обе.

Token bucket — простой, дешёвый, поддерживает burst. Sliding window — точнее на коротких интервалах, но O(N) память. Token bucket в 95% случаев.


Реализовать FetchAll(ctx, urls, limit) — параллельная загрузка с лимитом concurrency через errgroup.SetLimit. Тесты на cancel и aggregate error.

Worker pool, где каждая задача имеет вес (CPU cores). Через semaphore.Weighted распределить так, чтобы суммарный вес не превышал N.

Cache-aside для DB-запросов с singleflight protection. Замерить latency и DB QPS до/после под нагрузкой 1000 RPS.

Реализовать MultiUserLimiter с per-user rate.Limiter, LRU cleanup до 10k активных пользователей.

Log shipper с bounded queue (drop policy) и Prometheus метрикой dropped_logs_total.

Token bucket в Redis (Lua script). Go клиент с fallback на локальный rate.Limiter при недоступности Redis.

Application supervisor: app.Start(name, fn) запускает goroutine с recover, log на падении. app.Shutdown() — graceful cancel всем.

Написать тест для функции с time.After(1*time.Hour) через testing/synctest (Go 1.24+) — тест должен выполняться < 1ms.


  1. golang.org/x/sync/errgrouphttps://pkg.go.dev/golang.org/x/sync/errgroup.
  2. golang.org/x/sync/semaphorehttps://pkg.go.dev/golang.org/x/sync/semaphore.
  3. golang.org/x/sync/singleflighthttps://pkg.go.dev/golang.org/x/sync/singleflight.
  4. golang.org/x/time/ratehttps://pkg.go.dev/golang.org/x/time/rate (token bucket).
  5. Nathaniel J. Smith, “Notes on structured concurrency”https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/.
  6. Go 1.24 release notes — testing/synctesthttps://go.dev/doc/go1.24 (раздел testing/synctest).
  7. Marc Brooker, “Exponential Backoff And Jitter” — AWS Architecture Blog, для rate limiting wisdom.
  8. gRPC flow controlhttps://grpc.io/docs/guides/flow-control/ (token-based flow control).
  9. Redis Rate Limiting patternshttps://redis.io/learn/operate/redis-at-scale/scalability/rate-limiting (distributed token bucket Lua).
  10. The Go Programming Language (Donovan, Kernighan) глава 8 — channels, select, structured patterns.

Итог Middle 2: errgroup — стандарт для параллельных задач с ошибками; semaphore — weighted concurrency; singleflight — обязателен против cache stampede; backpressure через bounded queue + drop policy; rate limiting через token bucket (x/time/rate), distributed — Redis Lua. Структурированная concurrency в Go не встроена, но errgroup + ctx — рабочий шаблон. Тестируйте concurrent код через testing/synctest (Go 1.24+), channels, race detector в CI.