Перейти к содержимому

Каналы (channels)

Каналы — основной примитив коммуникации между горутинами в Go. Лозунг Rob Pike: “Don’t communicate by sharing memory; share memory by communicating.” На собеседовании это — топ-1 тема по concurrency. Спросят про unbuffered vs buffered, send/recv на nil/closed, hchan, select, leaks таймеров, паттерны (worker pool, fan-in/out, pipeline). Без понимания каналов под капотом легко налететь на дедлок или panic.

  1. Базовое API
  2. Под капотом: hchan, sendq/recvq
  3. Тонкие моменты / Gotchas
  4. Производительность
  5. Паттерны
  6. Типичные вопросы на собесе
  7. Practice
  8. Источники

Channel — типизированный “канал” (pipe) для передачи значений между горутинами. Это first-class значение: его можно передавать в функции, складывать в структуры, возвращать.

var ch chan int // nil channel (нельзя пользоваться)
ch = make(chan int) // unbuffered
ch = make(chan int, 10) // buffered, cap=10
ch <- 42 // send (запись)
v := <-ch // recv (чтение)
v, ok := <-ch // recv с проверкой "канал закрыт?"
close(ch) // закрыть

Unbuffered (make(chan T)):

  • Send блокируется, пока другая горутина не сделает recv.
  • Recv блокируется, пока другая горутина не сделает send.
  • Это rendezvous (свидание): передача происходит синхронно, обе горутины встречаются в точке обмена.
ch := make(chan int) // unbuffered
go func() {
ch <- 42 // блок, пока main не прочтёт
fmt.Println("sent")
}()
v := <-ch // блок, пока горутина не запишет
fmt.Println(v, "received")

Buffered (make(chan T, n)):

  • Send блокируется, только если буфер полон.
  • Recv блокируется, только если буфер пуст.
  • До n элементов помещаются без блокировки.
ch := make(chan int, 2)
ch <- 1 // ок, буфер: [1]
ch <- 2 // ок, буфер: [1, 2]
ch <- 3 // блок! буфер полон

close(ch) — закрывает канал. После закрытия:

  • Send на закрытый канал → panic.
  • Recv из закрытого канала → возвращает zero value и ok = false.
  • Если в буфере остались элементы — они прочитываются по очереди до zero value.
ch := make(chan int, 3)
ch <- 1; ch <- 2; ch <- 3
close(ch)
for v := range ch {
fmt.Println(v) // 1, 2, 3 — потом цикл завершается
}
v, ok := <-ch
// v=0, ok=false

⚠️ Правило: только sender закрывает канал. Если у вас несколько senders — не закрывайте, или используйте sync.Once / отдельный done-канал.

for v := range ch {
// читает, пока канал не закрыт
}

range останавливается, когда канал закрыт И пуст. Если канал не закрывают — range блокируется вечно (потенциальный leak!).

select ждёт на нескольких канальных операциях одновременно:

select {
case v := <-ch1:
fmt.Println("from ch1:", v)
case ch2 <- 42:
fmt.Println("sent to ch2")
case <-time.After(time.Second):
fmt.Println("timeout")
default:
fmt.Println("none ready")
}

Семантика:

  • Если готов один case — выполняется он.
  • Если готовы несколько — выбирается псевдослучайно (не порядком в коде!).
  • Если ни один не готов и есть default — выполняется default (non-blocking).
  • Если нет defaultselect блокируется до готовности любого case.
func producer(out chan<- int) { // только send
out <- 42
}
func consumer(in <-chan int) { // только recv
fmt.Println(<-in)
}
ch := make(chan int)
go producer(ch)
consumer(ch)

Двусторонний chan int неявно конвертируется в chan<- int или <-chan int. Обратно нельзя. Это inhabits API: видишь сигнатуру — понимаешь, кто send, кто recv.


В исходниках Go (src/runtime/chan.go) канал — это указатель на структуру hchan:

type hchan struct {
qcount uint // число элементов в буфере сейчас
dataqsiz uint // capacity буфера
buf unsafe.Pointer // указатель на кольцевой буфер
elemsize uint16 // размер одного элемента
closed uint32 // флаг "канал закрыт"
elemtype *_type // тип элемента
sendx uint // индекс, куда писать в буфере
recvx uint // индекс, откуда читать
recvq waitq // очередь горутин, ждущих recv
sendq waitq // очередь горутин, ждущих send
lock mutex // защищает всё это
}
┌─────────────────────────────────────────────────────┐
│ hchan │
│ │
│ ┌──────────────────────────────────────────────┐ │
│ │ buf (кольцевой буфер, dataqsiz слотов) │ │
│ │ ┌────┬────┬────┬────┬────┬────┬────┬────┐ │ │
│ │ │ 42 │ 17 │ 99 │ -- │ -- │ -- │ -- │ -- │ │ │
│ │ └────┴────┴────┴────┴────┴────┴────┴────┘ │ │
│ │ ▲ ▲ │ │
│ │ │ │ │ │
│ │ recvx=0 sendx=3 │ │
│ │ qcount=3, dataqsiz=8 │ │
│ └──────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────┐ ┌──────────────────────┐ │
│ │ sendq (ждут │ │ recvq (ждут recv, │ │
│ │ send, буфер │ │ буфер пуст) │ │
│ │ полон) │ │ │ │
│ │ G1 → G2 → ... │ │ G7 → G8 → ... │ │
│ └──────────────────┘ └──────────────────────┘ │
│ │
│ closed: 0 lock: mutex │
└─────────────────────────────────────────────────────┘

buf — это кольцевой буфер (circular). sendx — индекс для следующей записи, recvx — для следующего чтения. После каждой операции индексы инкрементируются по модулю dataqsiz.

При ch <- v:

  1. Взять c.lock.
  2. Если канал закрыт → release lock, panic.
  3. Если есть G в recvq (кто-то ждёт recv) → достать G, скопировать v прямо в её стек (фастпуть, минует буфер!), пробудить G, release lock.
  4. Если есть место в буфере (qcount < dataqsiz) → записать v в buf[sendx], sendx++, qcount++, release lock.
  5. Иначе (буфер полон или unbuffered) → положить текущую G в sendq, park (заблокировать), отдать lock внутри парковки. Когда какой-то receiver разбудит — продолжить.

При v := <-ch:

  1. Взять c.lock.
  2. Если канал закрыт И буфер пуст → release lock, вернуть zero value, ok=false.
  3. Если есть G в sendq:
    • Если буфер есть и не пуст: достать значение из buf[recvx], переместить значение из ожидающей sender G в buf[sendx], пробудить sender. (Хитрая логика для буферизованных.)
    • Если unbuffered: скопировать значение из стека sender G напрямую, пробудить sender.
  4. Если буфер не пуст → прочитать buf[recvx], recvx++, qcount--, release lock.
  5. Иначе → положить G в recvq, park.

При close(ch):

  1. Взять lock.
  2. Если уже closed → panic.
  3. Поставить closed = 1.
  4. Пробудить всех в recvq — они получат zero value, ok=false.
  5. Пробудить всех в sendq — они запаникуют (send на closed).
  6. Release lock.

var ch chan intch равен nil. Операции:

  • ch <- vблок навсегда (горутина в Gwaiting).
  • <-chблок навсегда.
  • close(ch)panic.

Это фича, используется в select:

var ch chan int // nil
select {
case <-ch: // никогда не сработает
...
case <-time.After(time.Second):
fmt.Println("timeout")
}

Можно “отключать” case в select-е, занулив канал:

for {
select {
case v, ok := <-inCh:
if !ok {
inCh = nil // больше не будем ждать на нём
continue
}
handle(v)
case <-ctx.Done():
return
}
if inCh == nil { break }
}

select в рантайме реализован через runtime.selectgo. Он:

  1. Перемешивает список case-ов (для fairness).
  2. Берёт locks на все каналы в определённом порядке (по адресу, чтобы избежать deadlock).
  3. Проходит cases — если есть ready, выполняет.
  4. Если нет ready и нет default — паркует G во все sendq/recvq соответствующих каналов.
  5. Когда G пробуждается — она знает, какой case сработал.

Это означает: select на N каналах стоит O(N) + локи. Не делайте select на сотнях каналов.


⚠️ Блокируется навсегда. Дедлок-детектор не всегда поможет (он срабатывает только если все горутины заблокированы).

