Перейти к содержимому

Concurrency-паттерны: production-level

Зачем знать на Middle 1: Паттерны конкурентности — это словарь для общения о коде. Worker pool, pipeline, fan-in/out, errgroup, singleflight — каждый middle обязан уметь их написать на доске и применять в коде. Это не “знание ради знания”: эти паттерны решают конкретные задачи backend-разработки: ограничить параллелизм, обработать batch с отменой, защититься от cache stampede, реализовать rate limit.


  1. Базовая концепция (кратко)
  2. Под капотом и паттерны
  3. Gotchas
  4. Производительность
  5. Когда использовать / альтернативы
  6. Вопросы на собесе
  7. Practice
  8. Источники

Concurrency-паттерны в Go строятся на трёх китах:

  1. Goroutines — дешёвый параллелизм (~2KB stack).
  2. Channels — типизированная связь.
  3. Context — отмена и timeout.

И четырёх библиотечных кирпичах:

  • sync (Mutex, WaitGroup, Once, Pool).
  • golang.org/x/sync/errgroup — параллелизм с ошибками и context.
  • golang.org/x/sync/semaphore — взвешенный semaphore.
  • golang.org/x/sync/singleflight — дедупликация.

Основные паттерны:

  • Worker pool — фиксированный N горутин обрабатывает очередь.
  • Pipeline — этапы соединены каналами.
  • Fan-out / fan-in — распределение и сбор.
  • Or-done — отмена и cleanup.
  • Semaphore — ограничение параллелизма.
  • Rate limit — ограничение rate (req/s).
  • Backpressure — обратное давление при медленном consumer.
  • Pub-Sub — broadcast in-process.
  • Future / Promise — асинхронный результат.

Базовая идея: создаём N постоянных горутин-воркеров, они читают из общего канала задач.

package workerpool
import (
"context"
"sync"
)
type Job func(ctx context.Context)
type Pool struct {
jobs chan Job
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
func New(ctx context.Context, n int, queueSize int) *Pool {
ctx, cancel := context.WithCancel(ctx)
p := &Pool{
jobs: make(chan Job, queueSize),
ctx: ctx,
cancel: cancel,
}
p.wg.Add(n)
for i := 0; i < n; i++ {
go p.worker(i)
}
return p
}
func (p *Pool) worker(id int) {
defer p.wg.Done()
for {
select {
case <-p.ctx.Done():
return
case job, ok := <-p.jobs:
if !ok {
return
}
job(p.ctx)
}
}
}
// Submit добавляет задачу. Блокируется если очередь полна.
func (p *Pool) Submit(j Job) error {
select {
case <-p.ctx.Done():
return p.ctx.Err()
case p.jobs <- j:
return nil
}
}
// TrySubmit non-blocking.
func (p *Pool) TrySubmit(j Job) bool {
select {
case p.jobs <- j:
return true
default:
return false
}
}
// Stop graceful: закрывает jobs, ждёт воркеров.
func (p *Pool) Stop() {
close(p.jobs)
p.wg.Wait()
}
// Shutdown force: отменяет контекст.
func (p *Pool) Shutdown() {
p.cancel()
p.wg.Wait()
}

ASCII-схема:

Submit
┌─────────────┐
│ jobs chan │ ◄── buffer size = queueSize
└──┬──┬──┬────┘
│ │ │
▼ ▼ ▼
┌─┐ ┌─┐ ┌─┐
│W│ │W│ │W│ ◄── N workers
└─┘ └─┘ └─┘
│ │ │
▼ ▼ ▼
job(ctx)

Замечания:

  • queueSize — backpressure: если 0, Submit блокируется пока worker не свободен.
  • Stop — graceful: воркеры доедают очередь.
  • Shutdown — force: отмена через context.

errgroup.Group (golang.org/x/sync/errgroup) — упрощает паттерн “запусти N горутин, дождись всех, верни первую ошибку”.

package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
)
func ProcessAll(ctx context.Context, items []string) error {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(10) // максимум 10 параллельных
for _, item := range items {
item := item // capture
g.Go(func() error {
return process(ctx, item)
})
}
return g.Wait() // блокирует, возвращает первую ошибку
}
func process(ctx context.Context, item string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// ... работа
return nil
}

