Перейти к содержимому

Channels и Select: внутренности

Зачем знать на Middle 1: Channel — это не магия, а конкретная структура hchan с буфером, очередями и мьютексом. Middle-разработчик должен понимать, почему unbuffered send блокирует, как работает select (random fairness), и когда channel дороже mutex. Без этого — невозможно отлаживать goroutine leaks и писать высоконагруженный код.


  1. Базовая концепция (кратко)
  2. Под капотом: hchan, sudog, send/recv
  3. Gotchas
  4. Производительность
  5. Когда использовать / альтернативы
  6. Вопросы на собесе
  7. Practice
  8. Источники

Channel в Go — это типизированная FIFO-очередь с встроенной синхронизацией. Создаётся через make:

ch := make(chan int) // unbuffered
ch := make(chan int, 10) // buffered (capacity = 10)

Операции:

ch <- 42 // send
v := <-ch // receive
v, ok := <-ch // receive с проверкой closed
close(ch) // закрыть (только sender должен закрывать)

Правила, которые джун уже знает:

  • Send в nil канал — блокировка навсегда.
  • Recv из nil канала — блокировка навсегда.
  • Send в closed канал — panic.
  • Recv из closed канала — возвращает zero value, ok = false.
  • Close уже closed канала — panic.
  • Close nil канала — panic.

Унbuffered: send и recv синхронны. Sender блокируется, пока кто-то не прочтёт. Receiver блокируется, пока кто-то не отправит. Передача данных — это rendezvous (свидание).

Buffered: send не блокирует, пока в буфере есть место. Recv не блокирует, пока в буфере есть элемент.


make(chan T, N) возвращает указатель на hchan — структура из рантайма:

// runtime/chan.go (упрощённо для Go 1.22)
type hchan struct {
qcount uint // элементов в буфере сейчас
dataqsiz uint // capacity буфера (0 для unbuffered)
buf unsafe.Pointer // указатель на массив [dataqsiz]T (circular buffer)
elemsize uint16 // размер одного элемента
closed uint32 // 0 или 1
elemtype *_type // runtime type info
sendx uint // индекс куда писать (sender's index)
recvx uint // индекс откуда читать (receiver's index)
recvq waitq // FIFO очередь "спящих" receivers (sudog)
sendq waitq // FIFO очередь "спящих" senders (sudog)
lock mutex // защищает все поля hchan
}
type waitq struct {
first *sudog
last *sudog
}

sudog (sudo-goroutine) — структура-обёртка вокруг goroutine, добавленной в очередь канала:

type sudog struct {
g *g // ссылка на саму горутину
next *sudog
prev *sudog
elem unsafe.Pointer // адрес, куда писать или откуда читать
c *hchan // канал
isSelect bool // в составе select?
success bool // удачно ли проснулась
// ...
}
ch := make(chan int) // dataqsiz = 0, buf = nil
┌──────────── hchan ────────────┐
│ qcount = 0 │
│ dataqsiz = 0 │
│ buf = nil │
│ elemsize = 8 │
│ closed = 0 │
│ sendx = 0 │
│ recvx = 0 │
│ recvq: [G2 sudog] -> nil │ <- G2 ждёт receive
│ sendq: nil │
│ lock: free │
└───────────────────────────────┘
G1 делает ch <- 42:
1. Lock hchan.
2. Видит recvq непустой → достаёт sudog для G2.
3. Memcpy 42 напрямую в G2.elem (адрес переменной, куда G2 пишет recv).
4. goready(G2). (helper переводит G2 в runnable)
5. Unlock. G1 не паркуется.
Это direct send — без копирования в buffer.
ch := make(chan int, 4) // dataqsiz = 4
┌──────────── hchan ─────────────┐
│ qcount = 2 │
│ dataqsiz = 4 │
│ buf ──┐ │
│ ... │ │
│ sendx = 3 (куда писать) │
│ recvx = 1 (откуда читать) │
│ recvq: nil │
│ sendq: nil │
└────────────┼───────────────────┘
┌───┬───┬───┬───┐
buf │ ? │10 │20 │ ? │ (circular)
└───┴───┴───┴───┘
0 1 2 3
↑ ↑
recvx sendx
Send: buf[sendx] = v; sendx = (sendx+1) % dataqsiz; qcount++
Recv: v = buf[recvx]; recvx = (recvx+1) % dataqsiz; qcount--

