Concurrency-паттерны: production-level
Зачем знать на Middle 1: Паттерны конкурентности — это словарь для общения о коде. Worker pool, pipeline, fan-in/out, errgroup, singleflight — каждый middle обязан уметь их написать на доске и применять в коде. Это не “знание ради знания”: эти паттерны решают конкретные задачи backend-разработки: ограничить параллелизм, обработать batch с отменой, защититься от cache stampede, реализовать rate limit.
Содержание
Заголовок раздела «Содержание»- Базовая концепция (кратко)
- Под капотом и паттерны
- Gotchas
- Производительность
- Когда использовать / альтернативы
- Вопросы на собесе
- Practice
- Источники
1. Базовая концепция (кратко)
Заголовок раздела «1. Базовая концепция (кратко)»Concurrency-паттерны в Go строятся на трёх китах:
- Goroutines — дешёвый параллелизм (~2KB stack).
- Channels — типизированная связь.
- Context — отмена и timeout.
И четырёх библиотечных кирпичах:
sync(Mutex, WaitGroup, Once, Pool).golang.org/x/sync/errgroup— параллелизм с ошибками и context.golang.org/x/sync/semaphore— взвешенный semaphore.golang.org/x/sync/singleflight— дедупликация.
Основные паттерны:
- Worker pool — фиксированный N горутин обрабатывает очередь.
- Pipeline — этапы соединены каналами.
- Fan-out / fan-in — распределение и сбор.
- Or-done — отмена и cleanup.
- Semaphore — ограничение параллелизма.
- Rate limit — ограничение rate (req/s).
- Backpressure — обратное давление при медленном consumer.
- Pub-Sub — broadcast in-process.
- Future / Promise — асинхронный результат.
2. Под капотом (детально)
Заголовок раздела «2. Под капотом (детально)»2.1. Worker pool с буферизованным каналом
Заголовок раздела «2.1. Worker pool с буферизованным каналом»Базовая идея: создаём N постоянных горутин-воркеров, они читают из общего канала задач.
package workerpool
import ( "context" "sync")
type Job func(ctx context.Context)
type Pool struct { jobs chan Job wg sync.WaitGroup ctx context.Context cancel context.CancelFunc}
func New(ctx context.Context, n int, queueSize int) *Pool { ctx, cancel := context.WithCancel(ctx) p := &Pool{ jobs: make(chan Job, queueSize), ctx: ctx, cancel: cancel, } p.wg.Add(n) for i := 0; i < n; i++ { go p.worker(i) } return p}
func (p *Pool) worker(id int) { defer p.wg.Done() for { select { case <-p.ctx.Done(): return case job, ok := <-p.jobs: if !ok { return } job(p.ctx) } }}
// Submit добавляет задачу. Блокируется если очередь полна.func (p *Pool) Submit(j Job) error { select { case <-p.ctx.Done(): return p.ctx.Err() case p.jobs <- j: return nil }}
// TrySubmit non-blocking.func (p *Pool) TrySubmit(j Job) bool { select { case p.jobs <- j: return true default: return false }}
// Stop graceful: закрывает jobs, ждёт воркеров.func (p *Pool) Stop() { close(p.jobs) p.wg.Wait()}
// Shutdown force: отменяет контекст.func (p *Pool) Shutdown() { p.cancel() p.wg.Wait()}ASCII-схема:
Submit │ ▼ ┌─────────────┐ │ jobs chan │ ◄── buffer size = queueSize └──┬──┬──┬────┘ │ │ │ ▼ ▼ ▼ ┌─┐ ┌─┐ ┌─┐ │W│ │W│ │W│ ◄── N workers └─┘ └─┘ └─┘ │ │ │ ▼ ▼ ▼ job(ctx)Замечания:
queueSize— backpressure: если 0, Submit блокируется пока worker не свободен.Stop— graceful: воркеры доедают очередь.Shutdown— force: отмена через context.
2.2. Worker pool через errgroup
Заголовок раздела «2.2. Worker pool через errgroup»errgroup.Group (golang.org/x/sync/errgroup) — упрощает паттерн “запусти N горутин, дождись всех, верни первую ошибку”.
package main
import ( "context" "fmt"
"golang.org/x/sync/errgroup")
func ProcessAll(ctx context.Context, items []string) error { g, ctx := errgroup.WithContext(ctx) g.SetLimit(10) // максимум 10 параллельных
for _, item := range items { item := item // capture g.Go(func() error { return process(ctx, item) }) } return g.Wait() // блокирует, возвращает первую ошибку}
func process(ctx context.Context, item string) error { select { case <-ctx.Done(): return ctx.Err() default: } // ... работа return nil}Что делает errgroup:
- WithContext возвращает производный ctx; cancel’ится при первой ошибке.
- Go(f) — стартует горутину.
- SetLimit(N) (Go 1.18+ в errgroup) — semaphore на N concurrent.
- TryGo(f) — non-blocking запуск (если semaphore полон, возвращает false).
- Wait() — ждёт все горутины, возвращает первую non-nil ошибку.
Реализация SetLimit:
func (g *Group) SetLimit(n int) { if n < 0 { g.sem = nil return } if len(g.sem) != 0 { panic("errgroup: modify limit while goroutines in flight") } g.sem = make(chan token, n)}
func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} // acquire (блокирует если полон) } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }()}2.3. Pipeline
Заголовок раздела «2.3. Pipeline»Этапы обработки соединены каналами. Каждый этап — горутина (или несколько).
package pipeline
import "context"
// generator: int → chan intfunc gen(ctx context.Context, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { select { case <-ctx.Done(): return case out <- n: } } }() return out}
// stage1: squarefunc sq(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case <-ctx.Done(): return case out <- n * n: } } }() return out}
// stage2: filter evenfunc filterEven(ctx context.Context, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n%2 != 0 { continue } select { case <-ctx.Done(): return case out <- n: } } }() return out}
// Использованиеfunc main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel()
nums := gen(ctx, 1, 2, 3, 4, 5) squared := sq(ctx, nums) evens := filterEven(ctx, squared)
for v := range evens { println(v) // 4, 16 }}ASCII:
[gen]──chan──►[sq]──chan──►[filterEven]──chan──► consumerПравила pipeline:
- Каждый этап запускает горутину, читает входной chan, пишет в выходной.
- Outbound chan закрывает тот, кто пишет.
- Context для отмены — каждый стейдж проверяет
ctx.Done(). - Закрытие inbound триггерит range exit → close outbound → каскад.
2.4. Fan-out, fan-in
Заголовок раздела «2.4. Fan-out, fan-in»Fan-out: один генератор → N воркеров. Fan-in: N каналов → один.
// fan-out: stage с N параллельнымиfunc fanOutSq(ctx context.Context, in <-chan int, n int) []<-chan int { outs := make([]<-chan int, n) for i := 0; i < n; i++ { outs[i] = sq(ctx, in) // несколько goroutines читают тот же in } return outs}
// fan-in: merge нескольких в одинfunc merge[T any](ctx context.Context, channels ...<-chan T) <-chan T { out := make(chan T) var wg sync.WaitGroup wg.Add(len(channels)) for _, c := range channels { c := c go func() { defer wg.Done() for v := range c { select { case <-ctx.Done(): return case out <- v: } } }() } go func() { wg.Wait() close(out) }() return out}Использование:
in := gen(ctx, 1, 2, 3, ..., 100)outs := fanOutSq(ctx, in, 8) // 8 воркеров делят работуmerged := merge(ctx, outs...) // собираем в один каналfor v := range merged { /* ... */ }ASCII:
┌──[sq]──┐ ├──[sq]──┤ [gen]──in──┼──[sq]──┼──merge──out──► consumer ├──[sq]──┤ └──[sq]──┘
Fan-out: in → 5 workers (читают параллельно)Fan-in: 5 outputs → 1 (merge)2.5. Generator паттерн
Заголовок раздела «2.5. Generator паттерн»Канал как iterator:
func Range(start, end int) <-chan int { ch := make(chan int) go func() { defer close(ch) for i := start; i < end; i++ { ch <- i } }() return ch}
for v := range Range(0, 10) { println(v)}⚠️ Если consumer не дочитал до конца — goroutine leak! Решение: context.
func Range(ctx context.Context, start, end int) <-chan int { ch := make(chan int) go func() { defer close(ch) for i := start; i < end; i++ { select { case <-ctx.Done(): return case ch <- i: } } }() return ch}В Go 1.23 появился стандартный iter.Seq/iter.Seq2 — для итерации range-функциями. Но для concurrent producer всё ещё используют каналы.
2.6. Or-done channel
Заголовок раздела «2.6. Or-done channel»Универсальный паттерн для отмены:
func orDone[T any](done <-chan struct{}, c <-chan T) <-chan T { out := make(chan T) go func() { defer close(out) for { select { case <-done: return case v, ok := <-c: if !ok { return } select { case <-done: return case out <- v: } } } }() return out}
// Использованиеfor v := range orDone(done, dataCh) { // не нужно select в каждом месте}Идея: вместо того чтобы каждый consumer писал select { case <-done: ... case v := <-c: ... }, упаковываем в orDone и используем обычный range.
С context.Context это:
func ctxRange[T any](ctx context.Context, c <-chan T) <-chan T { return orDone(ctx.Done(), c)}2.7. Or-channel: combine cancel signals
Заголовок раздела «2.7. Or-channel: combine cancel signals»Совмещение N сигналов отмены в один:
func or(channels ...<-chan struct{}) <-chan struct{} { switch len(channels) { case 0: return nil case 1: return channels[0] } out := make(chan struct{}) go func() { defer close(out) switch len(channels) { case 2: select { case <-channels[0]: case <-channels[1]: } default: select { case <-channels[0]: case <-channels[1]: case <-channels[2]: case <-or(append(channels[3:], out)...): } } }() return out}В современном коде вместо or — context.WithCancel композиция.
2.8. Tee pattern
Заголовок раздела «2.8. Tee pattern»Один входной канал → два выходных, в каждый идёт каждое значение:
func tee[T any](done <-chan struct{}, in <-chan T) (<-chan T, <-chan T) { out1 := make(chan T) out2 := make(chan T) go func() { defer close(out1) defer close(out2) for v := range in { var a, b = out1, out2 for i := 0; i < 2; i++ { select { case <-done: return case a <- v: a = nil // disable case b <- v: b = nil } } } }() return out1, out2}Trick: обнуляем переменную → case с nil каналом никогда не сработает. Это идиома Go select.
2.9. Semaphore через буферизованный канал
Заголовок раздела «2.9. Semaphore через буферизованный канал»type Sem chan struct{}
func NewSem(n int) Sem { return make(chan struct{}, n)}
func (s Sem) Acquire() { s <- struct{}{}}
func (s Sem) Release() { <-s}
func (s Sem) TryAcquire() bool { select { case s <- struct{}{}: return true default: return false }}Использование:
sem := NewSem(10)for _, item := range items { sem.Acquire() go func(item Item) { defer sem.Release() process(item) }(item)}2.10. semaphore.Weighted (x/sync)
Заголовок раздела «2.10. semaphore.Weighted (x/sync)»Когда нужны “веса” — операции с разным стоимостью:
import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(100) // 100 unit-ов capacity
// Тяжёлая операция (вес 50)if err := sem.Acquire(ctx, 50); err != nil { return err}defer sem.Release(50)
// Лёгкая (вес 1)if err := sem.Acquire(ctx, 1); err != nil { return err}defer sem.Release(1)Внутри — Mutex + waitlist. Acquire ждёт пока в bucket’е достаточно place.
2.11. singleflight: дедупликация
Заголовок раздела «2.11. singleflight: дедупликация»import "golang.org/x/sync/singleflight"
var sf singleflight.Group
func GetUser(id string) (*User, error) { v, err, shared := sf.Do(id, func() (any, error) { // эта функция выполняется ТОЛЬКО раз для каждого id, // даже если 100 горутин одновременно зовут GetUser("123"). return db.QueryUser(id) }) if err != nil { return nil, err } _ = shared // true если несколько горутин получили результат return v.(*User), nil}Use case: cache stampede protection. 1000 одновременных запросов за одним ключом → один реальный DB query. Остальные 999 ждут результат первого.
Альтернатива: sf.DoChan(key, fn) — non-blocking, возвращает channel с результатом.
⚠️ Если первый запрос медленный — все остальные тоже ждут. Это компромисс: меньше нагрузки на backend vs больше latency для отдельного запроса.
2.12. Rate limiting: token bucket
Заголовок раздела «2.12. Rate limiting: token bucket»import "golang.org/x/time/rate"
// 100 req/sec, burst 200limiter := rate.NewLimiter(rate.Limit(100), 200)
func handler(w http.ResponseWriter, r *http.Request) { if !limiter.Allow() { http.Error(w, "rate limit", http.StatusTooManyRequests) return } // ...}
// Или с ожиданием:if err := limiter.Wait(ctx); err != nil { return err // ctx cancelled}Алгоритм token bucket:
- Bucket с capacity
burstтокенов. - Пополняется со скоростью
rateтокенов/сек. - Каждый запрос забирает 1 (или N) токен.
- Если токенов нет — отказ или wait.
ASCII:
drip rate=100/s │ ▼ ┌──────────┐ [tokens] │░░░░░░░░░░│ burst=200 │░░░░░░░░░░│ └────┬─────┘ │ ▼ Allow() забирает 1 [Request OK?]2.13. Leaky bucket (через time.Tick)
Заголовок раздела «2.13. Leaky bucket (через time.Tick)»type LeakyBucket struct { capacity int bucket chan struct{} ticker *time.Ticker}
func NewLeakyBucket(cap int, leakInterval time.Duration) *LeakyBucket { lb := &LeakyBucket{ capacity: cap, bucket: make(chan struct{}, cap), ticker: time.NewTicker(leakInterval), } go func() { for range lb.ticker.C { select { case <-lb.bucket: // leak default: } } }() return lb}
func (lb *LeakyBucket) Allow() bool { select { case lb.bucket <- struct{}{}: return true default: return false }}
func (lb *LeakyBucket) Close() { lb.ticker.Stop()}Разница: token bucket позволяет burst, leaky — нет. Leaky — для smooth output rate.
2.14. Backpressure
Заголовок раздела «2.14. Backpressure»Принцип: producer должен замедляться, когда consumer медленнее. Способы:
1. Unbuffered channel — natural backpressure:
ch := make(chan Item) // sender блокируется2. Bounded buffered channel — backpressure при заполнении:
ch := make(chan Item, 100) // sender блокируется при 100 items**3. Drop:
select {case ch <- item:default: // drop}**4. Block с timeout:
select {case ch <- item:case <-time.After(timeout): return ErrTimeout}⚠️ Антипаттерн: огромный буфер. make(chan T, 1<<20) — это память + latency, не решает проблему.
2.15. Graceful shutdown
Заголовок раздела «2.15. Graceful shutdown»func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel()
srv := &http.Server{Addr: ":8080", Handler: mux}
go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }()
<-ctx.Done() log.Println("shutdown signal received")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("forced shutdown: %v", err) }}signal.NotifyContext (Go 1.16+) — context, который cancel’ится при SIGINT/SIGTERM. srv.Shutdown ждёт active requests до timeout.
2.16. Pub-Sub in-process
Заголовок раздела «2.16. Pub-Sub in-process»type Topic[T any] struct { mu sync.RWMutex subs map[chan T]struct{}}
func NewTopic[T any]() *Topic[T] { return &Topic[T]{subs: make(map[chan T]struct{})}}
func (t *Topic[T]) Subscribe(bufSize int) (<-chan T, func()) { ch := make(chan T, bufSize) t.mu.Lock() t.subs[ch] = struct{}{} t.mu.Unlock() return ch, func() { t.mu.Lock() delete(t.subs, ch) close(ch) t.mu.Unlock() }}
func (t *Topic[T]) Publish(v T) { t.mu.RLock() defer t.mu.RUnlock() for ch := range t.subs { select { case ch <- v: default: // slow consumer — drop (или log) } }}Drop slow subscribers — иначе один тормозит всех. Альтернатива: каждый sub получает свою горутину-pusher.
2.17. Future / Promise
Заголовок раздела «2.17. Future / Promise»type Future[T any] struct { done chan struct{} val T err error}
func Async[T any](fn func() (T, error)) *Future[T] { f := &Future[T]{done: make(chan struct{})} go func() { defer close(f.done) f.val, f.err = fn() }() return f}
func (f *Future[T]) Get(ctx context.Context) (T, error) { select { case <-f.done: return f.val, f.err case <-ctx.Done(): var zero T return zero, ctx.Err() }}Использование:
f := Async(func() (int, error) { return computeHeavy() })// делаем что-то ещё...v, err := f.Get(ctx)⚠️ Future в Go не идиоматичен — обычно проще errgroup или горутина с каналом результата.
2.18. Done-channel idiom
Заголовок раздела «2.18. Done-channel idiom»done := make(chan struct{})go func() { defer close(done) work()}()// ... do other work ...<-done // wait for completionclose(done) как broadcast: можно <-done сколько угодно раз, всё вернётся мгновенно.
3. Gotchas
Заголовок раздела «3. Gotchas»3.1. ⚠️ Worker pool: не забывай context
Заголовок раздела «3.1. ⚠️ Worker pool: не забывай context»Без ctx.Done() в select воркеры не остановятся при cancel.
3.2. ⚠️ errgroup без SetLimit — unbounded
Заголовок раздела «3.2. ⚠️ errgroup без SetLimit — unbounded»g, _ := errgroup.WithContext(ctx)for _, item := range millionItems { g.Go(func() error { ... }) // 1M goroutines!}Всегда SetLimit(N) для batch обработки.
3.3. ⚠️ Pipeline: закрытие out выполняет тот, кто пишет
Заголовок раздела «3.3. ⚠️ Pipeline: закрытие out выполняет тот, кто пишет»Если несколько горутин пишут в один out — координируй через WaitGroup, закрывай только когда все finish.
3.4. ⚠️ Fan-in без cancel — leak при early exit
Заголовок раздела «3.4. ⚠️ Fan-in без cancel — leak при early exit»merged := merge(c1, c2, c3)for v := range merged { if cond { break // c1/c2/c3 senders могут leak'нуть! }}Используй orDone или context.
3.5. ⚠️ Generator goroutine leak
Заголовок раздела «3.5. ⚠️ Generator goroutine leak»Если consumer не дочитал — generator горутина висит в ch <- v. Всегда — context.
3.6. ⚠️ Tee: один медленный consumer тормозит другой
Заголовок раздела «3.6. ⚠️ Tee: один медленный consumer тормозит другой»o1, o2 := tee(in)// если o1 читается медленно, o2 тоже стопает (наш select зависает на o1).Решение: каждый out — со своим буфером, или отдельные горутины с back-pressure.
3.7. ⚠️ Semaphore-канал: capture in goroutine
Заголовок раздела «3.7. ⚠️ Semaphore-канал: capture in goroutine»sem := make(chan struct{}, 10)for _, item := range items { sem <- struct{}{} go func() { // ОШИБКА: item не захвачен defer func() { <-sem }() process(item) // последний item для всех! }()}Правильно — func(item Item) или item := item.
3.8. ⚠️ singleflight: одна ошибка → все получают
Заголовок раздела «3.8. ⚠️ singleflight: одна ошибка → все получают»sf.Do("key", func() (any, error) { return nil, errors.New("transient")})Все 100 параллельных вызовов получат одну ошибку. Иногда хочется чтобы каждый ретраил. Решение: подумай, нужен ли тебе singleflight на этой задаче.
3.9. ⚠️ rate.Limiter блокирует Wait при cancel?
Заголовок раздела «3.9. ⚠️ rate.Limiter блокирует Wait при cancel?»Wait(ctx) возвращает ctx.Err() при отмене. Не leak’нет.
3.10. ⚠️ time.Tick без Stop
Заголовок раздела «3.10. ⚠️ time.Tick без Stop»for range time.Tick(time.Second) { ... }time.Tick не освобождает таймер. Используй time.NewTicker + defer ticker.Stop().
3.11. ⚠️ Pub-Sub: subscriber должен unsubscribe
Заголовок раздела «3.11. ⚠️ Pub-Sub: subscriber должен unsubscribe»Если subscriber завершается, но Topic не вызвал unsubscribe — канал утекает.
3.12. ⚠️ Future: не вызывать fn в горячем пути
Заголовок раздела «3.12. ⚠️ Future: не вызывать fn в горячем пути»Async создаёт goroutine. Если ты делаешь 1M Async() на батч — это 1M goroutines. Используй pool/errgroup.
4. Производительность
Заголовок раздела «4. Производительность»4.1. Worker pool: оптимальный N
Заголовок раздела «4.1. Worker pool: оптимальный N»- CPU-bound: N = GOMAXPROCS.
- IO-bound: N большой (100-1000), зависит от downstream limits.
Profile с pprof: runtime.NumGoroutine() показывает текущий count.
4.2. errgroup.SetLimit бенчмарк
Заголовок раздела «4.2. errgroup.SetLimit бенчмарк»func BenchmarkErrgroup(b *testing.B) { for _, n := range []int{1, 10, 100, 1000} { b.Run(fmt.Sprintf("limit=%d", n), func(b *testing.B) { for i := 0; i < b.N; i++ { g, _ := errgroup.WithContext(context.Background()) g.SetLimit(n) for j := 0; j < 1000; j++ { g.Go(func() error { return nil }) } g.Wait() } }) }}Чем меньше limit — тем больше Wait в Go() (semaphore блокирует). Чем больше — тем больше goroutines одновременно.
4.3. Pipeline overhead
Заголовок раздела «4.3. Pipeline overhead»Каждый этап = 1 канал + 1 горутина. Если этап делает 100 ns работы, а канал-операция 100 ns, overhead 50%. Pipeline хорош для тяжёлых операций.
Когда pipeline ХУЖЕ:
- Работа очень дешёвая (< 1 µs).
- Этапов много (> 5).
Решение: батчинг (передавать []T вместо T).
4.4. singleflight overhead
Заголовок раздела «4.4. singleflight overhead»Один Do — это lookup в map + Mutex. ~100 ns. Очень дёшево относительно типичной операции (DB query 1-10 ms).
4.5. rate.Limiter overhead
Заголовок раздела «4.5. rate.Limiter overhead»Allow() — ~50 ns. Под высокой нагрузкой не bottleneck.
4.6. Channel batching для throughput
Заголовок раздела «4.6. Channel batching для throughput»Вместо отправки по одному:
ch <- item1ch <- item2Отправляй batch:
ch <- []Item{item1, item2}Меньше lock contention, меньше Go scheduler overhead.
5. Когда использовать / альтернативы
Заголовок раздела «5. Когда использовать / альтернативы»| Задача | Паттерн |
|---|---|
| Параллельная обработка известных N items | errgroup.WithContext + SetLimit |
| Обработка stream данных | pipeline + fan-out/in |
| Очередь работ в долго живущем процессе | worker pool |
| Ограничить параллелизм | semaphore (chan или x/sync) |
| Защита от cache stampede | singleflight |
| Ограничить rate (req/s) | rate.Limiter |
| Broadcast события | Pub-Sub или close(ch) |
| Подождать одного результата | future / chan T (size 1) |
| Graceful shutdown | signal.NotifyContext + srv.Shutdown |
| Periodic task | time.NewTicker + ctx |
Альтернативы:
Заголовок раздела «Альтернативы:»- errgroup vs WaitGroup: errgroup в новых проектах almost всегда. WaitGroup — когда нет ошибок.
- channel semaphore vs x/sync semaphore: chan быстрее для unit-веса. x/sync — для весов.
- singleflight vs cache: они композируются. cache → miss → singleflight.
6. Вопросы на собесе
Заголовок раздела «6. Вопросы на собесе»1. Реализуй worker pool на доске. См. секцию 2.1. Ключевое: chan jobs, ctx, N горутин в for-select, graceful Stop через close(jobs).
2. errgroup vs WaitGroup? errgroup += context + первая ошибка + SetLimit. WaitGroup только Add/Done/Wait, без context, без ошибок.
3. Что делает SetLimit? Создаёт internal semaphore-канал capacity N. Go() блокируется, если N горутин уже работают.
4. Что такое pipeline в Go? Цепочка этапов, соединённых каналами. Каждый этап — goroutine, читает in, пишет out. Закрытие inbound каскадирует через range.
5. Fan-out / fan-in? Fan-out: один источник → N consumer goroutines (читают один канал). Fan-in: N каналов → один (merge через WaitGroup).
6. Or-done паттерн?
Обёртка канала, который закрывается при done. Позволяет в consumer писать обычный for range вместо select.
7. Tee паттерн? Дуплицирование канала: один in → два out, каждое значение идёт в оба.
8. Реализуй semaphore через канал.
make(chan struct{}, N). Acquire — send. Release — recv. Try — select with default.
9. semaphore.Weighted зачем? Когда операции имеют вес: Acquire(ctx, weight). Полезно для пулов памяти/CPU.
10. Что делает singleflight? Дедупликирует одинаковые ключи: N параллельных Do(key, fn) → один реальный вызов fn, все получают результат.
11. Когда НЕ использовать singleflight?
- Если каждый запрос должен иметь свежие данные.
- Если ошибка одного не должна каскадировать на всех.
- Если запрос быстрый — overhead не оправдан.
12. rate.Limiter — token bucket или leaky bucket? Token bucket. Capacity = burst, refill = rate. Allow забирает 1 токен, Wait блокирует пока не появится.
13. Token bucket vs leaky bucket? Token: позволяет burst (если bucket полон). Leaky: smooth rate, не позволяет burst.
14. Что такое backpressure? Замедление producer при медленном consumer. Реализуется через unbuffered/bounded chan, drop, или timeout.
15. Реализуй graceful shutdown HTTP сервера. signal.NotifyContext → server.ListenAndServe в goroutine → <-ctx.Done → server.Shutdown с timeout.
16. Реализуй Pub-Sub in-memory. См. секцию 2.16. Topic с map[chan T]struct{}, Publish с RLock, Subscribe возвращает канал + unsubscribe func.
17. Future в Go идиоматичен?
Редко. Чаще errgroup + структура для результата, или ch := make(chan Result, 1); go func() { ch <- compute() }().
18. Как избежать leak в generator? Context.Done() в select при send. Или закрытие done канала из consumer.
19. errgroup.WithContext: что отменяется? Внутренний context cancel’ится при первой ошибке любого Go(). Все остальные горутины должны проверять ctx.Done().
20. Что вернёт g.Wait() при cancel? Первую non-nil ошибку. Если все горутины не возвращали ошибку, но ctx был cancelled — вернёт nil (горутины должны были вернуть ctx.Err() если хотели).
21. Реализуй limiter на N concurrent requests. Buffered chan size N. acquire — send, release — recv. Или x/sync semaphore.
22. Cancel в pipeline: как? Context в каждой стадии. select { case <-ctx.Done(): return; case out <- v: }.
23. Что произойдёт, если в pipeline один этап залип? Нisbalance: upstream блокируется на write (если unbuffered), downstream блокируется на read. Под backpressure обычно ок, под cancel — все должны выйти через ctx.
24. Закрытие pipeline output: кто и когда?
Тот этап, который пишет. После range in exit + close(out) через defer.
25. Worker pool: queueSize=0 vs queueSize=N? 0 — Submit блокирует пока worker свободен (sync). N — Submit не блокирует до N в очереди (буфер).
26. Чем TryGo отличается от Go в errgroup?
TryGo возвращает false, если semaphore полон (без блокировки). Go блокирует пока освободится слот.
27. Реализуй rate limiter на 100 req/sec через канал. Token bucket: канал buffered size 100, тикер раз в 10ms кидает 1 токен (учитывает overflow). Allow — TryRecv.
28. Pub-Sub с гарантией доставки? Каждому subscriber — своя горутина-pusher + bounded queue. Если queue полна — либо block sender, либо drop, либо kick subscriber.
29. Когда канал-семафор лучше Mutex? Когда нужно несколько concurrent (N>1) — Mutex даёт только 1. Канал size N это N concurrent.
30. singleflight DoChan vs Do?
Do — блокирующий, возвращает (val, err, shared). DoChan — non-blocking, возвращает chan Result. Удобно при select с timeout.
7. Practice
Заголовок раздела «7. Practice»Задача 1. Worker pool с приоритетами
Заголовок раздела «Задача 1. Worker pool с приоритетами»type Job struct { Priority int Work func()}
type PriorityPool struct { high, low chan Job wg sync.WaitGroup}
func NewPriorityPool(n int) *PriorityPool { p := &PriorityPool{ high: make(chan Job, 100), low: make(chan Job, 100), } p.wg.Add(n) for i := 0; i < n; i++ { go p.worker() } return p}
func (p *PriorityPool) worker() { defer p.wg.Done() for { // Сначала проверяем high select { case j, ok := <-p.high: if !ok { return } j.Work() continue default: } // Иначе ждём любой select { case j, ok := <-p.high: if !ok { return } j.Work() case j, ok := <-p.low: if !ok { return } j.Work() } }}
func (p *PriorityPool) Submit(j Job) { if j.Priority > 0 { p.high <- j } else { p.low <- j }}Задача 2. Rate limiter token bucket с нуля
Заголовок раздела «Задача 2. Rate limiter token bucket с нуля»type Limiter struct { rate float64 // tokens per second capacity float64
mu sync.Mutex tokens float64 last time.Time}
func NewLimiter(rate, capacity float64) *Limiter { return &Limiter{ rate: rate, capacity: capacity, tokens: capacity, last: time.Now(), }}
func (l *Limiter) Allow() bool { return l.AllowN(1)}
func (l *Limiter) AllowN(n float64) bool { l.mu.Lock() defer l.mu.Unlock() now := time.Now() elapsed := now.Sub(l.last).Seconds() l.tokens = math.Min(l.capacity, l.tokens+elapsed*l.rate) l.last = now if l.tokens < n { return false } l.tokens -= n return true}Используется в продакшен-API. Сравни с x/time/rate.Limiter — там ещё Wait и Reserve.
Задача 3. Pipeline для обработки файлов
Заголовок раздела «Задача 3. Pipeline для обработки файлов»// pipeline: filenames → []byte (read) → []byte (process) → writefunc pipeline(ctx context.Context, files []string) error { g, ctx := errgroup.WithContext(ctx)
paths := make(chan string) g.Go(func() error { defer close(paths) for _, f := range files { select { case <-ctx.Done(): return ctx.Err() case paths <- f: } } return nil })
data := make(chan []byte) // 4 readers var rwg sync.WaitGroup rwg.Add(4) for i := 0; i < 4; i++ { g.Go(func() error { defer rwg.Done() for p := range paths { b, err := os.ReadFile(p) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case data <- b: } } return nil }) } g.Go(func() error { rwg.Wait() close(data) return nil })
// sink g.Go(func() error { for b := range data { process(b) } return nil })
return g.Wait()}Задача 4. Singleflight + LRU cache
Заголовок раздела «Задача 4. Singleflight + LRU cache»type CachedFetcher struct { cache *lru.Cache[string, []byte] sf singleflight.Group fetch func(ctx context.Context, k string) ([]byte, error)}
func (c *CachedFetcher) Get(ctx context.Context, k string) ([]byte, error) { if v, ok := c.cache.Get(k); ok { return v, nil } v, err, _ := c.sf.Do(k, func() (any, error) { return c.fetch(ctx, k) }) if err != nil { return nil, err } c.cache.Add(k, v.([]byte)) return v.([]byte), nil}Cache stampede protection + кеш. Канонический паттерн для backend.
Задача 5. Fan-out fan-in для batch обработки
Заголовок раздела «Задача 5. Fan-out fan-in для batch обработки»func batchProcess(ctx context.Context, items []int) ([]int, error) { in := make(chan int) out := make(chan int)
g, ctx := errgroup.WithContext(ctx)
// producer g.Go(func() error { defer close(in) for _, v := range items { select { case <-ctx.Done(): return ctx.Err() case in <- v: } } return nil })
// 8 workers var wg sync.WaitGroup for i := 0; i < 8; i++ { wg.Add(1) g.Go(func() error { defer wg.Done() for v := range in { r, err := process(ctx, v) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case out <- r: } } return nil }) } g.Go(func() error { wg.Wait() close(out) return nil })
results := make([]int, 0, len(items)) for v := range out { results = append(results, v) } if err := g.Wait(); err != nil { return nil, err } return results, nil}Замечание: order не сохраняется. Если нужна order — index в struct и сортировать после.
Задача 6. Backpressure-aware producer
Заголовок раздела «Задача 6. Backpressure-aware producer»type SlowProducer struct { out chan int rate *rate.Limiter}
func (p *SlowProducer) Produce(ctx context.Context, value int) error { if err := p.rate.Wait(ctx); err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case p.out <- value: return nil }}Limiter сглаживает rate, канал даёт backpressure от consumer.
Задача 7. Pub-Sub с гарантированной доставкой
Заголовок раздела «Задача 7. Pub-Sub с гарантированной доставкой»type GuaranteedTopic[T any] struct { mu sync.RWMutex subs []*sub[T]}
type sub[T any] struct { ch chan T closed atomic.Bool}
func (t *GuaranteedTopic[T]) Subscribe() (<-chan T, func()) { s := &sub[T]{ch: make(chan T, 64)} t.mu.Lock() t.subs = append(t.subs, s) t.mu.Unlock() return s.ch, func() { if s.closed.CompareAndSwap(false, true) { t.mu.Lock() for i, x := range t.subs { if x == s { t.subs = append(t.subs[:i], t.subs[i+1:]...) break } } t.mu.Unlock() close(s.ch) } }}
func (t *GuaranteedTopic[T]) Publish(ctx context.Context, v T) error { t.mu.RLock() subs := make([]*sub[T], len(t.subs)) copy(subs, t.subs) t.mu.RUnlock()
for _, s := range subs { if s.closed.Load() { continue } select { case <-ctx.Done(): return ctx.Err() case s.ch <- v: } } return nil}Бойко-fashion: publish блокируется, если хоть один subscriber заполнен. Trade-off: гарантия доставки vs slow subscriber может застопорить всех.
Задача 8. Graceful shutdown полный пример
Заголовок раздела «Задача 8. Graceful shutdown полный пример»func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel()
pool := workerpool.New(ctx, 10, 100) defer pool.Shutdown()
srv := &http.Server{ Addr: ":8080", Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _ = pool.Submit(func(_ context.Context) { // обработка }) w.WriteHeader(204) }), }
errCh := make(chan error, 1) go func() { if err := srv.ListenAndServe(); err != http.ErrServerClosed { errCh <- err } }()
select { case err := <-errCh: log.Fatal(err) case <-ctx.Done(): log.Println("shutting down...") }
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := srv.Shutdown(shutdownCtx); err != nil { log.Printf("forced shutdown: %v", err) }
pool.Stop() log.Println("bye")}Это шаблон для production-приложения.
8. Источники
Заголовок раздела «8. Источники»- Sameer Ajmani — “Go Concurrency Patterns” — https://go.dev/blog/pipelines (классика).
- Bryan C. Mills — “Rethinking Classical Concurrency Patterns” (GopherCon 2018) — https://www.youtube.com/watch?v=5zXAHh5tJqQ.
- Katherine Cox-Buday — “Concurrency in Go” (книга O’Reilly).
golang.org/x/sync/errgroupdocs — https://pkg.go.dev/golang.org/x/sync/errgroup.golang.org/x/sync/semaphoredocs — https://pkg.go.dev/golang.org/x/sync/semaphore.golang.org/x/sync/singleflightdocs — https://pkg.go.dev/golang.org/x/sync/singleflight.golang.org/x/time/ratedocs — https://pkg.go.dev/golang.org/x/time/rate.- Rob Pike — “Concurrency is not parallelism” — https://go.dev/blog/waza-talk.
signal.NotifyContext— https://pkg.go.dev/os/signal#NotifyContext.net/http.Server.Shutdown— https://pkg.go.dev/net/http#Server.Shutdown.