Что делает errgroup:

  1. WithContext возвращает производный ctx; cancel’ится при первой ошибке.
  2. Go(f) — стартует горутину.
  3. SetLimit(N) (Go 1.18+ в errgroup) — semaphore на N concurrent.
  4. TryGo(f) — non-blocking запуск (если semaphore полон, возвращает false).
  5. Wait() — ждёт все горутины, возвращает первую non-nil ошибку.

Реализация SetLimit:

func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic("errgroup: modify limit while goroutines in flight")
}
g.sem = make(chan token, n)
}
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{} // acquire (блокирует если полон)
}
g.wg.Add(1)
go func() {
defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
g.err = err
if g.cancel != nil { g.cancel(g.err) }
})
}
}()
}

Этапы обработки соединены каналами. Каждый этап — горутина (или несколько).

package pipeline
import "context"
// generator: int → chan int
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case <-ctx.Done():
return
case out <- n:
}
}
}()
return out
}
// stage1: square
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case <-ctx.Done():
return
case out <- n * n:
}
}
}()
return out
}
// stage2: filter even
func filterEven(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 != 0 {
continue
}
select {
case <-ctx.Done():
return
case out <- n:
}
}
}()
return out
}
// Использование
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nums := gen(ctx, 1, 2, 3, 4, 5)
squared := sq(ctx, nums)
evens := filterEven(ctx, squared)
for v := range evens {
println(v) // 4, 16
}
}

ASCII:

[gen]──chan──►[sq]──chan──►[filterEven]──chan──► consumer

Правила pipeline:

  1. Каждый этап запускает горутину, читает входной chan, пишет в выходной.
  2. Outbound chan закрывает тот, кто пишет.
  3. Context для отмены — каждый стейдж проверяет ctx.Done().
  4. Закрытие inbound триггерит range exit → close outbound → каскад.

Fan-out: один генератор → N воркеров. Fan-in: N каналов → один.

// fan-out: stage с N параллельными
func fanOutSq(ctx context.Context, in <-chan int, n int) []<-chan int {
outs := make([]<-chan int, n)
for i := 0; i < n; i++ {
outs[i] = sq(ctx, in) // несколько goroutines читают тот же in
}
return outs
}
// fan-in: merge нескольких в один
func merge[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
wg.Add(len(channels))
for _, c := range channels {
c := c
go func() {
defer wg.Done()
for v := range c {
select {
case <-ctx.Done():
return
case out <- v:
}
}
}()
}
go func() {
wg.Wait()
close(out)
}()
return out
}

Использование:

in := gen(ctx, 1, 2, 3, ..., 100)
outs := fanOutSq(ctx, in, 8) // 8 воркеров делят работу
merged := merge(ctx, outs...) // собираем в один канал
for v := range merged { /* ... */ }

ASCII:

┌──[sq]──┐
├──[sq]──┤
[gen]──in──┼──[sq]──┼──merge──out──► consumer
├──[sq]──┤
└──[sq]──┘
Fan-out: in → 5 workers (читают параллельно)
Fan-in: 5 outputs → 1 (merge)

Канал как iterator:

func Range(start, end int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := start; i < end; i++ {
ch <- i
}
}()
return ch
}
for v := range Range(0, 10) {
println(v)
}

⚠️ Если consumer не дочитал до конца — goroutine leak! Решение: context.

func Range(ctx context.Context, start, end int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := start; i < end; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}

В Go 1.23 появился стандартный iter.Seq/iter.Seq2 — для итерации range-функциями. Но для concurrent producer всё ещё используют каналы.

Универсальный паттерн для отмены:

func orDone[T any](done <-chan struct{}, c <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
select {
case <-done:
return
case v, ok := <-c:
if !ok {
return
}
select {
case <-done:
return
case out <- v:
}
}
}
}()
return out
}
// Использование
for v := range orDone(done, dataCh) {
// не нужно select в каждом месте
}