Когда sendx догоняет recvx и qcount == dataqsiz — буфер полон, sender паркуется в sendq.

Упрощённый псевдокод:

func chansend(c *hchan, ep unsafe.Pointer, block bool) bool {
if c == nil {
if !block { return false } // select default
gopark(...) // навсегда
}
// fast path для non-blocking без lock
if !block && c.closed == 0 && full(c) {
return false
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
// Случай 1: есть спящий receiver — direct send
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, ...) // memmove(sg.elem, ep, elemsize); goready(sg.g)
unlock(&c.lock)
return true
}
// Случай 2: есть место в буфере
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { c.sendx = 0 }
c.qcount++
unlock(&c.lock)
return true
}
// Случай 3: блокируемся
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, ...)
// ... после wakeup
releaseSudog(mysg)
return true
}

Ключевая идея: если receiver уже ждёт — копируем данные напрямую в его память, минуя буфер. Это работает даже для buffered канала, когда буфер пуст и receiver уже припарковался.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block { return false, false }
gopark(...)
}
lock(&c.lock)
// Канал закрыт и буфер пуст
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil { typedmemclr(c.elemtype, ep) }
return true, false
}
// Случай 1: есть спящий sender
if sg := c.sendq.dequeue(); sg != nil {
// Если буфер 0 — copy from sg.elem прямо в ep.
// Если буфер N — copy buf[recvx] в ep, потом sg.elem в buf[recvx].
recv(c, sg, ep, ...)
unlock(&c.lock)
return true, true
}
// Случай 2: есть данные в буфере
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil { typedmemmove(c.elemtype, ep, qp) }
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz { c.recvx = 0 }
c.qcount--
unlock(&c.lock)
return true, true
}
// Случай 3: паркуемся
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
c.recvq.enqueue(mysg)
gopark(...)
return true, !mysg.success || /* ... */
}

Нюанс: при recv из buffered канала с непустым sendq происходит сразу два действия: данные из буфера → в receiver, данные от sender → в буфер. Это сохраняет FIFO порядок.

func closechan(c *hchan) {
if c == nil { panic("close of nil channel") }
lock(&c.lock)
if c.closed != 0 { panic("close of closed channel") }
c.closed = 1
// Все receivers получат zero value и ok=false.
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil { break }
if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) }
sg.success = false
glist.push(sg.g)
}
// Все senders — panic после wakeup.
for {
sg := c.sendq.dequeue()
if sg == nil { break }
sg.success = false
glist.push(sg.g)
}
unlock(&c.lock)
// Будим всех. Senders проснутся и сделают panic("send on closed channel").
for !glist.empty() {
gp := glist.pop()
goready(gp)
}
}

Поэтому правило: закрывает только sender, и только когда уверен, что больше не будет sends.

select — это switch для каналов, реализованный в runtime.selectgo. Компилятор переводит:

select {
case v := <-ch1:
...
case ch2 <- x:
...
case <-time.After(time.Second):
...
default:
...
}

в массив scase и вызов:

runtime.selectgo(cases, order, ncases) -> (selected int, recvOK bool)

Алгоритм:

  1. Random shuffle порядка проверки кейсов — это и есть знаменитая случайность select.
  2. Lock all каналов в фиксированном порядке (по адресу — чтобы избежать deadlock от ordering).
  3. Pass 1 (polling): пройти по shuffled order, проверить можем ли немедленно выполнить (есть данные в recvq/sendq/буфере).
  4. Если есть — unlock все, выполнить кейс.
  5. Если default — выполнить default, unlock все.
  6. Pass 2 (parking): создать sudog для каждого кейса и enqueue в соответствующие recvq/sendq.
  7. gopark — текущая горутина засыпает.
  8. Wakeup: какой-то канал стал доступен → goready → горутина просыпается.
  9. Cleanup: пройти по всем sudog’ам, dequeue из остальных каналов.
  10. Вернуть индекс кейса, который сработал.
