Concurrent LeetCode и System-Level задачи (Go)
Зачем знать на Middle 3: Go-разработчика часто проверяют не на классический LC, а на concurrent задачи: “напиши rate limiter”, “thread-safe LRU”, “bounded blocking queue”. Это проверяет реальный навык работы с goroutines, channels, sync. И на собесах часто дают “Design Twitter in-memory” — гибрид system design + concurrent coding.
Содержание
Заголовок раздела «Содержание»- Print in Order
- Print FooBar Alternately
- Building H2O
- Dining Philosophers
- Bounded Blocking Queue
- Web Crawler Multithreaded
- Thread-safe LRU Cache
- Design Hit Counter
- Design Twitter (in-memory)
- Distributed Cache (sketch)
- Tweet Stream
- Rate Limiters (4 алгоритма)
- In-memory Message Queue
- Worker Pool
- Pub/Sub System
- Common Pitfalls
- 15 вопросов
- Источники
1. Print in Order (LC 1114)
Заголовок раздела «1. Print in Order (LC 1114)»Три потока вызывают first(), second(), third(). Гарантировать порядок first → second → third.
Решение (channels)
Заголовок раздела «Решение (channels)»type Foo struct { ch1, ch2 chan struct{}}
func NewFoo() *Foo { return &Foo{ch1: make(chan struct{}), ch2: make(chan struct{})}}
func (f *Foo) First(printFirst func()) { printFirst() close(f.ch1)}
func (f *Foo) Second(printSecond func()) { <-f.ch1 printSecond() close(f.ch2)}
func (f *Foo) Third(printThird func()) { <-f.ch2 printThird()}Альтернатива через sync.Cond или sync.WaitGroup — но channels чище.
Pitfalls: не вызвать close дважды (close idempotency).
2. Print FooBar Alternately (LC 1115)
Заголовок раздела «2. Print FooBar Alternately (LC 1115)»Поток A печатает “foo” N раз, B — “bar” N раз. Чередовать: foobar foobar.
Решение (ping-pong channels)
Заголовок раздела «Решение (ping-pong channels)»type FooBar struct { n int fooCh, barCh chan struct{}}
func New(n int) *FooBar { fb := &FooBar{n: n, fooCh: make(chan struct{}, 1), barCh: make(chan struct{}, 1)} fb.fooCh <- struct{}{} // foo starts return fb}
func (fb *FooBar) Foo(printFoo func()) { for i := 0; i < fb.n; i++ { <-fb.fooCh printFoo() fb.barCh <- struct{}{} }}
func (fb *FooBar) Bar(printBar func()) { for i := 0; i < fb.n; i++ { <-fb.barCh printBar() fb.fooCh <- struct{}{} }}3. Building H2O (LC 1117)
Заголовок раздела «3. Building H2O (LC 1117)»N потоков hydrogen() и M потоков oxygen(). Собираются группами по 2H+1O. До формирования group потоки блокируются.
Решение (semaphore)
Заголовок раздела «Решение (semaphore)»type H2O struct { hSem, oSem chan struct{} barrier *sync.WaitGroup // на группу mu sync.Mutex hCount int}
func New() *H2O { h := &H2O{ hSem: make(chan struct{}, 2), oSem: make(chan struct{}, 1), } // initial: 2H + 1O разрешены h.hSem <- struct{}{}; h.hSem <- struct{}{} h.oSem <- struct{}{} h.barrier = &sync.WaitGroup{} h.barrier.Add(3) return h}
func (h *H2O) Hydrogen(release func()) { <-h.hSem release() h.tryRelease()}
func (h *H2O) Oxygen(release func()) { <-h.oSem release() h.tryRelease()}
func (h *H2O) tryRelease() { h.barrier.Done() h.barrier.Wait() h.mu.Lock() defer h.mu.Unlock() h.barrier = &sync.WaitGroup{} h.barrier.Add(3) h.hSem <- struct{}{}; h.hSem <- struct{}{} h.oSem <- struct{}{}}(Упрощённая идея — production вариант сложнее.)
4. Dining Philosophers (LC 1226)
Заголовок раздела «4. Dining Philosophers (LC 1226)»5 философов, 5 вилок. Каждый берёт левую и правую, ест, кладёт. Избегать deadlock.
Решение (asymmetric)
Заголовок раздела «Решение (asymmetric)»Чётные философы берут сначала левую, нечётные — сначала правую. Разрывает циклическую зависимость.
type Philosophers struct { forks [5]sync.Mutex}
func (p *Philosophers) Eat(id int, pickLeft, pickRight, eat, putLeft, putRight func()) { left, right := id, (id+1)%5 if id%2 == 0 { p.forks[left].Lock(); pickLeft() p.forks[right].Lock(); pickRight() } else { p.forks[right].Lock(); pickRight() p.forks[left].Lock(); pickLeft() } eat() putLeft(); p.forks[left].Unlock() putRight(); p.forks[right].Unlock()}Альтернатива: Chandy/Misra (request based). Или ограничить N-1 философов едящими одновременно.
5. Bounded Blocking Queue (LC 1188)
Заголовок раздела «5. Bounded Blocking Queue (LC 1188)»Решение через channel
Заголовок раздела «Решение через channel»type BoundedBlockingQueue struct { ch chan int}
func New(capacity int) *BoundedBlockingQueue { return &BoundedBlockingQueue{ch: make(chan int, capacity)}}
func (q *BoundedBlockingQueue) Enqueue(v int) { q.ch <- v }func (q *BoundedBlockingQueue) Dequeue() int { return <-q.ch }func (q *BoundedBlockingQueue) Size() int { return len(q.ch) }В Go это тривиально — channels уже bounded blocking queue.
Без channel (Cond)
Заголовок раздела «Без channel (Cond)»type Queue struct { mu sync.Mutex notFull, notEmpty *sync.Cond buf []int cap int}
func NewQ(cap int) *Queue { q := &Queue{cap: cap} q.notFull = sync.NewCond(&q.mu) q.notEmpty = sync.NewCond(&q.mu) return q}
func (q *Queue) Enqueue(v int) { q.mu.Lock() for len(q.buf) == q.cap { q.notFull.Wait() } q.buf = append(q.buf, v) q.notEmpty.Signal() q.mu.Unlock()}
func (q *Queue) Dequeue() int { q.mu.Lock() for len(q.buf) == 0 { q.notEmpty.Wait() } v := q.buf[0]; q.buf = q.buf[1:] q.notFull.Signal() q.mu.Unlock() return v}⚠️ Используй for, не if для проверки condition (spurious wakeup).
6. Web Crawler Multithreaded (LC 1242)
Заголовок раздела «6. Web Crawler Multithreaded (LC 1242)»Crawl сайт начиная с URL, only same hostname. Concurrent.
Решение (worker pool + visited map)
Заголовок раздела «Решение (worker pool + visited map)»type htmlParser interface { GetUrls(url string) []string}
func crawl(startUrl string, parser htmlParser) []string { host := getHost(startUrl) visited := &sync.Map{} var wg sync.WaitGroup var mu sync.Mutex result := []string{}
var crawler func(url string) crawler = func(url string) { defer wg.Done() if _, loaded := visited.LoadOrStore(url, true); loaded { return } mu.Lock(); result = append(result, url); mu.Unlock() for _, next := range parser.GetUrls(url) { if getHost(next) != host { continue } wg.Add(1) go crawler(next) } } wg.Add(1) go crawler(startUrl) wg.Wait() return result}
func getHost(u string) string { parsed, _ := url.Parse(u) return parsed.Host}Pitfalls: wg.Add нужно до go, иначе race.
7. Thread-safe LRU Cache (LC 146 + concurrent)
Заголовок раздела «7. Thread-safe LRU Cache (LC 146 + concurrent)»Структура: HashMap + Doubly Linked List
Заголовок раздела «Структура: HashMap + Doubly Linked List»type node struct { key, val int prev, next *node}
type LRUCache struct { mu sync.Mutex cap int m map[int]*node head, tail *node // sentinels}
func NewLRU(cap int) *LRUCache { c := &LRUCache{cap: cap, m: make(map[int]*node)} c.head = &node{}; c.tail = &node{} c.head.next = c.tail; c.tail.prev = c.head return c}
func (c *LRUCache) Get(key int) (int, bool) { c.mu.Lock(); defer c.mu.Unlock() n, ok := c.m[key] if !ok { return 0, false } c.remove(n); c.addFront(n) return n.val, true}
func (c *LRUCache) Put(key, val int) { c.mu.Lock(); defer c.mu.Unlock() if n, ok := c.m[key]; ok { n.val = val c.remove(n); c.addFront(n) return } if len(c.m) >= c.cap { evict := c.tail.prev c.remove(evict) delete(c.m, evict.key) } n := &node{key: key, val: val} c.addFront(n) c.m[key] = n}
func (c *LRUCache) remove(n *node) { n.prev.next = n.next n.next.prev = n.prev}
func (c *LRUCache) addFront(n *node) { n.next = c.head.next n.prev = c.head c.head.next.prev = n c.head.next = n}Trade-offs: mutex coarse — single lock. Для high contention — sharded LRU (по hash(key) % N).
8. Design Hit Counter (LC 362)
Заголовок раздела «8. Design Hit Counter (LC 362)»hit(ts) — событие. getHits(ts) — count в последние 300 sec.
Решение (deque)
Заголовок раздела «Решение (deque)»type HitCounter struct { mu sync.Mutex hits []int64 // timestamps}
func (h *HitCounter) Hit(ts int64) { h.mu.Lock(); defer h.mu.Unlock() h.hits = append(h.hits, ts)}
func (h *HitCounter) GetHits(ts int64) int { h.mu.Lock(); defer h.mu.Unlock() cutoff := ts - 300 i := 0 for i < len(h.hits) && h.hits[i] <= cutoff { i++ } h.hits = h.hits[i:] return len(h.hits)}Оптимизация: 300-bucket array
Заголовок раздела «Оптимизация: 300-bucket array»type HitCounter struct { mu sync.Mutex buckets [300]int64 // timestamp seen counts [300]int}
func (h *HitCounter) Hit(ts int64) { h.mu.Lock(); defer h.mu.Unlock() idx := ts % 300 if h.buckets[idx] != ts { h.buckets[idx] = ts h.counts[idx] = 0 } h.counts[idx]++}
func (h *HitCounter) GetHits(ts int64) int { h.mu.Lock(); defer h.mu.Unlock() total := 0 for i := 0; i < 300; i++ { if h.buckets[i] > ts - 300 { total += h.counts[i] } } return total}O(1) hit, O(300) get. Fixed memory.
9. Design Twitter (LC 355)
Заголовок раздела «9. Design Twitter (LC 355)»postTweet(userId, tweetId)getNewsFeed(userId)— top 10 recent tweets от user + followees.follow(followerId, followeeId)unfollow(...).
Решение (in-memory)
Заголовок раздела «Решение (in-memory)»type Tweet struct { id, ts int}
type Twitter struct { mu sync.Mutex timestamp int tweets map[int][]Tweet follows map[int]map[int]bool}
func NewTwitter() *Twitter { return &Twitter{tweets: map[int][]Tweet{}, follows: map[int]map[int]bool{}}}
func (t *Twitter) PostTweet(userId, tweetId int) { t.mu.Lock(); defer t.mu.Unlock() t.timestamp++ t.tweets[userId] = append(t.tweets[userId], Tweet{tweetId, t.timestamp})}
func (t *Twitter) GetNewsFeed(userId int) []int { t.mu.Lock(); defer t.mu.Unlock() candidates := append([]Tweet{}, t.tweets[userId]...) for f := range t.follows[userId] { candidates = append(candidates, t.tweets[f]...) } sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts }) if len(candidates) > 10 { candidates = candidates[:10] } res := make([]int, len(candidates)) for i, c := range candidates { res[i] = c.id } return res}
func (t *Twitter) Follow(follower, followee int) { t.mu.Lock(); defer t.mu.Unlock() if t.follows[follower] == nil { t.follows[follower] = map[int]bool{} } t.follows[follower][followee] = true}
func (t *Twitter) Unfollow(follower, followee int) { t.mu.Lock(); defer t.mu.Unlock() delete(t.follows[follower], followee)}Оптимизация: use heap для top-10 без full sort, O(K log N).
10. Distributed Cache (sketch)
Заголовок раздела «10. Distributed Cache (sketch)»См. файл 43, section 2. Здесь — Go skeleton client:
type Ring struct { nodes []uint64 mp map[uint64]string}
type DCache struct { ring *Ring pool map[string]*redis.Client}
func (c *DCache) Get(ctx context.Context, key string) (string, error) { node := c.ring.GetNode(key) return c.pool[node].Get(ctx, key).Result()}11. Tweet Stream (subscribe pattern)
Заголовок раздела «11. Tweet Stream (subscribe pattern)»type Stream struct { mu sync.RWMutex subs map[string]chan Tweet}
func (s *Stream) Subscribe(id string) <-chan Tweet { s.mu.Lock(); defer s.mu.Unlock() ch := make(chan Tweet, 100) // buffered s.subs[id] = ch return ch}
func (s *Stream) Unsubscribe(id string) { s.mu.Lock(); defer s.mu.Unlock() if ch, ok := s.subs[id]; ok { close(ch); delete(s.subs, id) }}
func (s *Stream) Publish(t Tweet) { s.mu.RLock(); defer s.mu.RUnlock() for _, ch := range s.subs { select { case ch <- t: default: // slow consumer — drop } }}12. Rate Limiter implementations
Заголовок раздела «12. Rate Limiter implementations»Token bucket
Заголовок раздела «Token bucket»type TokenBucket struct { mu sync.Mutex capacity int tokens float64 rate float64 // tokens/sec last time.Time}
func (t *TokenBucket) Allow() bool { t.mu.Lock(); defer t.mu.Unlock() now := time.Now() elapsed := now.Sub(t.last).Seconds() t.tokens = math.Min(float64(t.capacity), t.tokens + elapsed * t.rate) t.last = now if t.tokens >= 1 { t.tokens-- return true } return false}Leaky bucket
Заголовок раздела «Leaky bucket»type LeakyBucket struct { mu sync.Mutex capacity int queue int rate float64 last time.Time}
func (l *LeakyBucket) Allow() bool { l.mu.Lock(); defer l.mu.Unlock() now := time.Now() elapsed := now.Sub(l.last).Seconds() leaked := int(elapsed * l.rate) l.queue = max(0, l.queue - leaked) l.last = now if l.queue < l.capacity { l.queue++ return true } return false}Sliding window log
Заголовок раздела «Sliding window log»type SlidingLog struct { mu sync.Mutex window time.Duration limit int log []time.Time}
func (s *SlidingLog) Allow() bool { s.mu.Lock(); defer s.mu.Unlock() now := time.Now() cutoff := now.Add(-s.window) i := 0 for i < len(s.log) && s.log[i].Before(cutoff) { i++ } s.log = s.log[i:] if len(s.log) < s.limit { s.log = append(s.log, now) return true } return false}Sliding window counter
Заголовок раздела «Sliding window counter»type SlidingCounter struct { mu sync.Mutex window time.Duration limit int curr, prev int currStart time.Time}
func (s *SlidingCounter) Allow() bool { s.mu.Lock(); defer s.mu.Unlock() now := time.Now() if now.Sub(s.currStart) >= s.window { if now.Sub(s.currStart) >= 2 * s.window { s.prev = 0 } else { s.prev = s.curr } s.curr = 0 s.currStart = s.currStart.Add(s.window) } elapsed := now.Sub(s.currStart).Seconds() / s.window.Seconds() estimate := int(float64(s.prev) * (1 - elapsed)) + s.curr if estimate < s.limit { s.curr++ return true } return false}Trade-offs
Заголовок раздела «Trade-offs»| Алгоритм | Memory | Точность | Burst |
|---|---|---|---|
| Token | O(1) | high | yes |
| Leaky | O(1) | high (smooth) | no |
| Sliding log | O(limit) | perfect | yes |
| Sliding counter | O(1) | approx | yes |
13. In-memory Message Queue
Заголовок раздела «13. In-memory Message Queue»type MQ struct { mu sync.RWMutex topics map[string]*topic}
type topic struct { mu sync.Mutex msgs [][]byte offset int subs map[string]int // sub_id -> last_consumed_offset cond *sync.Cond}
func (m *MQ) Publish(name string, msg []byte) { m.mu.Lock() t, ok := m.topics[name] if !ok { t = &topic{subs: make(map[string]int)} t.cond = sync.NewCond(&t.mu) m.topics[name] = t } m.mu.Unlock() t.mu.Lock() t.msgs = append(t.msgs, msg) t.cond.Broadcast() t.mu.Unlock()}
func (m *MQ) Consume(name, subID string) []byte { m.mu.RLock(); t := m.topics[name]; m.mu.RUnlock() t.mu.Lock(); defer t.mu.Unlock() for t.subs[subID] >= len(t.msgs) { t.cond.Wait() } msg := t.msgs[t.subs[subID]] t.subs[subID]++ return msg}14. Worker Pool
Заголовок раздела «14. Worker Pool»type Pool struct { jobs chan func() wg sync.WaitGroup}
func NewPool(workers int) *Pool { p := &Pool{jobs: make(chan func(), workers*10)} for i := 0; i < workers; i++ { p.wg.Add(1) go func() { defer p.wg.Done() for job := range p.jobs { job() } }() } return p}
func (p *Pool) Submit(job func()) { p.jobs <- job }func (p *Pool) Close() { close(p.jobs); p.wg.Wait() }С context cancellation:
func (p *Pool) Run(ctx context.Context) { for i := 0; i < p.size; i++ { p.wg.Add(1) go func() { defer p.wg.Done() for { select { case <-ctx.Done(): return case job, ok := <-p.jobs: if !ok { return } job() } } }() }}15. Pub/Sub System
Заголовок раздела «15. Pub/Sub System»type Broker struct { mu sync.RWMutex topics map[string]map[string]chan interface{}}
func New() *Broker { return &Broker{topics: map[string]map[string]chan interface{}{}} }
func (b *Broker) Subscribe(topic, id string) <-chan interface{} { b.mu.Lock(); defer b.mu.Unlock() if b.topics[topic] == nil { b.topics[topic] = map[string]chan interface{}{} } ch := make(chan interface{}, 100) b.topics[topic][id] = ch return ch}
func (b *Broker) Unsubscribe(topic, id string) { b.mu.Lock(); defer b.mu.Unlock() if ch, ok := b.topics[topic][id]; ok { close(ch) delete(b.topics[topic], id) }}
func (b *Broker) Publish(topic string, msg interface{}) { b.mu.RLock(); defer b.mu.RUnlock() for _, ch := range b.topics[topic] { select { case ch <- msg: default: // drop on full } }}16. Common Pitfalls
Заголовок раздела «16. Common Pitfalls»⚠️ Forgetting defer wg.Done() в goroutine — Wait never returns.
⚠️ wg.Add(1) после go — race. Делать Add до go.
⚠️ Close на receive side — паника при следующей отправке. Close — sender’s responsibility.
⚠️ Reading from closed channel — возвращает zero value + ok=false. Не паника.
⚠️ Writing to closed channel — panic.
⚠️ sync.Cond.Wait без loop — spurious wakeup. Используй for !condition { cond.Wait() }.
⚠️ Mutex copy — go vet ловит. Mutex value = state, копия = новый.
⚠️ map под mutex для всех операций — даже read. Иначе fatal error: concurrent map read and map write.
⚠️ Goroutine leak в select { case <-ctx.Done(): } без всех путей выхода.
⚠️ Buffered channel == bounded queue — не “fire and forget”. Если consumer медленный — заблокируется sender.
⚠️ time.After в цикле — leak, timer не освобождается. Используй time.NewTimer + Reset.
⚠️ Race в LRU.Get — Get модифицирует list (move-to-front), нужен Lock не RLock.
⚠️ RWMutex для частых writers — медленнее Mutex. Mfeлкие write — простой Mutex.
17. 15 вопросов
Заголовок раздела «17. 15 вопросов»- Чем
sync.Mutexотличается отsync.RWMutex? - Когда
RWMutexхуже обычногоMutex? - Что такое
sync.Condи когда применять? - Почему
for !cond { cond.Wait() }, а неif? - Что такое
selectdefault branch? - Как реализовать timeout на канале?
time.Aftervstime.NewTimer— отличия и leaks?- Что произойдёт при
close(ch)дважды? - Чем
sync.Onceотличается отsync.Mutex-based check? - Как реализовать bounded blocking queue в Go?
- Чем
worker poolотличается отerrgroup? - Зачем
context.Contextв concurrent code? - Как обнаружить goroutine leak?
- Что такое
runtime.NumGoroutineи зачем мониторить? sync.Mapvsmap+RWMutex— когда что?
18. Источники
Заголовок раздела «18. Источники»- “Concurrency in Go” — Katherine Cox-Buday
- Go memory model: https://go.dev/ref/mem
- Go
syncdocs - Effective Go
- Dave Cheney blog
- Rob Pike “Concurrency is not parallelism”
- uber-go style guide
- “100 Go Mistakes” — Teiva Harsanyi
- LeetCode Concurrency tag
- Mikhail Pinkhasik (rsc) on go memory model
- golang-nuts archive
- “Designing Data-Intensive Applications” — concurrency chapter
- The Little Book of Semaphores — Downey
- Go runtime scheduler talks
- github.com/uber-go/atomic