Идея: вместо того чтобы каждый consumer писал select { case <-done: ... case v := <-c: ... }, упаковываем в orDone и используем обычный range.

С context.Context это:

func ctxRange[T any](ctx context.Context, c <-chan T) <-chan T {
return orDone(ctx.Done(), c)
}

Совмещение N сигналов отмены в один:

func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
out := make(chan struct{})
go func() {
defer close(out)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], out)...):
}
}
}()
return out
}

В современном коде вместо or — context.WithCancel композиция.

Один входной канал → два выходных, в каждый идёт каждое значение:

func tee[T any](done <-chan struct{}, in <-chan T) (<-chan T, <-chan T) {
out1 := make(chan T)
out2 := make(chan T)
go func() {
defer close(out1)
defer close(out2)
for v := range in {
var a, b = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
return
case a <- v:
a = nil // disable
case b <- v:
b = nil
}
}
}
}()
return out1, out2
}

Trick: обнуляем переменную → case с nil каналом никогда не сработает. Это идиома Go select.

type Sem chan struct{}
func NewSem(n int) Sem {
return make(chan struct{}, n)
}
func (s Sem) Acquire() {
s <- struct{}{}
}
func (s Sem) Release() {
<-s
}
func (s Sem) TryAcquire() bool {
select {
case s <- struct{}{}:
return true
default:
return false
}
}

Использование:

sem := NewSem(10)
for _, item := range items {
sem.Acquire()
go func(item Item) {
defer sem.Release()
process(item)
}(item)
}

Когда нужны “веса” — операции с разным стоимостью:

import "golang.org/x/sync/semaphore"
sem := semaphore.NewWeighted(100) // 100 unit-ов capacity
// Тяжёлая операция (вес 50)
if err := sem.Acquire(ctx, 50); err != nil {
return err
}
defer sem.Release(50)
// Лёгкая (вес 1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
defer sem.Release(1)

Внутри — Mutex + waitlist. Acquire ждёт пока в bucket’е достаточно place.

import "golang.org/x/sync/singleflight"
var sf singleflight.Group
func GetUser(id string) (*User, error) {
v, err, shared := sf.Do(id, func() (any, error) {
// эта функция выполняется ТОЛЬКО раз для каждого id,
// даже если 100 горутин одновременно зовут GetUser("123").
return db.QueryUser(id)
})
if err != nil {
return nil, err
}
_ = shared // true если несколько горутин получили результат
return v.(*User), nil
}

Use case: cache stampede protection. 1000 одновременных запросов за одним ключом → один реальный DB query. Остальные 999 ждут результат первого.

Альтернатива: sf.DoChan(key, fn) — non-blocking, возвращает channel с результатом.

⚠️ Если первый запрос медленный — все остальные тоже ждут. Это компромисс: меньше нагрузки на backend vs больше latency для отдельного запроса.

import "golang.org/x/time/rate"
// 100 req/sec, burst 200
limiter := rate.NewLimiter(rate.Limit(100), 200)
func handler(w http.ResponseWriter, r *http.Request) {
if !limiter.Allow() {
http.Error(w, "rate limit", http.StatusTooManyRequests)
return
}
// ...
}
// Или с ожиданием:
if err := limiter.Wait(ctx); err != nil {
return err // ctx cancelled
}

Алгоритм token bucket:

  • Bucket с capacity burst токенов.
  • Пополняется со скоростью rate токенов/сек.
  • Каждый запрос забирает 1 (или N) токен.
  • Если токенов нет — отказ или wait.

ASCII:

drip rate=100/s
┌──────────┐
[tokens] │░░░░░░░░░░│ burst=200
│░░░░░░░░░░│
└────┬─────┘
▼ Allow() забирает 1
[Request OK?]
type LeakyBucket struct {
capacity int
bucket chan struct{}
ticker *time.Ticker
}
func NewLeakyBucket(cap int, leakInterval time.Duration) *LeakyBucket {
lb := &LeakyBucket{
capacity: cap,
bucket: make(chan struct{}, cap),
ticker: time.NewTicker(leakInterval),
}
go func() {
for range lb.ticker.C {
select {
case <-lb.bucket: // leak
default:
}
}
}()
return lb
}
func (lb *LeakyBucket) Allow() bool {
select {
case lb.bucket <- struct{}{}:
return true
default:
return false
}
}
func (lb *LeakyBucket) Close() {
lb.ticker.Stop()
}