selectgo
┌──────────────┴───────────────┐
▼ ▼
shuffle order lock all (by addr)
│ │
└──────► poll cases ◄──────────┘
┌───────────┼─────────────┐
▼ ▼ ▼
ready? default? park (sudog in
all channels' queues)
│ │ │
▼ ▼ ▼
execute execute wait → wakeup
→ cleanup → execute

Go компилятор распознаёт типовые случаи:

select {
case v := <-ch:
// ...
}
// эквивалентно: v := <-ch (компилятор так и переписывает)
select {
case v := <-ch:
// ...
default:
// ...
}
// эквивалентно: вызов chanrecv с block=false

Это видно в дизассемблере: go build -gcflags='-m=2' или go tool objdump.

  • gopark(unlockf, lock, reason, traceEv, traceskip) — кладёт goroutine в waiting state, вызывает scheduler. Перед сном вызывает unlockf(g, lock) (например, отпускает c.lock), чтобы атомарно “отдать lock и заснуть”.
  • goready(gp, traceskip) — переводит goroutine в runnable и ставит в local runq P. Сама не блокирует.

Зачем unlockf: иначе race — горутина уже в очереди, но ещё держит lock. Receiver разбудит её до того, как она реально заснёт.

hchan.lock — это runtime.mutex (semaphore-based, futex на Linux), не Go’s sync.Mutex. Он короткий: время удержания — десятки наносекунд. На многоядерных системах есть несколько spin-итераций перед park.

Hot path:

  • Если канал unbuffered и есть готовый peer — lock + memcpy + unlock без парковки.
  • Если буфер не пуст/полон — lock + index update + memcpy + unlock.

Спека Go гарантирует:

  1. Send happens-before соответствующего recv.

    var x int
    ch := make(chan struct{})
    go func() { x = 42; ch <- struct{}{} }()
    <-ch
    fmt.Println(x) // 42 гарантированно
  2. Close happens-before recv, который вернул zero value из-за close.

  3. Recv из unbuffered happens-before send-а в этот канал (по спеке Go 1.22+, поведение перевернули относительно ранних версий).

  4. Для buffered: k-й recv happens-before (k+C)-го send, где C — capacity. Это означает: receiver, прочитавший k-й элемент, видит всё, что было до (k+C)-го send-а.

Используй: для передачи готового состояния между горутинами достаточно одного канала — не нужны лишние мьютексы.


3.1. ⚠️ Send в nil канал — блокировка навсегда

Заголовок раздела «3.1. ⚠️ Send в nil канал — блокировка навсегда»
var ch chan int // nil!
ch <- 42 // deadlock forever

Полезно в select: динамически отключай ветку, обнулив переменную.

var sendCh chan int = realCh
for {
select {
case sendCh <- v:
sendCh = nil // отключаем ветку до следующего цикла
case <-done:
return
}
}
ch := make(chan int)
close(ch)
v := <-ch // v == 0, ok = false
v2, ok := <-ch
fmt.Println(v2, ok) // 0 false

Часто причина бага: цикл for v := range ch после close завершается, но for { v := <-ch } крутится в бесконечном zero.

3.3. ⚠️ Range по каналу не завершается без close

Заголовок раздела «3.3. ⚠️ Range по каналу не завершается без close»
ch := make(chan int, 3)
ch <- 1; ch <- 2; ch <- 3
for v := range ch { // зависнет после 3-го!
fmt.Println(v)
}
// Нужно: close(ch) перед range
// АНТИПАТТЕРН: receiver закрывает
go func() {
for v := range ch { ... }
close(ch) // ← если есть другой sender, он paniс'нет
}()

Правило: канал закрывает только sender. Если senders несколько — координируй через WaitGroup или channel-of-channels.

3.5. ⚠️ Select random — нельзя полагаться на порядок

Заголовок раздела «3.5. ⚠️ Select random — нельзя полагаться на порядок»
select {
case <-ch1:
// не выполнится "first", если ch2 тоже готов
case <-ch2:
}

Если нужен приоритет — двойной select:

select {
case <-priority:
return
default:
}
select {
case <-priority:
case <-regular:
}
func leak() <-chan int {
ch := make(chan int) // unbuffered
go func() {
ch <- 42 // блокируется навсегда, если caller не читает
}()
return ch
}

Решение: буферизировать (make(chan int, 1)) или context.

go func() { ch <- 1 }() // sender
close(ch) // race! Может panic.

Закрывать можно ТОЛЬКО когда уверен, что больше никто не пишет. Паттерн “M senders, 1 receiver” → нельзя receiver-у закрывать, нельзя одному из senders. Решение: отдельный done канал для сигнала остановки.

for {
select {
case <-ch:
case <-time.After(time.Second): // создаёт НОВЫЙ таймер каждый раз
}
}

Если итераций много — старые таймеры зависают в memory до срабатывания. Go 1.23+ оптимизировал GC таймеров, но всё равно: используй time.NewTimer + Reset.

select {
case <-ch:
case <-ch:
}

Компилируется, но бессмысленно — оба кейса одинаковы. Внутри shuffled, выберется случайный.

Нет способа атомарно “send if not closed”. Нужно либо никогда не закрывать (полагайся на GC), либо использовать recover в sender (грязно), либо отдельный signal channel.

3.11. ⚠️ Cap/Len каналов — не для логики синхронизации

Заголовок раздела «3.11. ⚠️ Cap/Len каналов — не для логики синхронизации»
if len(ch) > 0 {
v := <-ch // race! может быть пустым к моменту recv
}

len(ch) и cap(ch) — snapshot, не lock. Используй только для метрик/heuristic.

func producer() chan<- int // send-only
func consumer() <-chan int // recv-only

Закрывать send-only можно (close(out)), а recv-only — нельзя, compile error. Используй для API safety.


ОперацияВремя
sync.Mutex Lock + Unlock (uncontended)~15-25 ns
atomic.AddInt64~5-7 ns
chan int send + recv (unbuffered)~80-120 ns
chan int send + recv (buffered, не полон)~50-80 ns
select 1 case с готовым каналом~100-150 ns
select 4 cases~200-300 ns
context.Done() recv (closed)~30-50 ns

Под contention каналы деградируют сильнее мьютексов: внутри один lock, и все горутины через него идут.

// Hot loop: counter через канал
ch := make(chan int, 1)
ch <- 0
for i := 0; i < N; i++ {
v := <-ch
ch <- v + 1
}

Это в 20+ раз медленнее atomic. Channel хорош для передачи данных и сигналов, а не для разделяемого состояния.

  • 0 (unbuffered) — синхронизация point-to-point.
  • 1 — мaybe-signal, “сигнал готов, но не блокируем sender ещё раз”.
  • N — для batching/throughput. Чем больше — тем больше latency и memory.
  • Не делай make(chan T, 1<<20) — это сразу аллоцирует огромный массив.
Окно терминала
go test -bench=. -benchmem -cpuprofile=cpu.out
go tool pprof -http=:8080 cpu.out

В pprof ищи функции: runtime.chansend1, runtime.chanrecv1, runtime.selectgo. Если top — значит channels стали bottleneck.

chans := make([]chan int, 64) // каждый chan — 16 байт указатель,
// но hchan структура — 96 байт.

Если 64 горутины пишут в свой канал — hchan.lock каждого может быть в одной cache line с соседним. Лучше — padding, или runtime.LockOSThread + per-P state.


  • Передача владения данными между горутинами (“don’t communicate by sharing memory”).
  • Сигналы: done, cancel, tick.
  • Pipeline/fan-out/fan-in (см. файл 07).
  • Bounded work queue для worker pool.
  • Счётчик / шаренное числоsync/atomic.

    var counter atomic.Int64
    counter.Add(1)
  • Mutual exclusion над структуройsync.Mutex.

    type Cache struct {
    mu sync.Mutex
    m map[string]int
    }
  • Один-раз-выполнитьsync.Once.

  • Read-heavy кешsync.Map или RWMutex + map.

  • Простое “подождать N горутин”sync.WaitGroup.

  • Сложная синхронизация N писателей с приоритетамиsync.Cond + Mutex, или errgroup + semaphore.

Нужна синхронизация?
├─ Передаю данные? → channel
├─ Считаю/обновляю одно число? → atomic
├─ Защищаю структуру? → Mutex
│ ├─ Чтений сильно больше? → RWMutex
├─ Жду готовности? → WaitGroup / channel
├─ Раз в жизни? → Once
├─ Сигнал об изменении состояния? → Cond / channel

1. Опиши структуру hchan. Поля: qcount, dataqsiz, buf, elemsize, closed, sendx, recvx, recvq (waitq sudog’ов receiver-ов), sendq, lock. Это circular buffer + 2 очереди ожидающих + mutex.

2. Чем unbuffered отличается от buffered внутри hchan? У unbuffered dataqsiz=0, buf=nil. Send/recv всегда либо direct (если peer ждёт), либо park. У buffered есть circular buffer; пока есть место — sender не паркуется.

3. Что такое sudog? Структура-обёртка вокруг goroutine, кладётся в waitq канала, когда g паркуется. Содержит указатель на g, на канал, на буфер для данных. Pooled через acquireSudog/releaseSudog.

4. Что происходит при send в полный буфер? Lock; буфер полон, recvq пуст → создать sudog, добавить в sendq, gopark с unlock в качестве park callback. Когда receiver возьмёт из буфера — он dequeue из sendq и goready нашу горутину.

5. Что такое direct send? Если в recvq уже есть receiver, sender копирует данные прямо в sg.elem (адрес переменной, куда receiver пишет), минуя buffer. Это работает даже для buffered канала, когда буфер пустой и кто-то уже припарковался.

6. Почему close может panic?

  • close(nil) — panic.
  • close уже closed — panic.
  • send в closed — panic, потому что sender проснулся и проверил c.closed == 1.

7. Что делает close с recvq и sendq?

  • recvq: все receivers получат zero value, ok=false, и goready.
  • sendq: все senders проснутся и сделают panic. (Поэтому senders не должны быть в sendq на момент close — это бага.)

8. Как реализован select? runtime.selectgo: shuffle order для fairness, lock все каналы по адресу, pass1 — non-blocking polling, pass2 — park со sudog в каждой очереди, после wakeup cleanup лишних sudog’ов.

9. Почему select шафлит cases? Чтобы избежать starvation: если бы порядок был фиксирован, всегда срабатывал бы первый готовый.

10. Что делает default в select? Делает select non-blocking. Если ни один кейс не готов сразу — выполняется default. Внутри это block=false параметр для chansend/chanrecv.

11. Что произойдёт, если в select два кейса готовы одновременно? Выберется случайный (с равной вероятностью).

12. Что делает gopark? Кладёт текущую g в waiting state, вызывает unlockf (для отпускания канального lock), и передаёт управление scheduler-у. Никогда не возвращается без goready.

13. Что делает goready? Переводит g из waiting в runnable, кладёт в local runq P. Возвращается сразу.

14. Channel дороже Mutex? Да, под uncontended нагрузкой Mutex ~20ns, channel ~80-100ns. Channel — это Mutex + структура hchan + работа с очередями.

15. Когда channel выгоднее Mutex? Когда нужна передача данных и сигнал готовности одновременно (mutex этого не делает). Также для bounded work queue и pipeline.

16. Send в nil канал — что происходит? Блокируется навсегда. Полезно для динамического отключения case в select (присвой nil — case никогда не сработает).

17. Range по unbuffered каналу? Работает: цикл ждёт next send и завершается при close. Если нет close — leak.

18. Memory ordering: gives ли channel happens-before? Да. Send happens-before соответствующего recv. Close happens-before любого recv, который вернул zero из-за close. Это база для синхронизации без mutex.

19. Как реализован circular buffer? sendx и recvx — индексы по модулю dataqsiz. После записи sendx++ если == dataqsiz то 0. После чтения recvx++ аналогично. qcount считает фактический размер.

20. Что произойдёт при len(ch) / cap(ch)? Возвращают c.qcount и c.dataqsiz (с lock). Это snapshot, нельзя полагаться для логики.

21. Что делать с time.After в цикле? Не использовать. Каждый вызов создаёт новый Timer, который висит в heap до срабатывания. Используй time.NewTimer + Reset или context.WithTimeout.

22. Closing с несколькими senders — как? Через отдельный done chan struct{}. Senders проверяют select { case <-done: return; case ch <- v: }. Закрывает координатор done, не ch.

23. Что вернёт v, ok := <-ch, если ch не closed но пустой? Заблокируется (если не select). Если все готово — ok=true.

24. Какая разница между chan struct{} и chan bool для сигнала? chan struct{} занимает 0 байт на элемент (elemsize=0). Чисто стилистика и микро-эффективность. Семантически — то же.

25. Direct send vs buffered: можно ли понять что произошло? Со стороны кода — нет. С точки зрения runtime — в одном случае memcpy в sg.elem, в другом — в buf[sendx]. Не имеет значения для пользователя.

26. Что такое spurious wakeup в Go channels? В Go отсутствует — runtime гарантирует, что после goready горутина действительно может проверить условие. (В отличие от pthread_cond_wait.)

27. Может ли select deadlock? Может, если все каналы nil и нет default. Runtime детектит “all goroutines asleep” и упадёт с fatal error.

28. Что произойдёт при ch := make(chan int, -1)? Runtime panic: makechan: size out of range. Capacity — uint64, но negative приведётся к огромному числу, и сработает limit check.

29. Как быстро уведомить много горутин? close(ch). Все, кто <-ch, получат zero value мгновенно. Это broadcast-style notification.

30. Когда chan T лучше <-chan T / chan<- T? chan T — для внутреннего использования. Возвращай из функций <-chan T (output) или принимай chan<- T (input), чтобы API не позволял неправильную операцию.


Задача 1. Реализовать Or — комбинатор каналов

Заголовок раздела «Задача 1. Реализовать Or — комбинатор каналов»

Принимает done каналов, возвращает один канал, который закрывается, когда закрылся ЛЮБОЙ из переданных.

func Or(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
out := make(chan interface{})
go func() {
defer close(out)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-Or(append(channels[3:], out)...):
// out passed чтобы при early return дочерние горутины завершились
}
}
}()
return out
}

