Каналы Go: внутренности hchan, sendq/recvq, sudog, selectgo
Зачем знать на Middle 3. Каналы — визитная карточка Go, но 80% разработчиков знают их только на уровне
make(chan T)и<-. На Middle 3 от тебя ждут: (1) пониманиеhchanструктуры изruntime/chan.go; (2) умение объяснить, почему unbuffered channel — это rendezvous, и как direct-send обходит buffer; (3) знание алгоритмаselectgo(compile-time generation + runtime polling); (4) понимание memory ordering channel’ов в Go memory model; (5) умение оценить, когда канал быстрее mutex, а когда медленнее. Это самый частый “deep dive” вопрос на интервью senior-grade.
Содержание
Заголовок раздела «Содержание»- Краткое введение
- Глубокое погружение в исходники
- 2.1. Структура
hchanиsudog - 2.2. ASCII-схемы (unbuffered и buffered)
- 2.3.
makechan - 2.4.
chansendалгоритм - 2.5.
chanrecvалгоритм - 2.6.
closechanалгоритм - 2.7.
selectgo: compile-time и runtime - 2.8.
nil channelсемантика
- 2.1. Структура
- Memory model и happens-before
- Gotchas
- Production-кейсы и performance
- Вопросы
- Practice
- Источники
1. Краткое введение
Заголовок раздела «1. Краткое введение»Каналы в Go — это primary primitive для CSP-style concurrency (Communicating Sequential Processes, Hoare 1978). Они:
- Type-safe:
chan Tнесёт только значения типа T. - Direction-typed (compile-time):
chan<- Tsend-only,<-chan Trecv-only. - Optional buffer:
make(chan T)— unbuffered (rendezvous),make(chan T, N)— буферизированный circular queue. - Closable:
close(ch)— sender сигнализирует “больше ничего не будет”. - Selectable:
select { case <-ch1: ... case ch2 <- v: ... }— non-deterministic выбор готового канала.
Канал стоит ~100 нс на операцию под контаймом, против ~25 нс у sync.Mutex и ~5 нс у atomic. То есть каналы — не “бесплатный аналог mutex”, а более выразительная абстракция, у которой есть своя цена.
Каноничные шаблоны:
- Pipeline:
stage1 → stage2 → stage3(Bell Labs, Rob Pike). - Worker pool:
Nворкеров читают из общегоjobs chan Job. - Fan-in/fan-out: merge нескольких каналов в один.
- Backpressure: ограниченный буфер канала естественно тормозит продьюсера.
- Done signal:
chan struct{}для cancellation (заменён наcontext.Contextпосле Go 1.7).
2. Глубокое погружение в исходники
Заголовок раздела «2. Глубокое погружение в исходники»2.1. Структура hchan и sudog
Заголовок раздела «2.1. Структура hchan и sudog»type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex}
type waitq struct { first *sudog last *sudog}sudog (sudo-goroutine) — представляет goroutine, ожидающую на канале:
type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops.
g *g
next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock.
acquiretime int64 releasetime int64 ticket uint32
// isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool
// success indicates whether communication over channel c // succeeded. It is true if the goroutine was awoken because // a value was delivered over channel c, and false if awoken // because c was closed. success bool
parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel}Ключевое поле elem — указатель на буфер, куда (или откуда) копируются данные при rendezvous. Может указывать на стек горутины (нет heap-аллокации при unbuffered send/recv).
2.2. ASCII-схемы
Заголовок раздела «2.2. ASCII-схемы»Unbuffered channel:
make(chan int) — dataqsiz=0, buf=nil
┌──────────────────────────────────┐ │ hchan │ │ qcount = 0 │ │ dataqsiz = 0 │ │ buf = nil │ │ elemsize = 8 │ │ closed = 0 │ │ recvq ─┐ │ │ sendq ─┼┐ │ │ lock │ └─────────────┼┼───────────────────┘ ││ ┌──────────┘└────────┐ ▼ ▼ ┌───────────┐ ┌───────────┐ │ sudog G1 │ │ sudog G3 │ │ elem→stk │ │ elem→stk │ │ next ──┐ │ │ next ──┐ │ └─────────┼─┘ └─────────┼─┘ ▼ ▼ ┌───────────┐ ┌───────────┐ │ sudog G2 │ │ sudog G4 │ └───────────┘ └───────────┘Buffered channel (cap=4, qcount=2):
make(chan int, 4)
┌──────────────────────────────────┐ │ hchan │ │ qcount = 2 │ │ dataqsiz = 4 │ │ buf → ┐ │ │ sendx = 3 (next write idx) │ │ recvx = 1 (next read idx) │ │ recvq = empty │ │ sendq = empty │ └────────────┼─────────────────────┘ ▼ ┌─────────────────────────┐ │ circular buffer cap=4 │ │ ┌────┬────┬────┬────┐ │ │ │ x │ A │ B │ x │ │ │ └────┴────┴────┴────┘ │ │ 0 1 2 3 │ │ ↑ ↑ │ │ recvx=1 sendx=3 │ └─────────────────────────┘2.3. makechan
Заголовок раздела «2.3. makechan»func makechan(t *chantype, size int) *hchan { elem := t.Elem
// compiler checks this but be safe. if elem.Size_ >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { throw("makechan: bad alignment") }
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case !elem.Pointers(): // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.Size_) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c}Что важно:
- Unbuffered (
size=0): аллоцируется толькоhchan, без буфера.buf = c.raceaddr()— для race-детектора. - Buffered без указателей (e.g.
chan int):hchanиbufв одной аллокации (cache-friendly). - Buffered с указателями (e.g.
chan *Foo):bufотдельной аллокацией, потому что GC хочет видеть только указатели вbuf.
2.4. chansend алгоритм
Заголовок раздела «2.4. chansend алгоритм»// Entry points:// c <- v → chansend1(c, &v) // blocking// select case c <- v → selectsend... // non-blocking attempt
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
// Fast path: проверка closed/full без лока — НЕТ. См. ниже.
lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
// Case 1: есть waiting receiver → direct send (минуя buffer). if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
// Case 2: есть место в buffer. if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
// Case 3: блокируемся. Создаём sudog, добавляем в sendq, паркуем горутину. gp := getg() mysg := acquireSudog() mysg.elem = ep mysg.g = gp mysg.c = c gp.waiting = mysg c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Когда нас разбудили: if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil closed := !mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true}Function send — direct send to waiting receiver:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true goready(gp, skip+1)}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src в стеке отправителя, dst в стеке получателя. // Прямой memmove между стеками: возможно благодаря тому, // что lock канала держит обе горутины во "frozen" состоянии. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_) memmove(dst, src, t.Size_)}📌 Важно. Direct send копирует данные напрямую из стека отправителя в стек получателя, минуя буфер. Это самая быстрая ветка для unbuffered каналов. Возможно потому, что обе горутины в этот момент “заморожены”: receiver спит в
gopark, sender держитc.lock.
2.5. chanrecv алгоритм
Заголовок раздела «2.5. chanrecv алгоритм»func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
lock(&c.lock)
if c.closed != 0 { if c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // continue to receive from buffer } else { // Case 1: есть waiting sender. if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } }
// Case 2: есть данные в buffer. if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false }
// Case 3: блок. gp := getg() mysg := acquireSudog() mysg.elem = ep mysg.g = gp mysg.c = c gp.waiting = mysg c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// wake up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // unbuffered: direct copy if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { // buffered: receive value from buffer, поместить sender's value в buffer qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true goready(gp, skip+1)}Buffered receive с waiting sender — тонкая часть. Когда buffer полон и есть waiting sender:
- Отдаём receiver’у значение из
buf[recvx]. - Кладём sender’s value в
buf[recvx](теперь tail!). - Двигаем recvx и sendx так, чтобы circular queue оставалась корректной.
Это избегает копирования через временный буфер и сохраняет FIFO-семантику.
2.6. closechan алгоритм
Заголовок раздела «2.6. closechan алгоритм»func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
c.closed = 1
var glist gList
// release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) }
// release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false glist.push(gp) } unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }}Semantics:
- Все waiting receivers просыпаются с zero value и
success=false. - Все waiting senders просыпаются и паникуют (“send on closed channel”).
- Двойной close → panic.
close(nil)→ panic.
2.7. selectgo: compile-time и runtime
Заголовок раздела «2.7. selectgo: compile-time и runtime»select в Go — это специальная конструкция, которую компилятор превращает в вызов runtime.selectgo.
Compile-time (cmd/compile/internal/walk/select.go):
Для:
select {case v := <-ch1: ...case ch2 <- x: ...case <-time.After(t): ...default: ...}Компилятор генерирует:
- Массив
scases [N]scaseсо всеми case’ами (kind=send/recv, channel, elem pointer). - Массив
pollorder [N]uint16— random shuffle для fairness. - Массив
lockorder [N]uint16— sorted by hchan pointer для avoiding deadlock при lock’инге. - Вызов
selectgo(scases, pollorder, lockorder, ...)возвращает индекс выбранного case’а. - Switch на индекс → выполнение соответствующего кода.
Runtime (selectgo, runtime/select.go):
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) { cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0)) order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0)) ncases := nsends + nrecvs scases := cas1[:ncases:ncases] pollorder := order1[:ncases:ncases] lockorder := order1[ncases:][:ncases:ncases]
// generate random poll order norder := 0 for i := range scases { cas := &scases[i] if cas.c == nil { cas.elem = nil; continue } j := cheaprandn(uint32(norder + 1)) pollorder[norder] = pollorder[j] pollorder[j] = uint16(i) norder++ }
// sort channels by pointer address (для consistent lock ordering) for i := 1; i < norder; i++ { j := i c := scases[pollorder[i]].c for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() { k := (j - 1) / 2 lockorder[j] = lockorder[k] j = k } lockorder[j] = uint16(pollorder[i]) }
// lock all channels in lockorder sellock(scases, lockorder)
var dfl *scase var casi int var casu *scase
// Pass 1: try to find ready case without blocking. for _, casei := range pollorder { casi = int(casei) cas := &scases[casi] c := cas.c
if casi >= nsends { sg := c.sendq.dequeue() if sg != nil { goto recv } if c.qcount > 0 { goto bufrecv } if c.closed != 0 { goto rclose } } else { if c.closed != 0 { goto sclose } sg := c.recvq.dequeue() if sg != nil { goto send } if c.qcount < c.dataqsiz { goto bufsend } } }
if !block { selunlock(scases, lockorder) casi = -1 goto retc }
// Pass 2: enqueue on all channels, gopark. gp := getg() nextp := &gp.waiting for _, casei := range lockorder { casi = int(casei) cas := &scases[casi] c := cas.c sg := acquireSudog() sg.g = gp sg.isSelect = true sg.elem = cas.elem sg.c = c *nextp = sg nextp = &sg.waitlink if casi < nsends { c.sendq.enqueue(sg) } else { c.recvq.enqueue(sg) } }
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
// wakeup: gp.param contains the sudog that won the wakeup race sg := (*sudog)(gp.param) ... // pass 3: cleanup unsuccessful sudogs ...}Что критично:
- Random poll order — для fairness. Без него
case <-ch1всегда выигрывал бы уcase <-ch2, если ch1 идёт первым в коде. - Sorted lock order — для избегания deadlock’а: если две горутины делают
selectна одних и тех же каналах, они блокируют их в одном и том же порядке. - Pass 1 — попытка взять ready канал без блокировки.
- Pass 2 — если ничего не готово и
block == true, паркуем горутину на ВСЕХ каналах (через sudog сisSelect=true). - Wakeup race: при сигнале на любой из каналов sudog должен выиграть CAS на
g.selectDone, чтобы остальные sudog’и поняли — “этот select уже не наш”.
2.8. nil channel семантика
Заголовок раздела «2.8. nil channel семантика»var c chan int // c == nil
// Все эти операции БЛОКИРУЮТ ВЕЧНО (без runtime panic):c <- 1<-c
// close(nil) → panic("close of nil channel")close(c)Use case nil-канала: “выключить case” в select.
func process(ctx context.Context, in <-chan int) { var out chan<- int // временно nil var pending int
for { var dst chan<- int if pending != 0 { dst = out } select { case v := <-in: pending = v case dst <- pending: pending = 0 case <-ctx.Done(): return } }}Когда pending == 0, dst == nil → этот case никогда не сработает.
3. Memory model и happens-before
Заголовок раздела «3. Memory model и happens-before»Go memory model (см. https://go.dev/ref/mem) даёт для каналов следующие гарантии:
- k-th send on c happens-before k-th recv from c completes (FIFO ordering для buffered).
- k-th receive on c (with capacity C) happens-before (k+C)-th send completes (back-pressure).
- Closing c happens-before recv returns zero value because c is closed.
- Unbuffered: receive happens-before send completes (важно: receive синхронизируется с send).
Практически:
var data intdone := make(chan struct{})
go func() { data = 42 close(done) // happens-before любых read'ов после <-done}()
<-donefmt.Println(data) // garanteed 42, no data raceКанал служит синхронизационной точкой. Этим он принципиально отличается от mutex’а (где synchronization tied к Lock/Unlock).
4. Gotchas
Заголовок раздела «4. Gotchas»Gotcha 1 — Send на closed channel → panic
Заголовок раздела «Gotcha 1 — Send на closed channel → panic»⚠️ Closing — это broadcast END-OF-STREAM от sender’а. После close никто не должен слать. Чтобы избежать: владей закрытием в одном месте (single owner principle).
// BAD: receiver не должен close'ить, если sender может ещё слать.go func() { <-ctx.Done() close(ch) // ❌ если producer ещё пишет — panic}()Gotcha 2 — Double close → panic
Заголовок раздела «Gotcha 2 — Double close → panic»⚠️ Defer close(ch) дважды → panic. Pattern для “once close”:
var once sync.Oncedefer once.Do(func() { close(ch) })или используй sync.OnceFunc (Go 1.21+).
Gotcha 3 — for range over channel требует close
Заголовок раздела «Gotcha 3 — for range over channel требует close»⚠️ for v := range ch блокируется на чтении, пока канал не закрыт. Если sender не закрывает — горутина зависнет навсегда.
go func() { defer close(ch) // обязательно for _, v := range items { ch <- v }}()for v := range ch { ... }Gotcha 4 — Утечка горутины при незаконченной отправке
Заголовок раздела «Gotcha 4 — Утечка горутины при незаконченной отправке»⚠️ Если sender пытается слать в канал, у которого никто не читает (и нет места в буфере), горутина зависнет.
func leaky() chan int { ch := make(chan int) go func() { ch <- 1 }() // leak, если caller не прочитает return ch}Решение: timeout/context cancellation, или используй buffered с достаточной capacity.
Gotcha 5 — Buffered не значит “асинхронный”
Заголовок раздела «Gotcha 5 — Buffered не значит “асинхронный”»⚠️ Buffered лишь смягчает синхронизацию: send блокируется только когда буфер полон. На малых буферах (1-2) разница с unbuffered невелика.
Gotcha 6 — Select с default → busy loop
Заголовок раздела «Gotcha 6 — Select с default → busy loop»⚠️ select { case v := <-ch: ... default: } без default не блокирует, но в цикле без yield (runtime.Gosched) даёт busy-loop, съедающий CPU.
// BAD: busy loopfor { select { case v := <-ch: handle(v) default: // 100% CPU }}
// GOODfor v := range ch { handle(v) }Gotcha 7 — Select-by-channel-direction asymmetry
Заголовок раздела «Gotcha 7 — Select-by-channel-direction asymmetry»⚠️ В select можно использовать одну и ту же переменную в разных направлениях, но порядок case’ов рандомизируется. Не полагайся на priority.
select {case <-quit: return // не гарантирует приоритет!case v := <-data: handle(v)}Если quit важнее, используй non-blocking probe перед основным select:
select {case <-quit: returndefault:}select {case <-quit: returncase v := <-data: handle(v)}Gotcha 8 — len(ch) и cap(ch) не атомарны с send/recv
Заголовок раздела «Gotcha 8 — len(ch) и cap(ch) не атомарны с send/recv»⚠️ if len(ch) < cap(ch) { ch <- v } — race condition: между проверкой и send другой goroutine может заполнить буфер.
Gotcha 9 — Direct send between stacks → элемент на стеке receiver’a
Заголовок раздела «Gotcha 9 — Direct send between stacks → элемент на стеке receiver’a»⚠️ В unbuffered direct send данные копируются в стек receiver’a через recvDirect. Если receiver горутина имеет escape элемента на heap, копирование переходит через barrier.
Gotcha 10 — sudog pool
Заголовок раздела «Gotcha 10 — sudog pool»⚠️ Каждый channel-блокирующий call аллоцирует sudog. Это объект из per-P pool (schedt.sudoglock). Если у тебя миллион горутин ждёт на каналах → миллион sudog в куче. Это видно в pprof.
Gotcha 11 — receive from closed buffered channel
Заголовок раздела «Gotcha 11 — receive from closed buffered channel»⚠️ После close(ch) receiver продолжает читать оставшиеся в буфере значения, а только потом получает (zero, false). Не полагайся, что после close сразу (zero, false).
ch := make(chan int, 3)ch <- 1; ch <- 2; close(ch)v, ok := <-ch // v=1, ok=truev, ok = <-ch // v=2, ok=truev, ok = <-ch // v=0, ok=falseGotcha 12 — Range над nil channel блокирует навсегда
Заголовок раздела «Gotcha 12 — Range над nil channel блокирует навсегда»⚠️ var ch chan int; for v := range ch { } зависает. Это не запасный путь, а valid use case (отключение pipeline-стадии), но легко ошибиться.
5. Production-кейсы и performance
Заголовок раздела «5. Production-кейсы и performance»5.1. Latency comparison
Заголовок раздела «5.1. Latency comparison»| Operation | Latency |
|---|---|
| direct function call | ~1 ns |
| atomic.LoadInt64 | ~5 ns |
| sync.Mutex Lock+Unlock (uncontended) | ~25 ns |
| sync.RWMutex RLock+RUnlock | ~30 ns |
| chan send/recv (unbuffered, paired Gs) | ~100–250 ns |
| chan send/recv (buffered, slot available) | ~60–120 ns |
| chan send/recv with contention | ~300–800 ns |
| select with 2 cases (no block) | ~120 ns |
Вывод: для горячих счётчиков atomic или sync.Mutex быстрее канала в 5–20 раз. Канал использовать там, где нужна семантика (pipeline, signal, backpressure), а не там, где можно обойтись lock’ом.
5.2. Worker pool
Заголовок раздела «5.2. Worker pool»type Pool struct { jobs chan Job results chan Result wg sync.WaitGroup}
func NewPool(n int) *Pool { p := &Pool{ jobs: make(chan Job, n*2), results: make(chan Result, n*2), } p.wg.Add(n) for i := 0; i < n; i++ { go func() { defer p.wg.Done() for j := range p.jobs { p.results <- process(j) } }() } return p}Performance tip: buffer jobs = n*2 — даёт небольшой запас перед block, амортизирует jitter.
5.3. Pipeline
Заголовок раздела «5.3. Pipeline»func gen(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out}
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out}
for v := range sq(sq(gen(1, 2, 3, 4))) { fmt.Println(v) // 1, 16, 81, 256}5.4. Fan-in (merge)
Заголовок раздела «5.4. Fan-in (merge)»func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int)
wg.Add(len(cs)) for _, c := range cs { go func(c <-chan int) { defer wg.Done() for v := range c { out <- v } }(c) }
go func() { wg.Wait() close(out) }()
return out}5.5. Backpressure через buffered channel
Заголовок раздела «5.5. Backpressure через buffered channel»// Ограничиваем concurrent HTTP requestssem := make(chan struct{}, 50) // max 50 in flightfor _, url := range urls { sem <- struct{}{} go func(u string) { defer func() { <-sem }() fetch(u) }(url)}5.6. Реальные применения
Заголовок раздела «5.6. Реальные применения»NATS Go client: использует chan *Msg для message delivery per-subscription. Buffer = подписочный pendingMsgsLimit. Backpressure естественно — slowConsumer detection через len(chan).
gRPC-Go streaming: ServerStream/ClientStream используют каналы для async чтения/записи RPC сообщений. Сложная state machine отдельно отслеживает close семантику.
Kafka go-client (segmentio/kafka-go): партиции читаются параллельно, результаты merge’ятся в один chan Message.
Docker daemon: event subscription — каждый клиент получает свой chan events.Message. При отписке канал закрывается.
Prometheus alertmanager: dispatcher работает через каналы между receivers, routes, и notification stages.
etcd: watcher events delivery — каналы с buffer, slowConsumer disconnect логика.
6. Вопросы
Заголовок раздела «6. Вопросы»-
Что такое
hchan? Структура канала в runtime: buffer, sendx/recvx, qcount/dataqsiz, recvq/sendq waiters, lock, closed flag. -
Что такое
sudog? “Sudo goroutine” — представление ожидающей на канале горутины. Хранит указатель на G, элемент (источник или приёмник данных), указатель на канал. -
Что происходит при
c <- vна unbuffered канал, если есть receiver?recvq.dequeue()достаёт sudog’a receiver’a, runtime копирует значение прямо вsg.elem(стек receiver’a черезsendDirect), и будит receiver’a. -
Что происходит при
c <- vна полный buffered канал? Текущая горутина создаёт sudog, добавляет вsendq, вызываетgopark. -
Что произойдёт при
c <- vна closed канал?panic("send on closed channel"). -
Что такое direct send? Передача значения напрямую от sender’s elem в receiver’s elem без копирования в буфер. Работает только для unbuffered или buffered-with-waiting-receiver.
-
Может ли горутина ждать одновременно на нескольких каналах? Через
select— да. Внутренне создаётся sudog per канал сisSelect=true; первый сработавший канал выигрывает CAS наg.selectDone. -
Зачем
lockorderв selectgo? Чтобы избежать deadlock’а: при locking множества каналов всегда делать это в одном порядке (по адресу). Это классический DRY-deadlock prevention. -
Почему
pollorderрандомный? Для fairness: иначе всегда выигрывал бы первый case в коде. -
Что произойдёт при close нулевого канала?
panic("close of nil channel"). -
Что произойдёт при send/recv на nil канал? Блокировка навсегда (
gopark), не panic. -
Зачем нужен
closedфлаг? Чтобы send’ы паниковали, а recv’ы возвращали zero value иfalse. -
Что такое circular buffer в hchan? Массив
bufразмераdataqsiz, с двумя указателямиsendx(next write) иrecvx(next read). Wrap-around при достижении конца. -
Что делает
closechanс waiting’ами? Receivers просыпаются с zero value иsuccess=false. Senders просыпаются и паникуют. -
Канал thread-safe для concurrent send и recv? Да. Внутренний lock защищает все мутации. Производительно — на ~миллион ops/sec.
-
Что быстрее: channel send или mutex Lock+Unlock? Mutex (~25 ns) в 4–10x быстрее канала (~100–250 ns) на uncontended path. На contended — разница меньше, но всё равно mutex быстрее.
-
Когда стоит выбрать канал, а не мьютекс? Когда нужна семантика “общение между горутинами” (pipeline, signal, backpressure), а не shared state.
-
Гарантирует ли канал FIFO? Для buffered — да: i-th send → i-th recv. Для unbuffered — нет понятия “буфера”, но send/recv пары happens-before в порядке готовности.
-
Что такое
ticketв sudog? Для semaphore implementation (runtime.semacquire), не для каналов. -
Что делает
goready? Помещает G в run queue (scheduler) и помечает её как runnable. Sleeping G → runnable G. -
Куда копируется значение при unbuffered direct send? В стек receiver’a, в адрес, указанный в
sg.elem. Это валидно, потому что в момент копирования receiver “заморожен” в gopark, его стек не движется. -
Что такое
qcountvsdataqsiz?dataqsiz— capacity (фикс).qcount— current count (текущее число элементов в буфере). -
Может ли select работать с send-only и recv-only одновременно? Да, в одном select могут быть оба направления для разных каналов.
-
Что произойдёт, если closed канал имеет данные в буфере? Receiver сначала прочитает все данные из буфера, а потом получит
(zero, false). -
Как обеспечить cancellation pipeline? Передавать
context.Contextилиdone <-chan struct{}каждому этапу. Каждый этап долженselect { case <-done: return; case v := <-in: ... }. -
Что такое
chanparkcommit? Callback приgopark— отпускаетc.lockпосле того, как G помечена как parked. Это гарантирует, что мы не пропустим wakeup race. -
Сколько памяти стоит создание канала?
hchan~96 байт + буферdataqsiz * elemsize. Если буфер не содержит указателей — одна аллокация, иначе две. -
Что такое spurious wakeup в каналах? Это situation, когда G разбужена, но условие не выполнено. Go runtime гарантирует отсутствие spurious wakeup’ов для каналов; если sudog проснулся — операция гарантированно завершилась.
-
Что такое
selectgoPass 1 и Pass 2? Pass 1: пытаемся завершить какой-то case без блокировки (lookups в sendq/recvq, проверка буфера/closed). Pass 2: если ничего не готово, добавляем sudog ко всем каналам и гасимся черезgopark. -
Можно ли сравнить два канала? Да:
ch1 == ch2сравнивает указатели наhchan.ch1 == nilвалидно.
7. Practice
Заголовок раздела «7. Practice»Задача 1 — Реализуй safe broadcast
Заголовок раздела «Задача 1 — Реализуй safe broadcast»Канал не поддерживает broadcast (один send → один recv). Реализуй Broadcaster[T]:
b := NewBroadcaster[int]()ch1 := b.Subscribe()ch2 := b.Subscribe()b.Publish(42) // ch1 и ch2 получают 42b.Close() // все подписки закрываютсяИспользуй sync.RWMutex + slice of channels + неблокирующий send с default.
Задача 2 — Реализуй semaphore
Заголовок раздела «Задача 2 — Реализуй semaphore»Через chan struct{} с capacity = N. Сравни с golang.org/x/sync/semaphore (там используется sync.Mutex + список waiters).
Задача 3 — Fan-out + collect
Заголовок раздела «Задача 3 — Fan-out + collect»Запусти N воркеров, обрабатывающих jobs из общего канала. Собери результаты в одном слайсе. Корректно обработай cancellation через context.
Задача 4 — Trace channel operations
Заголовок раздела «Задача 4 — Trace channel operations»С помощью runtime/trace:
trace.Start(os.Stderr)defer trace.Stop()// ваш код с каналамиПосмотри Go execution trace в go tool trace. Найди chan send/recv события, blocked Gs.
Задача 5 — Channel vs Mutex bench
Заголовок раздела «Задача 5 — Channel vs Mutex bench»Напиши бенчмарк счётчика:
chan intс одним worker’ом, увеличивающим число.sync.Mutex+ int.atomic.Int64.
Запусти на разном количестве писателей (1, 4, 16, 64). Объясни графики.
Задача 6 — Detect leak
Заголовок раздела «Задача 6 — Detect leak»Найди в коде ниже утечку:
func process(ctx context.Context) <-chan int { out := make(chan int) go func() { for i := 0; i < 1000; i++ { out <- i } }() return out}
// caller:ctx, cancel := context.WithTimeout(ctx, time.Second)defer cancel()ch := process(ctx)v := <-ch// timeout fires, caller leaves...Задача 7 — Select priority
Заголовок раздела «Задача 7 — Select priority»Реализуй select с приоритетом: case A важнее case B (когда оба готовы, всегда выбирается A).
// non-blocking probe pattern:select {case <-A: handleA()default: select { case <-A: handleA() case <-B: handleB() }}Задача 8 — Реализуй timer без time.After (избегая утечек)
Заголовок раздела «Задача 8 — Реализуй timer без time.After (избегая утечек)»time.After создаёт новый Timer на каждый вызов, который не GC’ится до истечения. Реализуй reusable timer для использования в long-running select.
8. Источники
Заголовок раздела «8. Источники»- Go source code —
src/runtime/chan.go,src/runtime/select.go,src/cmd/compile/internal/walk/select.go. - Go memory model spec — https://go.dev/ref/mem. Раздел “Channel communication”.
- Russ Cox — “Bell Labs and CSP Threads” (history of CSP and Go channels). https://swtch.com/~rsc/thread/
- Rob Pike GopherCon 2014 — “Go Concurrency Patterns: Pipelines and Cancellation”.
- Dmitry Vyukov — “Scalable Go Scheduler Design Doc” (раздел про channels). https://docs.google.com/document/d/1TTj4T2JO42uD5ID9e89oa0sLKhJYD0Y_kqxDv3I3XMw/edit
- Bryan C. Mills GopherCon 2018 — “Rethinking Classical Concurrency Patterns”.
- Sameer Ajmani GopherCon 2014 — “Pipelines and Cancellation” (Go blog). https://go.dev/blog/pipelines
- Kavya Joshi GopherCon 2017 — “Understanding Channels in Go”. (must-watch для Middle 3!)
- Dave Cheney — “Channel Axioms” blog. https://dave.cheney.net/2014/03/19/channel-axioms
- Dmitry Vyukov LSE talks — internals of Go scheduler + channels.
- Damian Gryski “go-perfbook” — channel performance numbers.
- runtime/runtime2.go — sudog, g, m structures.
- runtime/proc.go —
gopark,goready, scheduler interactions. - gVisor talks — internal use of channels at scale.
- Cloudflare blog — “Channels in Go” (production lessons).
- The Go Programming Language (Donovan & Kernighan) — глава 8 “Goroutines and Channels”.
- Concurrency in Go (Katherine Cox-Buday, O’Reilly 2017) — практические паттерны.
- CSP “Communicating Sequential Processes” (Hoare 1978) — теоретическая основа.