func main() {
var ch chan int
go func() { ch <- 1 }() // утечка горутины
fmt.Println("main runs")
time.Sleep(time.Second)
}

⚠️ Panic.

ch := make(chan int)
close(ch)
ch <- 1 // panic: send on closed channel

Это часто случается, когда нескольких sender-ов разводят, и один из них закрывает канал, а другой пишет.

⚠️ Panic.

ch := make(chan int)
close(ch)
close(ch) // panic: close of closed channel

Если нужно “защититься”:

var once sync.Once
once.Do(func() { close(ch) })

⚠️ Panic.

var ch chan int
close(ch) // panic: close of nil channel

Правило: закрывает sender, и только когда уверен, что больше не будет писать.

  • 1 sender, N receivers → sender закрывает.
  • N senders, 1 receiver → НЕ закрывайте отправители (они не знают, когда остановиться). Используйте отдельный done канал.
  • N senders, N receivers → используйте координатор (например, sync.WaitGroup + done канал).

Подробнее: см. Channels Closing Principle (Tao Wang) — https://go101.org/article/channel-closing.html.

⚠️ Классическая ошибка:

for {
select {
case msg := <-ch:
handle(msg)
case <-time.After(time.Second):
return
}
}

time.After создаёт новый таймер каждую итерацию. Если msg приходит часто — таймеры аккумулируются и не GC-ятся до своего срабатывания. Это утечка памяти.

Фикс — использовать time.NewTimer + Stop + Reset:

t := time.NewTimer(time.Second)
defer t.Stop()
for {
if !t.Stop() {
select { case <-t.C: default: } // дренировать
}
t.Reset(time.Second)
select {
case msg := <-ch:
handle(msg)
case <-t.C:
return
}
}

С Go 1.23 API таймеров был упрощён: Reset сам обрабатывает дренирование, и Stop корректно работает в большинстве случаев. Но для совместимости со старыми версиями знайте старое API.

Многие думают: “поставил буфер 1000 — теперь не блокирует”. Нет. Если consumer медленнее producer-а, буфер заполнится, и producer тоже встанет. Буфер — только сглаживание burst-ов, не решение архитектуры.

Использование буфера 1 — иногда уместный паттерн для “сигнала с памятью”:

done := make(chan struct{}, 1)
// send в done не блокирует, даже если никто не читает
select {
case done <- struct{}{}:
default:
}
select {} // блокирует горутину навсегда (вечный sleep)

Используется иногда в main для “удержания процесса”, но почти всегда — баг (либо не знают про context, либо не знают про graceful shutdown).

Когда что?

ЗадачаЛучше использовать
Передача владения данныхChannel
Координация горутин (pipeline)Channel
Cancellation / done signalChannel + context
Защита разделяемого stateMutex
Счётчикatomic
One-time initsync.Once
Fan-in / fan-outChannel

Цитата Sameer Ajmani: “Use a Mutex for protecting state, use channels for communicating it.”

Канал — не панацея. Защитить map от concurrent write — просто sync.Mutex или sync.Map. Зачем городить goroutine-владельца с каналом, когда есть sync.Mutex?

⚠️ Подвох: отправка значения по каналу = happens-before получения. Это означает, что данные, изменённые перед send, видны после recv в другой горутине без дополнительных мьютексов:

var data []int
go func() {
data = []int{1, 2, 3}
ch <- struct{}{} // send
}()
<-ch // recv
fmt.Println(data) // безопасно: гарантированно видим {1,2,3}

Это формально в Go Memory Model.


Примерные числа (на современном x86):

ОперацияВремя
send/recv на unbuffered, рendezvous~70 ns
send/recv на buffered (есть место)~30 ns
send с парковкой (буфер полон)~200 ns
select с 2 каналами~100 ns
atomic.AddInt64~5 ns
Mutex Lock/Unlock (uncontended)~15 ns

Mutex/atomic быстрее канала на порядок. Если задача — счётчик, не используйте канал.

Слишком маленький → producer блокируется.
Слишком большой → лишняя память, скрытые проблемы (consumer отстаёт, но не видно).

Эвристика: начните с буфера = ожидаемый размер burst-а. Замеряйте len(ch) через метрики (channel.queue.length).

Если consumer медленный и каждый item требует setup-а, читайте пачками:

batch := make([]Item, 0, 100)
for {
select {
case item := <-ch:
batch = append(batch, item)
if len(batch) >= 100 {
processBatch(batch)
batch = batch[:0]
}
case <-time.After(100 * time.Millisecond):
if len(batch) > 0 {
processBatch(batch)
batch = batch[:0]
}
}
}

done := make(chan struct{})
go func() {
for {
select {
case <-done:
return
case msg := <-input:
process(msg)
}
}
}()
// закрываем для broadcast всем подписчикам:
close(done)

chan struct{} — пустая структура, 0 байт, идеально для сигнала. Закрытие канала — broadcast (все receivers получат zero value сразу).

func count(n int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < n; i++ {
out <- i
}
}()
return out
}
for v := range count(10) {
fmt.Println(v)
}
func workerPool(jobs <-chan Job, results chan<- Result, n int) {
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for job := range jobs {
results <- process(job)
}
}()
}
go func() {
wg.Wait()
close(results)
}()
}
func source() <-chan int {
out := make(chan int)
go func() {
defer close(out)
for i := 0; i < 10; i++ {
out <- i
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
out <- v * v
}
}()
return out
}
func main() {
for v := range square(source()) {
fmt.Println(v)
}
}

Один producer, много workers:

in := source()
for i := 0; i < 4; i++ {
go worker(in) // все 4 читают из in
}

Каналы — многопотокобезопасны на чтение и запись (по одному элементу за раз).