Замечание: на практике используй context.WithCancel — это проще.

Задача 2. Реализовать ограниченный буферизованный канал на основе мьютекса

Заголовок раздела «Задача 2. Реализовать ограниченный буферизованный канал на основе мьютекса»
type BoundedQueue[T any] struct {
mu sync.Mutex
cond *sync.Cond
buf []T
capacity int
closed bool
}
func NewBoundedQueue[T any](cap int) *BoundedQueue[T] {
q := &BoundedQueue[T]{buf: make([]T, 0, cap), capacity: cap}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *BoundedQueue[T]) Push(v T) bool {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.buf) == q.capacity && !q.closed {
q.cond.Wait()
}
if q.closed { return false }
q.buf = append(q.buf, v)
q.cond.Broadcast()
return true
}
func (q *BoundedQueue[T]) Pop() (T, bool) {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.buf) == 0 && !q.closed {
q.cond.Wait()
}
var zero T
if len(q.buf) == 0 { return zero, false }
v := q.buf[0]
q.buf = q.buf[1:]
q.cond.Broadcast()
return v, true
}
func (q *BoundedQueue[T]) Close() {
q.mu.Lock()
q.closed = true
q.cond.Broadcast()
q.mu.Unlock()
}

Сравни: канал делает ровно это, но в одной строке make(chan T, cap). Bell — узнал, как работают каналы внутри.