Разница: token bucket позволяет burst, leaky — нет. Leaky — для smooth output rate.

Принцип: producer должен замедляться, когда consumer медленнее. Способы:

1. Unbuffered channel — natural backpressure:

ch := make(chan Item) // sender блокируется

2. Bounded buffered channel — backpressure при заполнении:

ch := make(chan Item, 100) // sender блокируется при 100 items

**3. Drop:

select {
case ch <- item:
default:
// drop
}

**4. Block с timeout:

select {
case ch <- item:
case <-time.After(timeout):
return ErrTimeout
}

⚠️ Антипаттерн: огромный буфер. make(chan T, 1<<20) — это память + latency, не решает проблему.

func main() {
ctx, cancel := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM)
defer cancel()
srv := &http.Server{Addr: ":8080", Handler: mux}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatal(err)
}
}()
<-ctx.Done()
log.Println("shutdown signal received")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("forced shutdown: %v", err)
}
}

signal.NotifyContext (Go 1.16+) — context, который cancel’ится при SIGINT/SIGTERM. srv.Shutdown ждёт active requests до timeout.

type Topic[T any] struct {
mu sync.RWMutex
subs map[chan T]struct{}
}
func NewTopic[T any]() *Topic[T] {
return &Topic[T]{subs: make(map[chan T]struct{})}
}
func (t *Topic[T]) Subscribe(bufSize int) (<-chan T, func()) {
ch := make(chan T, bufSize)
t.mu.Lock()
t.subs[ch] = struct{}{}
t.mu.Unlock()
return ch, func() {
t.mu.Lock()
delete(t.subs, ch)
close(ch)
t.mu.Unlock()
}
}
func (t *Topic[T]) Publish(v T) {
t.mu.RLock()
defer t.mu.RUnlock()
for ch := range t.subs {
select {
case ch <- v:
default:
// slow consumer — drop (или log)
}
}
}

Drop slow subscribers — иначе один тормозит всех. Альтернатива: каждый sub получает свою горутину-pusher.

type Future[T any] struct {
done chan struct{}
val T
err error
}
func Async[T any](fn func() (T, error)) *Future[T] {
f := &Future[T]{done: make(chan struct{})}
go func() {
defer close(f.done)
f.val, f.err = fn()
}()
return f
}
func (f *Future[T]) Get(ctx context.Context) (T, error) {
select {
case <-f.done:
return f.val, f.err
case <-ctx.Done():
var zero T
return zero, ctx.Err()
}
}

Использование:

f := Async(func() (int, error) { return computeHeavy() })
// делаем что-то ещё...
v, err := f.Get(ctx)

⚠️ Future в Go не идиоматичен — обычно проще errgroup или горутина с каналом результата.

done := make(chan struct{})
go func() {
defer close(done)
work()
}()
// ... do other work ...
<-done // wait for completion

close(done) как broadcast: можно <-done сколько угодно раз, всё вернётся мгновенно.


Без ctx.Done() в select воркеры не остановятся при cancel.

g, _ := errgroup.WithContext(ctx)
for _, item := range millionItems {
g.Go(func() error { ... }) // 1M goroutines!
}

Всегда SetLimit(N) для batch обработки.

3.3. ⚠️ Pipeline: закрытие out выполняет тот, кто пишет

Заголовок раздела «3.3. ⚠️ Pipeline: закрытие out выполняет тот, кто пишет»

Если несколько горутин пишут в один out — координируй через WaitGroup, закрывай только когда все finish.

merged := merge(c1, c2, c3)
for v := range merged {
if cond {
break // c1/c2/c3 senders могут leak'нуть!
}
}

Используй orDone или context.

Если consumer не дочитал — generator горутина висит в ch <- v. Всегда — context.