func merge(chans ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range chans {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func or(chans ...<-chan struct{}) <-chan struct{} {
out := make(chan struct{})
go func() {
defer close(out)
cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
}
reflect.Select(cases) // ждёт любой
}()
return out
}

Современнее — использовать context.WithCancel.

select {
case result := <-doSomething():
use(result)
case <-time.After(time.Second):
return ErrTimeout
}

⚠️ Подвох с time.After (см. 3.6). Если внутри hot loop — используйте context.WithTimeout.

limiter := time.Tick(time.Millisecond * 200) // тикает каждые 200ms
for req := range requests {
<-limiter // ждать тика
go handle(req)
}

⚠️ time.Tick не GC-ится. Для production используйте time.NewTicker + Stop.

sem := make(chan struct{}, 10) // макс 10 одновременно
for _, task := range tasks {
sem <- struct{}{} // блок, если 10 уже работают
go func(t Task) {
defer func() { <-sem }()
process(t)
}(task)
}

  1. Что произойдёт при send в nil канал? Блок навсегда.

  2. Что произойдёт при close(nil)? Panic.

  3. Что произойдёт при send в закрытый канал? Panic.

  4. Что вернёт recv из закрытого канала? Zero value + ok=false. Если в буфере что-то осталось — сначала прочитает их.

  5. Чем отличается unbuffered от buffered канала? Unbuffered — rendezvous (send блокируется до recv). Buffered — до cap не блокируется.

  6. Какова семантика select при нескольких готовых case? Псевдослучайный выбор (для fairness).

  7. Что делает default в select? Делает select non-blocking: если ни один case не готов, выполняется default.

  8. Можно ли закрыть канал из receiver? Технически да, но не нужно. Закрывает sender — он знает, когда не будет писать.

  9. Что такое hchan? Внутренняя структура канала: буфер, sendq, recvq, mutex, sendx/recvx.

  10. Как реализован буфер канала? Кольцевой буфер (ring buffer) с индексами sendx/recvx по модулю cap.

  11. Почему for v := range ch иногда висит? Канал не закрывают — range ждёт следующий элемент или закрытие.

  12. Как сделать broadcast (одно событие — всем receivers)? close(ch) — все, кто читает из ch, получат zero value одновременно.

  13. time.After в цикле — что плохого? Каждая итерация создаёт новый таймер. Если case на канале срабатывает чаще, чем d — таймеры не GC-ятся → утечка памяти.

  14. chan struct{} vs chan bool для сигнала? struct{} — 0 байт, более идиоматично.

  15. Чем nil канал полезен в select? Дисэйблит соответствующий case (никогда не сработает).

  16. Что делает range по каналу размера 0 после close? Цикл сразу завершается.

  17. Когда лучше mutex, чем канал? Защита разделяемого state, счётчики, простые critical sections. Канал — overhead.

  18. Что такое “happens-before” в контексте каналов? Send на канал happens-before соответствующего recv → данные, записанные перед send, видны после recv.

  19. Как реализовать worker pool с лимитом? Создать N горутин, читающих из общего jobs канала. Когда jobs закрыт — горутины завершаются.

  20. Что не так с этим кодом?

    ch := make(chan int)
    ch <- 1
    fmt.Println(<-ch)

    Deadlock: send блокируется в main, пока никто не читает (а read в той же горутине!). Нужен go или buffered.

  21. Можно ли отправлять в канал из нескольких горутин? Да, send и recv многопоточно-безопасны (один элемент = атомарно).

  22. Можно ли закрыть канал из нескольких горутин? Нет, второй close → panic. Используйте sync.Once.

  23. Что такое fan-in / fan-out? Fan-out: один канал, много consumers. Fan-in: много producers → один канал.

  24. Как реализовать pipeline? Каждая стадия — горутина, возвращающая канал. Следующая стадия читает из канала предыдущей. Последняя стадия — consumer.

  25. Что произойдёт при двойном close? Panic. Защита — sync.Once.

  26. Зачем нужны направленные каналы (chan<-, <-chan)? Сужают API: видно, кто sender, кто receiver. Защищает от случайного close в receiver-функции.

  27. Передача больших значений по каналу — что плохо? Копирование. Лучше передавать указатель — НО! тогда нужна аккуратность с дальнейшим использованием.

  28. Зачем chan struct{} иногда буферизуют размером 1? Чтобы send не блокировался даже если никто не читает — “non-blocking notify”.

  29. Чем отличается close от nil? Close — финальное состояние (recv возвращает zero+false). Nil — канал-плейсхолдер (send/recv блокируют навсегда).

  30. Можно ли использовать канал как мьютекс? Да, через буфер 1: ch <- struct{}{} = Lock, <-ch = Unlock. Но настоящий sync.Mutex быстрее и идиоматичнее.


Напишите программу, в которой 3 producer-а пишут случайные числа в канал, 2 consumer-а читают и складывают. Используйте WaitGroup для корректного завершения.

Реализуйте pipeline: source → filter (только чётные) → square → sink (печать). Каждая стадия — отдельная горутина.

func main() {
ch := make(chan int)
ch <- 1
fmt.Println(<-ch)
}

Почему deadlock? Как исправить (без изменения логики)?

Ответ: make(chan int, 1) или вынести send в горутину.

Напишите merge[T any](chans ...<-chan T) <-chan T без reflect.Select. Используйте WaitGroup.

Реализуйте Mutex через канал:

type ChMutex chan struct{}
func NewChMutex() ChMutex {
return make(ChMutex, 1)
}
func (m ChMutex) Lock() { m <- struct{}{} }
func (m ChMutex) Unlock() { <-m }

Сравните производительность с sync.Mutex через benchmark. Что быстрее? Почему?

Напишите функцию, которая получает несколько <-chan struct{} и возвращает канал, закрывающийся, когда любой из входных закрыт.

Реализуйте RateLimiter с методом Wait(), который блокирует, если в последнюю секунду было >N вызовов.

func fetchAll(urls []string) []string {
results := make([]string, 0, len(urls))
ch := make(chan string)
for _, url := range urls {
go func(u string) {
ch <- fetch(u)
}(url)
}
for range urls {
results = append(results, <-ch)
}
return results
}

Если один fetch повисит навсегда — что произойдёт? Как исправить (context, timeout)?


  1. Effective Go: Channelshttps://go.dev/doc/effective_go#channels — официальный гид.
  2. Go Channels Under the Hood — Ian Lance Taylor, GopherCon: https://www.youtube.com/watch?v=KBZlN0izeiY
  3. Channels Closing Principle — Tao Wang: https://go101.org/article/channel-closing.html — must read.
  4. Go Memory Modelhttps://go.dev/ref/mem — happens-before для каналов.
  5. runtime/chan.go — исходник: https://github.com/golang/go/blob/master/src/runtime/chan.go — для тех, кто хочет видеть код.
  6. Concurrency Patterns in Go — Sameer Ajmani, Google I/O: https://www.youtube.com/watch?v=f6kdp27TYZs