Задача 3. Безопасный broadcast: один sender, N receivers

Заголовок раздела «Задача 3. Безопасный broadcast: один sender, N receivers»
type Broadcaster[T any] struct {
mu sync.Mutex
subs []chan T
}
func (b *Broadcaster[T]) Subscribe() <-chan T {
ch := make(chan T, 16)
b.mu.Lock()
b.subs = append(b.subs, ch)
b.mu.Unlock()
return ch
}
func (b *Broadcaster[T]) Publish(v T) {
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.subs {
select {
case ch <- v:
default:
// slow consumer — drop
}
}
}
func (b *Broadcaster[T]) Close() {
b.mu.Lock()
defer b.mu.Unlock()
for _, ch := range b.subs {
close(ch)
}
b.subs = nil
}

Задача 4. Atomic counter через канал — антипаттерн, но как упражнение

Заголовок раздела «Задача 4. Atomic counter через канал — антипаттерн, но как упражнение»
type ChannelCounter struct {
inc chan struct{}
get chan int
done chan struct{}
}
func NewChannelCounter() *ChannelCounter {
c := &ChannelCounter{
inc: make(chan struct{}),
get: make(chan int),
done: make(chan struct{}),
}
go func() {
n := 0
for {
select {
case <-c.inc:
n++
case c.get <- n:
case <-c.done:
return
}
}
}()
return c
}
func (c *ChannelCounter) Inc() { c.inc <- struct{}{} }
func (c *ChannelCounter) Get() int { return <-c.get }
func (c *ChannelCounter) Close() { close(c.done) }