3.6. ⚠️ Tee: один медленный consumer тормозит другой

Заголовок раздела «3.6. ⚠️ Tee: один медленный consumer тормозит другой»
o1, o2 := tee(in)
// если o1 читается медленно, o2 тоже стопает (наш select зависает на o1).

Решение: каждый out — со своим буфером, или отдельные горутины с back-pressure.

sem := make(chan struct{}, 10)
for _, item := range items {
sem <- struct{}{}
go func() { // ОШИБКА: item не захвачен
defer func() { <-sem }()
process(item) // последний item для всех!
}()
}

Правильно — func(item Item) или item := item.

sf.Do("key", func() (any, error) {
return nil, errors.New("transient")
})

Все 100 параллельных вызовов получат одну ошибку. Иногда хочется чтобы каждый ретраил. Решение: подумай, нужен ли тебе singleflight на этой задаче.

Wait(ctx) возвращает ctx.Err() при отмене. Не leak’нет.

for range time.Tick(time.Second) { ... }

time.Tick не освобождает таймер. Используй time.NewTicker + defer ticker.Stop().

Если subscriber завершается, но Topic не вызвал unsubscribe — канал утекает.

Async создаёт goroutine. Если ты делаешь 1M Async() на батч — это 1M goroutines. Используй pool/errgroup.


  • CPU-bound: N = GOMAXPROCS.
  • IO-bound: N большой (100-1000), зависит от downstream limits.

Profile с pprof: runtime.NumGoroutine() показывает текущий count.

func BenchmarkErrgroup(b *testing.B) {
for _, n := range []int{1, 10, 100, 1000} {
b.Run(fmt.Sprintf("limit=%d", n), func(b *testing.B) {
for i := 0; i < b.N; i++ {
g, _ := errgroup.WithContext(context.Background())
g.SetLimit(n)
for j := 0; j < 1000; j++ {
g.Go(func() error { return nil })
}
g.Wait()
}
})
}
}

Чем меньше limit — тем больше Wait в Go() (semaphore блокирует). Чем больше — тем больше goroutines одновременно.

Каждый этап = 1 канал + 1 горутина. Если этап делает 100 ns работы, а канал-операция 100 ns, overhead 50%. Pipeline хорош для тяжёлых операций.

Когда pipeline ХУЖЕ:

  • Работа очень дешёвая (< 1 µs).
  • Этапов много (> 5).

Решение: батчинг (передавать []T вместо T).

Один Do — это lookup в map + Mutex. ~100 ns. Очень дёшево относительно типичной операции (DB query 1-10 ms).

Allow() — ~50 ns. Под высокой нагрузкой не bottleneck.

Вместо отправки по одному:

ch <- item1
ch <- item2

Отправляй batch:

ch <- []Item{item1, item2}

Меньше lock contention, меньше Go scheduler overhead.


ЗадачаПаттерн
Параллельная обработка известных N itemserrgroup.WithContext + SetLimit
Обработка stream данныхpipeline + fan-out/in
Очередь работ в долго живущем процессеworker pool
Ограничить параллелизмsemaphore (chan или x/sync)
Защита от cache stampedesingleflight
Ограничить rate (req/s)rate.Limiter
Broadcast событияPub-Sub или close(ch)
Подождать одного результатаfuture / chan T (size 1)
Graceful shutdownsignal.NotifyContext + srv.Shutdown
Periodic tasktime.NewTicker + ctx
  • errgroup vs WaitGroup: errgroup в новых проектах almost всегда. WaitGroup — когда нет ошибок.
  • channel semaphore vs x/sync semaphore: chan быстрее для unit-веса. x/sync — для весов.
  • singleflight vs cache: они композируются. cache → miss → singleflight.

1. Реализуй worker pool на доске. См. секцию 2.1. Ключевое: chan jobs, ctx, N горутин в for-select, graceful Stop через close(jobs).

2. errgroup vs WaitGroup? errgroup += context + первая ошибка + SetLimit. WaitGroup только Add/Done/Wait, без context, без ошибок.

