Concurrent API Design: Principles, Patterns, Testing
Зачем знать на Middle 3: дизайн concurrent API — это то, что отличает middle от senior. Можно написать корректный код, но если API неправильный (race-prone, badly composable, leaky abstractions), то любой junior, использующий ваш пакет, создаст data race в production. Middle 3 / Senior должен: (1) знать, когда channels vs mutex vs atomic; (2) дизайнить APIs так, чтобы concurrency была hidden или explicit (never accidental); (3) понимать context propagation, memory ordering documentation, lock granularity; (4) уметь тестировать concurrent code (race detector, stress test, synctest). Без этого вы будете автором тех самых пакетов, которые “иногда падают на проде”.
Содержание
Заголовок раздела «Содержание»- Краткое введение
- Глубокое погружение
- 2.1 Принципы concurrent API
- 2.2 Когда channels, когда mutex (Rob Pike’s rules)
- 2.3 API patterns: functional options, builder, iterator
- 2.4 Context propagation
- 2.5 Memory ordering и happens-before
- 2.6 Lock granularity
- 2.7 Actor model и reactor pattern
- 2.8 Worker pool
- 2.9 Backpressure
- 2.10 Error handling
- 2.11 Testing concurrent code
- Gotchas
- Real cases
- Вопросы
- Practice
- Источники
1. Краткое введение
Заголовок раздела «1. Краткое введение»Concurrent API — это публичный интерфейс, который безопасно используется из нескольких горутин одновременно (или явно одно-горутинный с документацией). Хороший concurrent API:
- Скрывает синхронизацию внутри — пользователь не должен брать lock, чтобы вызвать вашу функцию.
- Не возвращает ссылки на internal state без копирования.
- Документирует thread safety в comment’е к типу/функции.
- Composable — можно безопасно использовать с другими concurrent APIs.
- Поддерживает context для cancellation.
- Имеет понятную error semantics (как ошибки распространяются между горутинами).
Главные принципы
Заголовок раздела «Главные принципы»“Don’t communicate by sharing memory; share memory by communicating.” — Rob Pike
Это не значит “всегда channel’ы”. Это значит — структурируйте программу так, чтобы данные перемещались между горутинами, а не разделялись.
“Locks внутри API, не наружу.”
Никогда не делайте API, где пользователь должен взять lock перед вызовом ваших функций. Это leaky abstraction.
“Make zero value useful.”
sync.Mutex{} готов к работе без init. То же должно быть с вашими типами.
2. Глубокое погружение
Заголовок раздела «2. Глубокое погружение»2.1 Принципы concurrent API
Заголовок раздела «2.1 Принципы concurrent API»2.1.1 Locks внутри API
Заголовок раздела «2.1.1 Locks внутри API»Bad:
type Cache struct { Mu sync.RWMutex // ⚠️ exported field Data map[string]int}
// User must do:cache.Mu.RLock()v := cache.Data["key"]cache.Mu.RUnlock()Good:
type Cache struct { mu sync.RWMutex // unexported data map[string]int}
func (c *Cache) Get(key string) (int, bool) { c.mu.RLock() defer c.mu.RUnlock() v, ok := c.data[key] return v, ok}2.1.2 Не возвращайте ссылки на internal state
Заголовок раздела «2.1.2 Не возвращайте ссылки на internal state»Bad:
func (c *Cache) Snapshot() map[string]int { c.mu.RLock() defer c.mu.RUnlock() return c.data // ⚠️ caller has reference to internal map!}
// User does:m := cache.Snapshot()m["evil"] = 666 // BOOM — race on cache.data!Good:
func (c *Cache) Snapshot() map[string]int { c.mu.RLock() defer c.mu.RUnlock() snap := make(map[string]int, len(c.data)) for k, v := range c.data { snap[k] = v } return snap}Alternative (immutable snapshot):
type Cache struct { data atomic.Pointer[map[string]int]}
func (c *Cache) Snapshot() map[string]int { return *c.data.Load() // read-only by convention}⚠️ Но нужно документировать, что snapshot read-only. Можно использовать generic interface для гарантии.
2.1.3 Documenting thread safety
Заголовок раздела «2.1.3 Documenting thread safety»Стандартная нотация в Go-коде:
// Cache is safe for concurrent use by multiple goroutines.type Cache struct { /* ... */ }
// NewIterator returns an iterator that is NOT safe for concurrent use.// Each goroutine should call NewIterator separately.func (c *Cache) NewIterator() *Iterator { /* ... */ }2.1.4 Consistent primitives
Заголовок раздела «2.1.4 Consistent primitives»Don’t mix channel-based и mutex-based в одном пакете без причины. Это путает users. Выберите парадигму и следуйте.
2.2 Когда channels, когда mutex (Rob Pike’s rules)
Заголовок раздела «2.2 Когда channels, когда mutex (Rob Pike’s rules)»“Use whatever is most expressive and/or most simple.” — Rob Pike
Но если нужны правила:
| Сценарий | Primitive |
|---|---|
| Pipeline / fan-in / fan-out / event passing | channels |
| Mutable shared state (cache, map, counter) | mutex |
| Atomic counter | atomic |
| One-time init | sync.Once |
| Wait for N goroutines | WaitGroup |
| Broadcast (signal multiple) | sync.Cond или closed channel |
| Rate limiting | time.Tick или golang.org/x/time/rate |
2.2.1 Channel для pipeline
Заголовок раздела «2.2.1 Channel для pipeline»func stage1(out chan<- int) { defer close(out) for i := 0; i < 100; i++ { out <- i }}
func stage2(in <-chan int, out chan<- int) { defer close(out) for v := range in { out <- v * 2 }}
func main() { c1 := make(chan int) c2 := make(chan int) go stage1(c1) go stage2(c1, c2) for v := range c2 { fmt.Println(v) }}2.2.2 Mutex для shared state
Заголовок раздела «2.2.2 Mutex для shared state»type Counter struct { mu sync.Mutex v int}
func (c *Counter) Inc() { c.mu.Lock() c.v++ c.mu.Unlock()}
func (c *Counter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.v}2.2.3 atomic для простого counter
Заголовок раздела «2.2.3 atomic для простого counter»type Counter struct { v atomic.Int64}
func (c *Counter) Inc() { c.v.Add(1) }func (c *Counter) Value() int64 { return c.v.Load() }2.3 API patterns
Заголовок раздела «2.3 API patterns»2.3.1 Functional options (thread-safe configuration)
Заголовок раздела «2.3.1 Functional options (thread-safe configuration)»type Server struct { addr string timeout time.Duration maxConn int // ... unexported fields, mutex, etc.}
type Option func(*Server)
func WithTimeout(d time.Duration) Option { return func(s *Server) { s.timeout = d }}
func WithMaxConn(n int) Option { return func(s *Server) { s.maxConn = n }}
func NewServer(addr string, opts ...Option) *Server { s := &Server{ addr: addr, timeout: 30 * time.Second, maxConn: 100, } for _, opt := range opts { opt(s) } return s}
// Usage:srv := NewServer(":8080", WithTimeout(10*time.Second), WithMaxConn(1000))Configuration applied до того, как server start. Никаких race conditions — options применяются на single-goroutine construction phase.
2.3.2 Builder pattern
Заголовок раздела «2.3.2 Builder pattern»type Pipeline struct { mu sync.Mutex stages []stage}
func (p *Pipeline) AddStage(s stage) *Pipeline { p.mu.Lock() p.stages = append(p.stages, s) p.mu.Unlock() return p}
func (p *Pipeline) Build() *RunningPipeline { p.mu.Lock() defer p.mu.Unlock() return &RunningPipeline{stages: append([]stage(nil), p.stages...)}}Builder thread-safe (locks внутри). После Build — immutable snapshot.
2.3.3 Iterator (channel-based)
Заголовок раздела «2.3.3 Iterator (channel-based)»type Cache struct { mu sync.RWMutex data map[string]int}
func (c *Cache) Iterate(ctx context.Context) <-chan KV { out := make(chan KV) go func() { defer close(out) c.mu.RLock() defer c.mu.RUnlock() for k, v := range c.data { select { case out <- KV{k, v}: case <-ctx.Done(): return } } }() return out}
// Usage:ctx, cancel := context.WithCancel(context.Background())defer cancel()for kv := range cache.Iterate(ctx) { if kv.K == "stop" { cancel() // graceful stop break }}⚠️ Internal lock держится всё время итерации. Если consumer медленный — writers заблокированы. Альтернатива — snapshot первый, потом итерация без lock.
2.3.4 Iterator через snapshot
Заголовок раздела «2.3.4 Iterator через snapshot»func (c *Cache) Iterate(ctx context.Context) <-chan KV { c.mu.RLock() snap := make([]KV, 0, len(c.data)) for k, v := range c.data { snap = append(snap, KV{k, v}) } c.mu.RUnlock() // release ASAP
out := make(chan KV) go func() { defer close(out) for _, kv := range snap { select { case out <- kv: case <-ctx.Done(): return } } }() return out}Trade-off: больше памяти, но не держим lock.
2.4 Context propagation
Заголовок раздела «2.4 Context propagation»2.4.1 Always first arg
Заголовок раздела «2.4.1 Always first arg»// GOOD:func (c *Client) Get(ctx context.Context, key string) ([]byte, error)
// BAD:func (c *Client) Get(key string, ctx context.Context) ([]byte, error)2.4.2 Don’t store context in struct
Заголовок раздела «2.4.2 Don’t store context in struct»// BAD:type Client struct { ctx context.Context // ...}
// GOOD:type Client struct { // ... no ctx}
func (c *Client) Do(ctx context.Context, req Request) { /* ... */ }Exception: long-running services могут иметь lifecycleCtx для shutdown signal. Но это не request context.
2.4.3 context.WithoutCancel (Go 1.21+)
Заголовок раздела «2.4.3 context.WithoutCancel (Go 1.21+)»ctx := /* request context with cancellation */logCtx := context.WithoutCancel(ctx) // detached from cancel
go func() { // logging continues even after request done log.WithContext(logCtx).Info("processed")}()Полезно для:
- Logging continues after request done.
- Cleanup operations.
- Background work, которая не должна быть cancel’нута вместе с request.
2.4.4 context.AfterFunc (Go 1.21+)
Заголовок раздела «2.4.4 context.AfterFunc (Go 1.21+)»ctx, cancel := context.WithCancel(parent)defer cancel()
stop := context.AfterFunc(ctx, func() { // called when ctx is canceled conn.Close()})defer stop() // optional cleanup
// ... do work ...Replaces:
go func() { <-ctx.Done() conn.Close()}()Преимущество: не нужна горутина (callback run on cancel’s goroutine).
2.5 Memory ordering и happens-before
Заголовок раздела «2.5 Memory ordering и happens-before»2.5.1 Go Memory Model (Go 1.19+ updated)
Заголовок раздела «2.5.1 Go Memory Model (Go 1.19+ updated)»Happens-before relation:
Event A happens-before B if:1. A and B are in the same goroutine, and A is before B in program order.2. A is goroutine start (go f()), B is anything in f.3. A is channel send, B is corresponding receive.4. A is unlock, B is corresponding lock.5. ... (other rules)
Transitive: if A → B and B → C, then A → C.2.5.2 Atomic operations memory model (Go 1.19+)
Заголовок раздела «2.5.2 Atomic operations memory model (Go 1.19+)»Перед Go 1.19 memory model для atomic was vague. Сейчас:
Load— acquire semantics.Store— release semantics.Swap,Add,CAS— sequentially consistent.
var x intvar ready atomic.Bool
// Goroutine A:x = 42ready.Store(true) // release: x=42 publishes
// Goroutine B:if ready.Load() { // acquire: sees published x fmt.Println(x) // guaranteed to see 42}2.5.3 Document memory ordering explicitly
Заголовок раздела «2.5.3 Document memory ordering explicitly»// Set updates the value.//// Set is safe for concurrent use.// Set has release semantics: any writes before Set in the calling goroutine// are visible to goroutines that observe the new value via Get.func (c *Cache) Set(v int) { c.v.Store(v) }2.6 Lock granularity
Заголовок раздела «2.6 Lock granularity»2.6.1 Coarse lock (один большой mutex)
Заголовок раздела «2.6.1 Coarse lock (один большой mutex)»type Inventory struct { mu sync.Mutex items map[string]int orders []Order stats Stats}
func (i *Inventory) AddItem(...) { i.mu.Lock() defer i.mu.Unlock() // ...}Плюсы: простота, легко reason about. Минусы: всё сериализуется через один mutex → contention.
2.6.2 Fine-grained (per-resource mutex)
Заголовок раздела «2.6.2 Fine-grained (per-resource mutex)»type Inventory struct { itemsMu sync.RWMutex items map[string]int
ordersMu sync.Mutex orders []Order
statsMu sync.Mutex stats Stats}Плюсы: разные операции независимы. Минусы: deadlock risk (нужен strict ordering), сложнее reason about.
2.6.3 Striped locks (sharded)
Заголовок раздела «2.6.3 Striped locks (sharded)»const numShards = 16
type ConcurrentMap struct { shards [numShards]struct { mu sync.Mutex m map[string]int }}
func (c *ConcurrentMap) shard(k string) int { h := fnv.New32a() h.Write([]byte(k)) return int(h.Sum32()) % numShards}
func (c *ConcurrentMap) Get(k string) (int, bool) { s := &c.shards[c.shard(k)] s.mu.Lock() defer s.mu.Unlock() v, ok := s.m[k] return v, ok}Плюсы: scale до N concurrent access (если keys хорошо распределены). Минусы: range/iterate операции дороже (нужно lock все shards).
sync.Map использует более сложную structure (read-only + dirty map), но идея похожа.
2.7 Actor model и reactor pattern
Заголовок раздела «2.7 Actor model и reactor pattern»2.7.1 Actor (Erlang/Akka style)
Заголовок раздела «2.7.1 Actor (Erlang/Akka style)»Actor = goroutine + mailbox (channel). Все communication через message passing.
type Actor struct { inbox chan Message}
func NewActor() *Actor { a := &Actor{inbox: make(chan Message, 16)} go a.run() return a}
func (a *Actor) run() { state := initialState() for msg := range a.inbox { state = handle(state, msg) }}
func (a *Actor) Send(m Message) { a.inbox <- m}
func (a *Actor) Stop() { close(a.inbox)}Преимущества:
- No shared state — нет race conditions by design.
- Single goroutine processes state — простая модель.
- Composable (actors send to actors).
Недостатки:
- Overhead на channel ops.
- Голова mailbox = bottleneck.
- Sequential processing внутри actor (если нужен parallelism — sub-actors).
2.7.2 Reactor pattern (event loop)
Заголовок раздела «2.7.2 Reactor pattern (event loop)»Single goroutine processes events from multiple sources via select.
type Server struct { requests chan Request events chan Event quit chan struct{}}
func (s *Server) run() { for { select { case req := <-s.requests: s.handleRequest(req) case ev := <-s.events: s.handleEvent(ev) case <-s.quit: return } }}Используется в HTTP/2 connection processing (один loop читает frames из socket’а).
2.8 Worker pool patterns
Заголовок раздела «2.8 Worker pool patterns»2.8.1 Basic worker pool
Заголовок раздела «2.8.1 Basic worker pool»func WorkerPool(n int, jobs <-chan Job, results chan<- Result) { var wg sync.WaitGroup for i := 0; i < n; i++ { wg.Add(1) go func() { defer wg.Done() for j := range jobs { results <- process(j) } }() } go func() { wg.Wait() close(results) }()}2.8.2 Dynamic worker pool
Заголовок раздела «2.8.2 Dynamic worker pool»type Pool struct { minWorkers int maxWorkers int
mu sync.Mutex workers int jobs chan Job}
func (p *Pool) Submit(j Job) { p.mu.Lock() if p.workers < p.maxWorkers && len(p.jobs) > 0 { p.workers++ go p.worker() } p.mu.Unlock() p.jobs <- j}Сложнее, но adaptive под load.
2.8.3 errgroup для error handling
Заголовок раздела «2.8.3 errgroup для error handling»import "golang.org/x/sync/errgroup"
func ProcessAll(ctx context.Context, items []Item) error { g, gctx := errgroup.WithContext(ctx) g.SetLimit(10) // max concurrent for _, item := range items { item := item // capture g.Go(func() error { return process(gctx, item) }) } return g.Wait() // returns first error, cancels others}errgroup — must-have для concurrent code в Go.
2.9 Backpressure
Заголовок раздела «2.9 Backpressure»2.9.1 Bounded channel
Заголовок раздела «2.9.1 Bounded channel»ch := make(chan Job, 100) // bounded buffer
// Producer:select {case ch <- job:case <-ctx.Done(): return ctx.Err()}
// Consumer:for j := range ch { process(j)}Если consumer медленный — producer блокируется. Это и есть backpressure.
2.9.2 Rate limiting
Заголовок раздела «2.9.2 Rate limiting»import "golang.org/x/time/rate"
limiter := rate.NewLimiter(rate.Limit(100), 10) // 100/sec, burst 10
for _, req := range requests { if err := limiter.Wait(ctx); err != nil { return err // ctx canceled } process(req)}2.9.3 Adaptive backpressure (AIMD-style)
Заголовок раздела «2.9.3 Adaptive backpressure (AIMD-style)»type Controller struct { mu sync.Mutex inFlight int maxConcurrent int}
func (c *Controller) OnSuccess() { c.mu.Lock() c.maxConcurrent++ // increase on success c.mu.Unlock()}
func (c *Controller) OnError() { c.mu.Lock() c.maxConcurrent /= 2 // halve on error (AIMD) if c.maxConcurrent < 1 { c.maxConcurrent = 1 } c.mu.Unlock()}Используется в gRPC (semantic retries with backoff).
2.10 Error handling в concurrent code
Заголовок раздела «2.10 Error handling в concurrent code»2.10.1 errgroup
Заголовок раздела «2.10.1 errgroup»Уже показано. Стандартный паттерн.
2.10.2 Channel for errors
Заголовок раздела «2.10.2 Channel for errors»errs := make(chan error, len(items))for _, item := range items { go func(it Item) { if err := process(it); err != nil { errs <- err } }(item)}// Collect errors:var allErrs []errorfor range items { if err := <-errs; err != nil { allErrs = append(allErrs, err) }}⚠️ Если горутина не отправляет error (success path) — deadlock. Лучше использовать errgroup или buffered channel size = N.
2.10.3 defer recover
Заголовок раздела «2.10.3 defer recover»func worker() { defer func() { if r := recover(); r != nil { log.Printf("worker panic: %v", r) } }() // ... work}⚠️ Panic в goroutine не ловится в parent goroutine. Каждая горутина должна сама defer recover. Иначе вся программа crash’нется.
2.10.4 Error aggregation
Заголовок раздела «2.10.4 Error aggregation»import "errors"
var errs []error// ... gather errors ...return errors.Join(errs...) // Go 1.20+errors.Join создаёт error, который wrap’ит несколько. Полезно при concurrent operations.
2.11 Testing concurrent code
Заголовок раздела «2.11 Testing concurrent code»2.11.1 Race detector
Заголовок раздела «2.11.1 Race detector»go test -race ./...go build -race ./...go run -race main.goВключает ThreadSanitizer-подобный runtime check. Замедляет в 2-20x, но ловит data races.
⚠️ Race detector ловит только races, которые реально произошли. Не triggered race может остаться невидимым.
⚠️ Должен быть включён в CI.
2.11.2 testing/synctest (Go 1.24+)
Заголовок раздела «2.11.2 testing/synctest (Go 1.24+)»Новый пакет для тестирования concurrent code с virtual time.
import "testing/synctest"
func TestTimeout(t *testing.T) { synctest.Run(func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel()
done := make(chan struct{}) go func() { <-ctx.Done() close(done) }()
// virtual time advances when all goroutines are blocked synctest.Wait()
select { case <-done: // OK default: t.Fatal("ctx should be done") } })}Идея: synctest определяет, когда все горутины блокированы на time-related ops, и продвигает virtual time. Тесты с timeouts проходят моментально.
2.11.3 Stress test
Заголовок раздела «2.11.3 Stress test»func TestStress(t *testing.T) { if testing.Short() { t.Skip() } c := NewCache() var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 1000; j++ { c.Set(fmt.Sprintf("k%d", j%10), j) _, _ = c.Get(fmt.Sprintf("k%d", j%10)) } }(i) } wg.Wait()}Запускайте с -race. Должен пройти 100+ раз подряд без падений.
2.11.4 Property-based testing
Заголовок раздела «2.11.4 Property-based testing»pgregory.net/rapid или gopter — generate random sequences of operations, проверяют invariants.
import "pgregory.net/rapid"
func TestCacheProperty(t *testing.T) { rapid.Check(t, func(t *rapid.T) { c := NewCache() ops := rapid.SliceOf(rapid.Int()).Draw(t, "ops") for _, op := range ops { c.Set(strconv.Itoa(op), op) } // verify invariants })}2.11.5 Deadlock detection
Заголовок раздела «2.11.5 Deadlock detection»Go runtime детектит deadlock, если все горутины sleep’ят. Не ловит deadlock между подгруппой.
go-deadlock (sasha-s) — drop-in replacement для sync.Mutex с deadlock detection.
import deadlock "github.com/sasha-s/go-deadlock"
var mu deadlock.Mutex// если lock держится > 30s → printf stack trace3. Gotchas
Заголовок раздела «3. Gotchas»-
⚠️ Exported mutex fields. Public mutex = leaky abstraction. Locks должны быть внутри API.
-
⚠️ Возвращение reference на internal map/slice. Caller может mutate под вашими ногами. ВСЕГДА копируйте или возвращайте read-only view.
-
⚠️ Context stored в struct. Это anti-pattern (документировано в Go). Context — per-call, не per-object.
-
⚠️ Panic в горутине = crash программы. Каждая горутина должна сама defer recover, иначе process die.
-
⚠️ Channel close дважды = panic. Чёткие правила ownership: только sender close’ит. Никогда не close из reader.
-
⚠️ Closing channel в for-loop без guard. Если несколько senders — кто close’ит? Используйте
sync.Onceили producer-coordinator pattern. -
⚠️
sync.WaitGroup.AddпослеWait= panic. Все Add должны быть до Wait. -
⚠️ errgroup без SetLimit. Если items = 1M, создаст 1M горутин. Используйте
SetLimit(Go 1.20+) для ограничения. -
⚠️
context.WithCancelбез defer cancel = leak. Парент context’а удерживает child. Резкое падение memory если есть много активных contexts. -
⚠️
sync.Mapдля не-read-heavy workload. Документация чётко говорит:sync.Mapоптимизирован для (1) read-mostly или (2) disjoint key sets. Иначе хужеsync.RWMutex + map. -
⚠️ Lock granularity vs deadlock. Fine-grained → больше deadlock risk. Strict lock ordering (по уровням) необходим.
-
⚠️ Race detector в CI без stress test. Race detector ловит only triggered races. Без стресса rare races не появятся.
-
⚠️ Variable capture в goroutine закрытии. Classic bug:
for _, x := range items { go func() { use(x) }() // ⚠️ captures loop var}В Go 1.22+ поведение исправлено (каждая iteration — новый scope), но в legacy коде проблема.
-
⚠️
time.Afterв hot loop = memory leak. Каждыйtime.Afterсоздаёт timer, который не освобождается до срабатывания. Используйтеtime.NewTimer+ Stop. -
⚠️ Не закрытый channel + range = leak.
for v := range chblocks вечно, если ch не closed и не получает новых values. -
⚠️
context.WithCancelparent context передан как nil → panic (Go 1.21+). -
⚠️
atomic.Pointer[T]Store(nil) — atomic.Pointer не принимает untyped nil. Нужноvar p *T; atomic.Pointer.Store(p). -
⚠️ Возврат channel из API — кто его close’ит? Документируйте чётко. Best practice: каждая function, которая возвращает channel, обещает его close. Read-only channels (
<-chan) clear выражают это. -
⚠️
selectбез default может block вечно. Если все case’ы NIL channel’ы — deadlock. Иногда intentional, но всегда документируйте. -
⚠️ Mutex в struct, передаваемый по значению. Mutex копируется (
go vetловит). Передавайте*T.
4. Real cases
Заголовок раздела «4. Real cases»4.1 net/http Client connection pool
Заголовок раздела «4.1 net/http Client connection pool»http.Client использует http.Transport, который имеет connection pool. Внутри — pool с mutex:
type Transport struct { idleMu sync.Mutex idleConn map[connectMethodKey][]*persistConn // ...}Pool safe for concurrent use. Одна *Client обычно shared между горутинами. Пользователь не берёт lock — всё внутри Transport.
4.2 grpc-go ClientConn
Заголовок раздела «4.2 grpc-go ClientConn»grpc.ClientConn — managed pool subconnections. Внутри:
sync.RWMutexдля общей структуры.atomic.Pointerдля current load balancer state.- Channels для control plane events.
Mix of patterns: read-heavy snapshot через atomic, modify через mutex, events через channels.
4.3 Kafka client (Sarama) batching
Заголовок раздела «4.3 Kafka client (Sarama) batching»Producer накапливает messages per-partition в batches. Каждый batch — sync.Mutex-protected slice. При flush — batch swap’ится, mutex отпускается, новый batch начинает накапливаться.
Backpressure: если broker медленный — batches растут, producer in-flight bytes counter гейтит дальнейшие sends.
4.4 etcd raft storage
Заголовок раздела «4.4 etcd raft storage»etcd использует custom storage interface, защищённый RWMutex. Reads (snapshot, replay) идут под RLock. Writes (commit, advance) — под Lock.
При high commit rate — writer starvation возможна, но в практике writes batchatся, и читателей много.
4.5 Caddy server modules
Заголовок раздела «4.5 Caddy server modules»Caddy v2 — modular HTTP server. Каждый module реализует caddy.Module. Module instances могут быть shared между горутинами (handlers). Module API документирует thread safety per-method.
Конфигурация: caddy hot-reload генерирует new module instances в parallel, swap pointer атомарно. Old instances drain’ятся (текущие requests доживают).
4.6 sync.Once usage в std
Заголовок раздела «4.6 sync.Once usage в std»sync.Once используется в Std для lazy init:
time.Local— lazy load timezone.unicode.RangeTablebuilds.- HTTP Server’s default TLS config.
Каждый раз — Once.Do(init). Init runs once, последующие caller’ы get cached result.
4.7 Kubernetes client-go informer
Заголовок раздела «4.7 Kubernetes client-go informer»k8s.io/client-go/informers — watch Kubernetes resources с local cache. Cache — sync.RWMutex-protected map + indexer.
Watch events идут через single goroutine (event loop). Listeners получают callbacks. Callback running на shared dispatcher goroutine — длинные операции в callback заблокируют informer.
5. Вопросы
Заголовок раздела «5. Вопросы»-
Почему mutex должен быть внутри API, не наружу?
Leaky abstraction: user должен знать про synchronization. Один пропущенный Lock → race. API должно скрывать synchronization детали. -
Когда использовать channels vs mutex?
Channels: pipeline, event passing, fan-in/out, ownership transfer. Mutex: mutable shared state, counter (atomic лучше), one-time init (Once). -
Можно ли хранить context в struct?
Не рекомендуется. Context — per-call lifecycle. Исключение: long-running service может иметь lifecycle context для shutdown. -
Что такое
context.WithoutCancel(Go 1.21+)?
Detach context от cancellation, но сохраняет values. Для logging, audit, cleanup, которые не должны быть cancel’нуты вместе с request. -
Что такое
context.AfterFunc?
Регистрирует callback, который вызовется при cancel context’а. Не требует отдельной горутины. -
Что такое happens-before в Go memory model?
Если A happens-before B, то эффекты A видны в B. Устанавливается через channel ops, mutex, atomic, goroutine start. -
Какие memory ordering гарантии у
atomic.Load?
Acquire semantics: после Load все последующие операции видят, что было до соответствующего Store (с release semantics). -
Что такое coarse vs fine-grained lock?
Coarse: один большой mutex (простой, но contention). Fine-grained: per-resource mutex (concurrent, но deadlock risk). -
Что такое striped lock?
Sharded lock: hash(key) % N → выбор одного из N mutex’ов. Scale до N concurrent, key distribution критична. -
Зачем нужен
sync.Map?
Optimized для (1) read-mostly map, (2) disjoint key sets per-goroutine. Иначе хужеsync.RWMutex + map. -
Чем actor model отличается от mutex-based?
Actor: state локально в горутине, communication через message passing. Mutex: shared state, manual sync. Actor — no races by design. -
Что такое reactor pattern?
Single goroutine, select по нескольким event sources. Обрабатывает события serially. Используется в HTTP/2, gRPC, network frameworks. -
Когда worker pool правильнее, чем создавать goroutine per task?
При большом числе tasks (миллионы) — pool ограничивает memory/CPU. Также для rate limiting и backpressure. -
Что такое backpressure?
Механизм, через который consumer сигнализирует producer’у, что не успевает. В Go — buffered channel (producer blocks when full). -
Что такое AIMD?
Additive Increase Multiplicative Decrease — алгоритм adaptive rate control (TCP congestion control, gRPC). Increase by 1 on success, halve on error. -
Как обрабатывать panic в горутине?
Каждая горутина должна сама defer recover. Иначе panic crashes всю программу. -
Что такое errgroup и зачем?
golang.org/x/sync/errgroup— пакет для concurrent ops с error propagation. First error cancels context, который другие горутины могут observe. -
Что такое
errgroup.SetLimit?
Ограничение числа одновременно работающих горутин. Введено в Go 1.20. -
Что делает
errors.Join(Go 1.20+)?
Возвращает один error, который wrap’ит несколько. Полезно для aggregating errors из concurrent ops. -
Что показывает race detector?
Reported данные, к которым обращались из нескольких горутин без synchronization. Использует ThreadSanitizer внутри. -
Race detector overhead?
2-20x slowdown, 5-10x memory. Не для production, но обязательно в CI. -
Что такое
testing/synctest(Go 1.24+)?
Тестирование concurrent code с virtual time. Time advances, когда все горутины блокированы. Timeouts тесты — мгновенные. -
Чем
sync.Onceлучше bool + mutex?
Optimized для one-time init: после первого вызова — atomic load (lock-free). Sync.Once используется в std для lazy init. -
Что такое
singleflight?
golang.org/x/sync/singleflight— deduplicates concurrent calls. Если 100 горутин вызываютcache.GetOrFetch(key), fetch выполняется один раз. -
Что значит “make zero value useful”?
Struct полезен без инициализации.var mu sync.Mutex; mu.Lock()works. Аналогично делайте свои types.
6. Practice
Заголовок раздела «6. Practice»6.1 Thread-safe cache
Заголовок раздела «6.1 Thread-safe cache»Реализуйте Cache[K, V] с операциями Get, Set, Delete, Snapshot, Iterate. Документируйте thread safety каждой операции. Race-test.
6.2 Pipeline с error propagation
Заголовок раздела «6.2 Pipeline с error propagation»Реализуйте 3-stage pipeline: read → transform → write. Используйте errgroup для error handling. При ошибке на любой stage — все cancel.
6.3 Actor с supervision
Заголовок раздела «6.3 Actor с supervision»Реализуйте Actor system: supervisor запускает worker actor’ов. Если worker panic’нул — supervisor restart’ит. Используйте channels для inter-actor communication.
6.4 Adaptive worker pool
Заголовок раздела «6.4 Adaptive worker pool»Реализуйте dynamic worker pool, который scale’ит от 1 до N workers в зависимости от queue depth. Если queue растёт — добавьте worker. Если пуста — удалите.
6.5 Rate limiter без golang.org/x/time/rate
Заголовок раздела «6.5 Rate limiter без golang.org/x/time/rate»Реализуйте token bucket rate limiter. Поддержите Allow(), Wait(ctx), Reserve(). Бенчмарк vs стандартный.
6.6 errgroup с deadline
Заголовок раздела «6.6 errgroup с deadline»Расширьте errgroup: вместо Wait() — WaitDeadline(d time.Duration). Возвращает либо first error, либо timeout.
6.7 Concurrent map с stripes
Заголовок раздела «6.7 Concurrent map с stripes»Реализуйте StripedMap[K, V] с N shards. Стресс-тест: 100 горутин, 10k ops each. Сравните throughput с sync.Map и sync.RWMutex+map.
6.8 Тест concurrent API с synctest
Заголовок раздела «6.8 Тест concurrent API с synctest»Возьмите свой timer-based код. Перепишите тест с testing/synctest (Go 1.24+). Должен пройти моментально (без реальных sleep’ов).
7. Источники
Заголовок раздела «7. Источники»- Rob Pike, Go Concurrency Patterns — фундаментальный talk.
- Sameer Ajmani, Advanced Go Concurrency Patterns — продолжение.
- Bryan Mills, Rethinking Classical Concurrency Patterns — современный взгляд (GopherCon 2018).
- Russ Cox, Go Memory Model — обновлён в Go 1.19.
- Effective Go Concurrency section.
- Dave Cheney, Never start a goroutine without knowing how it will stop.
- golang/sync source — errgroup, semaphore, singleflight.
- Katherine Cox-Buday, “Concurrency in Go” — книга, особенно главы про pipelines.
- Bryan Cantrill, “Falling in Love with Rust” — relevant даже для Go (about ownership).
- Akka documentation — actor model patterns.
- Reactive Manifesto reactivemanifesto.org — про responsive, resilient, elastic systems.
- golang.org/x/time/rate — token bucket implementation.
- net/http source —
Transport, connection pool как пример хорошего design. - grpc-go source —
ClientConn, mixed channels/mutex patterns. - testing/synctest proposal — Go 1.24+ feature.