Запусти бенчмарк против atomic.Int64 — увидишь разницу в 20-100 раз. Цель упражнения: понять, что актор-стиль через channel реален, но atomic выигрывает для счётчиков.

// Из двух каналов: priority и normal. Если в priority есть данные — берём её.
func priorityRead(priority, normal <-chan int) (int, bool) {
// Двойной select: сначала только priority с default.
select {
case v := <-priority:
return v, true
default:
}
select {
case v := <-priority:
return v, true
case v := <-normal:
return v, true
}
}
func Tee[T any](done <-chan struct{}, in <-chan T) (<-chan T, <-chan T) {
out1 := make(chan T)
out2 := make(chan T)
go func() {
defer close(out1)
defer close(out2)
for v := range in {
var o1, o2 = out1, out2
for i := 0; i < 2; i++ {
select {
case <-done:
return
case o1 <- v:
o1 = nil // отключаем эту ветку
case o2 <- v:
o2 = nil
}
}
}
}()
return out1, out2
}

Трюк с обнулением переменной — типовой для select.

func RunBounded(items []int, n int, fn func(int)) {
sem := make(chan struct{}, n)
var wg sync.WaitGroup
for _, item := range items {
item := item
sem <- struct{}{} // acquire (блокируется при N запущенных)
wg.Add(1)
go func() {
defer wg.Done()
defer func() { <-sem }() // release
fn(item)
}()
}
wg.Wait()
}

Запусти с -race:

func race() {
ch := make(chan int, 1)
go func() { ch <- 1 }()
close(ch) // race! либо panic, либо успешно
}

Покажи коллегам — частый источник продакшен-багов.


  1. Go source: runtime/chan.go — официальная реализация hchan, send/recv, close. https://github.com/golang/go/blob/master/src/runtime/chan.go
  2. Go source: runtime/select.goselectgo. https://github.com/golang/go/blob/master/src/runtime/select.go
  3. The Go Memory Modelhttps://go.dev/ref/mem (happens-before гарантии).
  4. Dave Cheney — “Channels Considered Harmful”https://dave.cheney.net/2013/04/30/curious-channels (когда не нужно).
  5. Kavya Joshi — “Understanding Channels” (GopherCon 2017)https://www.youtube.com/watch?v=KBZlN0izeiY (классический разбор hchan).
  6. Go 101 — “Channel Use Cases”https://go101.org/article/channel-use-cases.html.
  7. Russ Cox — “Go Concurrency Patterns”https://blog.golang.org/pipelines.
  8. sync/atomic docshttps://pkg.go.dev/sync/atomic (когда atomic вместо channel).