3. Что делает SetLimit? Создаёт internal semaphore-канал capacity N. Go() блокируется, если N горутин уже работают.

4. Что такое pipeline в Go? Цепочка этапов, соединённых каналами. Каждый этап — goroutine, читает in, пишет out. Закрытие inbound каскадирует через range.

5. Fan-out / fan-in? Fan-out: один источник → N consumer goroutines (читают один канал). Fan-in: N каналов → один (merge через WaitGroup).

6. Or-done паттерн? Обёртка канала, который закрывается при done. Позволяет в consumer писать обычный for range вместо select.

7. Tee паттерн? Дуплицирование канала: один in → два out, каждое значение идёт в оба.

8. Реализуй semaphore через канал. make(chan struct{}, N). Acquire — send. Release — recv. Try — select with default.

9. semaphore.Weighted зачем? Когда операции имеют вес: Acquire(ctx, weight). Полезно для пулов памяти/CPU.

10. Что делает singleflight? Дедупликирует одинаковые ключи: N параллельных Do(key, fn) → один реальный вызов fn, все получают результат.

11. Когда НЕ использовать singleflight?

  • Если каждый запрос должен иметь свежие данные.
  • Если ошибка одного не должна каскадировать на всех.
  • Если запрос быстрый — overhead не оправдан.

12. rate.Limiter — token bucket или leaky bucket? Token bucket. Capacity = burst, refill = rate. Allow забирает 1 токен, Wait блокирует пока не появится.

13. Token bucket vs leaky bucket? Token: позволяет burst (если bucket полон). Leaky: smooth rate, не позволяет burst.

14. Что такое backpressure? Замедление producer при медленном consumer. Реализуется через unbuffered/bounded chan, drop, или timeout.

15. Реализуй graceful shutdown HTTP сервера. signal.NotifyContext → server.ListenAndServe в goroutine → <-ctx.Done → server.Shutdown с timeout.

16. Реализуй Pub-Sub in-memory. См. секцию 2.16. Topic с map[chan T]struct{}, Publish с RLock, Subscribe возвращает канал + unsubscribe func.

17. Future в Go идиоматичен? Редко. Чаще errgroup + структура для результата, или ch := make(chan Result, 1); go func() { ch <- compute() }().

18. Как избежать leak в generator? Context.Done() в select при send. Или закрытие done канала из consumer.

19. errgroup.WithContext: что отменяется? Внутренний context cancel’ится при первой ошибке любого Go(). Все остальные горутины должны проверять ctx.Done().

20. Что вернёт g.Wait() при cancel? Первую non-nil ошибку. Если все горутины не возвращали ошибку, но ctx был cancelled — вернёт nil (горутины должны были вернуть ctx.Err() если хотели).

21. Реализуй limiter на N concurrent requests. Buffered chan size N. acquire — send, release — recv. Или x/sync semaphore.

22. Cancel в pipeline: как? Context в каждой стадии. select { case <-ctx.Done(): return; case out <- v: }.

23. Что произойдёт, если в pipeline один этап залип? Нisbalance: upstream блокируется на write (если unbuffered), downstream блокируется на read. Под backpressure обычно ок, под cancel — все должны выйти через ctx.

24. Закрытие pipeline output: кто и когда? Тот этап, который пишет. После range in exit + close(out) через defer.

25. Worker pool: queueSize=0 vs queueSize=N? 0 — Submit блокирует пока worker свободен (sync). N — Submit не блокирует до N в очереди (буфер).

26. Чем TryGo отличается от Go в errgroup? TryGo возвращает false, если semaphore полон (без блокировки). Go блокирует пока освободится слот.

27. Реализуй rate limiter на 100 req/sec через канал. Token bucket: канал buffered size 100, тикер раз в 10ms кидает 1 токен (учитывает overflow). Allow — TryRecv.

28. Pub-Sub с гарантией доставки? Каждому subscriber — своя горутина-pusher + bounded queue. Если queue полна — либо block sender, либо drop, либо kick subscriber.

29. Когда канал-семафор лучше Mutex? Когда нужно несколько concurrent (N>1) — Mutex даёт только 1. Канал size N это N concurrent.

30. singleflight DoChan vs Do? Do — блокирующий, возвращает (val, err, shared). DoChan — non-blocking, возвращает chan Result. Удобно при select с timeout.


type Job struct {
Priority int
Work func()
}
type PriorityPool struct {
high, low chan Job
wg sync.WaitGroup
}
func NewPriorityPool(n int) *PriorityPool {
p := &PriorityPool{
high: make(chan Job, 100),
low: make(chan Job, 100),
}
p.wg.Add(n)
for i := 0; i < n; i++ {
go p.worker()
}
return p
}
func (p *PriorityPool) worker() {
defer p.wg.Done()
for {
// Сначала проверяем high
select {
case j, ok := <-p.high:
if !ok { return }
j.Work()
continue
default:
}
// Иначе ждём любой
select {
case j, ok := <-p.high:
if !ok { return }
j.Work()
case j, ok := <-p.low:
if !ok { return }
j.Work()
}
}
}
func (p *PriorityPool) Submit(j Job) {
if j.Priority > 0 {
p.high <- j
} else {
p.low <- j
}
}
type Limiter struct {
rate float64 // tokens per second
capacity float64
mu sync.Mutex
tokens float64
last time.Time
}
func NewLimiter(rate, capacity float64) *Limiter {
return &Limiter{
rate: rate,
capacity: capacity,
tokens: capacity,
last: time.Now(),
}
}
func (l *Limiter) Allow() bool {
return l.AllowN(1)
}
func (l *Limiter) AllowN(n float64) bool {
l.mu.Lock()
defer l.mu.Unlock()
now := time.Now()
elapsed := now.Sub(l.last).Seconds()
l.tokens = math.Min(l.capacity, l.tokens+elapsed*l.rate)
l.last = now
if l.tokens < n {
return false
}
l.tokens -= n
return true
}

Используется в продакшен-API. Сравни с x/time/rate.Limiter — там ещё Wait и Reserve.

// pipeline: filenames → []byte (read) → []byte (process) → write
func pipeline(ctx context.Context, files []string) error {
g, ctx := errgroup.WithContext(ctx)
paths := make(chan string)
g.Go(func() error {
defer close(paths)
for _, f := range files {
select {
case <-ctx.Done(): return ctx.Err()
case paths <- f:
}
}
return nil
})
data := make(chan []byte)
// 4 readers
var rwg sync.WaitGroup
rwg.Add(4)
for i := 0; i < 4; i++ {
g.Go(func() error {
defer rwg.Done()
for p := range paths {
b, err := os.ReadFile(p)
if err != nil { return err }
select {
case <-ctx.Done(): return ctx.Err()
case data <- b:
}
}
return nil
})
}
g.Go(func() error {
rwg.Wait()
close(data)
return nil
})
// sink
g.Go(func() error {
for b := range data {
process(b)
}
return nil
})
return g.Wait()
}
type CachedFetcher struct {
cache *lru.Cache[string, []byte]
sf singleflight.Group
fetch func(ctx context.Context, k string) ([]byte, error)
}
func (c *CachedFetcher) Get(ctx context.Context, k string) ([]byte, error) {
if v, ok := c.cache.Get(k); ok {
return v, nil
}
v, err, _ := c.sf.Do(k, func() (any, error) {
return c.fetch(ctx, k)
})
if err != nil {
return nil, err
}
c.cache.Add(k, v.([]byte))
return v.([]byte), nil
}

Cache stampede protection + кеш. Канонический паттерн для backend.

func batchProcess(ctx context.Context, items []int) ([]int, error) {
in := make(chan int)
out := make(chan int)
g, ctx := errgroup.WithContext(ctx)
// producer
g.Go(func() error {
defer close(in)
for _, v := range items {
select {
case <-ctx.Done(): return ctx.Err()
case in <- v:
}
}
return nil
})
// 8 workers
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
g.Go(func() error {
defer wg.Done()
for v := range in {
r, err := process(ctx, v)
if err != nil { return err }
select {
case <-ctx.Done(): return ctx.Err()
case out <- r:
}
}
return nil
})
}
g.Go(func() error {
wg.Wait()
close(out)
return nil
})
results := make([]int, 0, len(items))
for v := range out {
results = append(results, v)
}
if err := g.Wait(); err != nil { return nil, err }
return results, nil
}

Замечание: order не сохраняется. Если нужна order — index в struct и сортировать после.

type SlowProducer struct {
out chan int
rate *rate.Limiter
}
func (p *SlowProducer) Produce(ctx context.Context, value int) error {
if err := p.rate.Wait(ctx); err != nil {
return err
}
select {
case <-ctx.Done(): return ctx.Err()
case p.out <- value:
return nil
}
}

Limiter сглаживает rate, канал даёт backpressure от consumer.

Задача 7. Pub-Sub с гарантированной доставкой

Заголовок раздела «Задача 7. Pub-Sub с гарантированной доставкой»
type GuaranteedTopic[T any] struct {
mu sync.RWMutex
subs []*sub[T]
}
type sub[T any] struct {
ch chan T
closed atomic.Bool
}
func (t *GuaranteedTopic[T]) Subscribe() (<-chan T, func()) {
s := &sub[T]{ch: make(chan T, 64)}
t.mu.Lock()
t.subs = append(t.subs, s)
t.mu.Unlock()
return s.ch, func() {
if s.closed.CompareAndSwap(false, true) {
t.mu.Lock()
for i, x := range t.subs {
if x == s {
t.subs = append(t.subs[:i], t.subs[i+1:]...)
break
}
}
t.mu.Unlock()
close(s.ch)
}
}
}
func (t *GuaranteedTopic[T]) Publish(ctx context.Context, v T) error {
t.mu.RLock()
subs := make([]*sub[T], len(t.subs))
copy(subs, t.subs)
t.mu.RUnlock()
for _, s := range subs {
if s.closed.Load() { continue }
select {
case <-ctx.Done(): return ctx.Err()
case s.ch <- v:
}
}
return nil
}

Бойко-fashion: publish блокируется, если хоть один subscriber заполнен. Trade-off: гарантия доставки vs slow subscriber может застопорить всех.

func main() {
ctx, cancel := signal.NotifyContext(context.Background(),
os.Interrupt, syscall.SIGTERM)
defer cancel()
pool := workerpool.New(ctx, 10, 100)
defer pool.Shutdown()
srv := &http.Server{
Addr: ":8080",
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_ = pool.Submit(func(_ context.Context) {
// обработка
})
w.WriteHeader(204)
}),
}
errCh := make(chan error, 1)
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
errCh <- err
}
}()
select {
case err := <-errCh:
log.Fatal(err)
case <-ctx.Done():
log.Println("shutting down...")
}
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
log.Printf("forced shutdown: %v", err)
}
pool.Stop()
log.Println("bye")
}

Это шаблон для production-приложения.


  1. Sameer Ajmani — “Go Concurrency Patterns”https://go.dev/blog/pipelines (классика).
  2. Bryan C. Mills — “Rethinking Classical Concurrency Patterns” (GopherCon 2018) — https://www.youtube.com/watch?v=5zXAHh5tJqQ.
  3. Katherine Cox-Buday — “Concurrency in Go” (книга O’Reilly).
  4. golang.org/x/sync/errgroup docshttps://pkg.go.dev/golang.org/x/sync/errgroup.
  5. golang.org/x/sync/semaphore docshttps://pkg.go.dev/golang.org/x/sync/semaphore.
  6. golang.org/x/sync/singleflight docshttps://pkg.go.dev/golang.org/x/sync/singleflight.
  7. golang.org/x/time/rate docshttps://pkg.go.dev/golang.org/x/time/rate.
  8. Rob Pike — “Concurrency is not parallelism”https://go.dev/blog/waza-talk.
  9. signal.NotifyContexthttps://pkg.go.dev/os/signal#NotifyContext.
  10. net/http.Server.Shutdownhttps://pkg.go.dev/net/http#Server.Shutdown.