Перейти к содержимому

Concurrent LeetCode и System-Level задачи (Go)

Зачем знать на Middle 3: Go-разработчика часто проверяют не на классический LC, а на concurrent задачи: “напиши rate limiter”, “thread-safe LRU”, “bounded blocking queue”. Это проверяет реальный навык работы с goroutines, channels, sync. И на собесах часто дают “Design Twitter in-memory” — гибрид system design + concurrent coding.

  1. Print in Order
  2. Print FooBar Alternately
  3. Building H2O
  4. Dining Philosophers
  5. Bounded Blocking Queue
  6. Web Crawler Multithreaded
  7. Thread-safe LRU Cache
  8. Design Hit Counter
  9. Design Twitter (in-memory)
  10. Distributed Cache (sketch)
  11. Tweet Stream
  12. Rate Limiters (4 алгоритма)
  13. In-memory Message Queue
  14. Worker Pool
  15. Pub/Sub System
  16. Common Pitfalls
  17. 15 вопросов
  18. Источники

Три потока вызывают first(), second(), third(). Гарантировать порядок first → second → third.

type Foo struct {
ch1, ch2 chan struct{}
}
func NewFoo() *Foo {
return &Foo{ch1: make(chan struct{}), ch2: make(chan struct{})}
}
func (f *Foo) First(printFirst func()) {
printFirst()
close(f.ch1)
}
func (f *Foo) Second(printSecond func()) {
<-f.ch1
printSecond()
close(f.ch2)
}
func (f *Foo) Third(printThird func()) {
<-f.ch2
printThird()
}

Альтернатива через sync.Cond или sync.WaitGroup — но channels чище.

Pitfalls: не вызвать close дважды (close idempotency).


Поток A печатает “foo” N раз, B — “bar” N раз. Чередовать: foobar foobar.

type FooBar struct {
n int
fooCh, barCh chan struct{}
}
func New(n int) *FooBar {
fb := &FooBar{n: n, fooCh: make(chan struct{}, 1), barCh: make(chan struct{}, 1)}
fb.fooCh <- struct{}{} // foo starts
return fb
}
func (fb *FooBar) Foo(printFoo func()) {
for i := 0; i < fb.n; i++ {
<-fb.fooCh
printFoo()
fb.barCh <- struct{}{}
}
}
func (fb *FooBar) Bar(printBar func()) {
for i := 0; i < fb.n; i++ {
<-fb.barCh
printBar()
fb.fooCh <- struct{}{}
}
}

N потоков hydrogen() и M потоков oxygen(). Собираются группами по 2H+1O. До формирования group потоки блокируются.

type H2O struct {
hSem, oSem chan struct{}
barrier *sync.WaitGroup // на группу
mu sync.Mutex
hCount int
}
func New() *H2O {
h := &H2O{
hSem: make(chan struct{}, 2),
oSem: make(chan struct{}, 1),
}
// initial: 2H + 1O разрешены
h.hSem <- struct{}{}; h.hSem <- struct{}{}
h.oSem <- struct{}{}
h.barrier = &sync.WaitGroup{}
h.barrier.Add(3)
return h
}
func (h *H2O) Hydrogen(release func()) {
<-h.hSem
release()
h.tryRelease()
}
func (h *H2O) Oxygen(release func()) {
<-h.oSem
release()
h.tryRelease()
}
func (h *H2O) tryRelease() {
h.barrier.Done()
h.barrier.Wait()
h.mu.Lock()
defer h.mu.Unlock()
h.barrier = &sync.WaitGroup{}
h.barrier.Add(3)
h.hSem <- struct{}{}; h.hSem <- struct{}{}
h.oSem <- struct{}{}
}

(Упрощённая идея — production вариант сложнее.)


5 философов, 5 вилок. Каждый берёт левую и правую, ест, кладёт. Избегать deadlock.

Чётные философы берут сначала левую, нечётные — сначала правую. Разрывает циклическую зависимость.

type Philosophers struct {
forks [5]sync.Mutex
}
func (p *Philosophers) Eat(id int, pickLeft, pickRight, eat, putLeft, putRight func()) {
left, right := id, (id+1)%5
if id%2 == 0 {
p.forks[left].Lock(); pickLeft()
p.forks[right].Lock(); pickRight()
} else {
p.forks[right].Lock(); pickRight()
p.forks[left].Lock(); pickLeft()
}
eat()
putLeft(); p.forks[left].Unlock()
putRight(); p.forks[right].Unlock()
}

Альтернатива: Chandy/Misra (request based). Или ограничить N-1 философов едящими одновременно.


type BoundedBlockingQueue struct {
ch chan int
}
func New(capacity int) *BoundedBlockingQueue {
return &BoundedBlockingQueue{ch: make(chan int, capacity)}
}
func (q *BoundedBlockingQueue) Enqueue(v int) { q.ch <- v }
func (q *BoundedBlockingQueue) Dequeue() int { return <-q.ch }
func (q *BoundedBlockingQueue) Size() int { return len(q.ch) }

В Go это тривиально — channels уже bounded blocking queue.

type Queue struct {
mu sync.Mutex
notFull, notEmpty *sync.Cond
buf []int
cap int
}
func NewQ(cap int) *Queue {
q := &Queue{cap: cap}
q.notFull = sync.NewCond(&q.mu)
q.notEmpty = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(v int) {
q.mu.Lock()
for len(q.buf) == q.cap { q.notFull.Wait() }
q.buf = append(q.buf, v)
q.notEmpty.Signal()
q.mu.Unlock()
}
func (q *Queue) Dequeue() int {
q.mu.Lock()
for len(q.buf) == 0 { q.notEmpty.Wait() }
v := q.buf[0]; q.buf = q.buf[1:]
q.notFull.Signal()
q.mu.Unlock()
return v
}

⚠️ Используй for, не if для проверки condition (spurious wakeup).


Crawl сайт начиная с URL, only same hostname. Concurrent.

type htmlParser interface {
GetUrls(url string) []string
}
func crawl(startUrl string, parser htmlParser) []string {
host := getHost(startUrl)
visited := &sync.Map{}
var wg sync.WaitGroup
var mu sync.Mutex
result := []string{}
var crawler func(url string)
crawler = func(url string) {
defer wg.Done()
if _, loaded := visited.LoadOrStore(url, true); loaded { return }
mu.Lock(); result = append(result, url); mu.Unlock()
for _, next := range parser.GetUrls(url) {
if getHost(next) != host { continue }
wg.Add(1)
go crawler(next)
}
}
wg.Add(1)
go crawler(startUrl)
wg.Wait()
return result
}
func getHost(u string) string {
parsed, _ := url.Parse(u)
return parsed.Host
}

Pitfalls: wg.Add нужно до go, иначе race.


type node struct {
key, val int
prev, next *node
}
type LRUCache struct {
mu sync.Mutex
cap int
m map[int]*node
head, tail *node // sentinels
}
func NewLRU(cap int) *LRUCache {
c := &LRUCache{cap: cap, m: make(map[int]*node)}
c.head = &node{}; c.tail = &node{}
c.head.next = c.tail; c.tail.prev = c.head
return c
}
func (c *LRUCache) Get(key int) (int, bool) {
c.mu.Lock(); defer c.mu.Unlock()
n, ok := c.m[key]
if !ok { return 0, false }
c.remove(n); c.addFront(n)
return n.val, true
}
func (c *LRUCache) Put(key, val int) {
c.mu.Lock(); defer c.mu.Unlock()
if n, ok := c.m[key]; ok {
n.val = val
c.remove(n); c.addFront(n)
return
}
if len(c.m) >= c.cap {
evict := c.tail.prev
c.remove(evict)
delete(c.m, evict.key)
}
n := &node{key: key, val: val}
c.addFront(n)
c.m[key] = n
}
func (c *LRUCache) remove(n *node) {
n.prev.next = n.next
n.next.prev = n.prev
}
func (c *LRUCache) addFront(n *node) {
n.next = c.head.next
n.prev = c.head
c.head.next.prev = n
c.head.next = n
}

Trade-offs: mutex coarse — single lock. Для high contention — sharded LRU (по hash(key) % N).


hit(ts) — событие. getHits(ts) — count в последние 300 sec.

type HitCounter struct {
mu sync.Mutex
hits []int64 // timestamps
}
func (h *HitCounter) Hit(ts int64) {
h.mu.Lock(); defer h.mu.Unlock()
h.hits = append(h.hits, ts)
}
func (h *HitCounter) GetHits(ts int64) int {
h.mu.Lock(); defer h.mu.Unlock()
cutoff := ts - 300
i := 0
for i < len(h.hits) && h.hits[i] <= cutoff { i++ }
h.hits = h.hits[i:]
return len(h.hits)
}
type HitCounter struct {
mu sync.Mutex
buckets [300]int64 // timestamp seen
counts [300]int
}
func (h *HitCounter) Hit(ts int64) {
h.mu.Lock(); defer h.mu.Unlock()
idx := ts % 300
if h.buckets[idx] != ts {
h.buckets[idx] = ts
h.counts[idx] = 0
}
h.counts[idx]++
}
func (h *HitCounter) GetHits(ts int64) int {
h.mu.Lock(); defer h.mu.Unlock()
total := 0
for i := 0; i < 300; i++ {
if h.buckets[i] > ts - 300 { total += h.counts[i] }
}
return total
}

O(1) hit, O(300) get. Fixed memory.


  • postTweet(userId, tweetId)
  • getNewsFeed(userId) — top 10 recent tweets от user + followees.
  • follow(followerId, followeeId)
  • unfollow(...).
type Tweet struct {
id, ts int
}
type Twitter struct {
mu sync.Mutex
timestamp int
tweets map[int][]Tweet
follows map[int]map[int]bool
}
func NewTwitter() *Twitter {
return &Twitter{tweets: map[int][]Tweet{}, follows: map[int]map[int]bool{}}
}
func (t *Twitter) PostTweet(userId, tweetId int) {
t.mu.Lock(); defer t.mu.Unlock()
t.timestamp++
t.tweets[userId] = append(t.tweets[userId], Tweet{tweetId, t.timestamp})
}
func (t *Twitter) GetNewsFeed(userId int) []int {
t.mu.Lock(); defer t.mu.Unlock()
candidates := append([]Tweet{}, t.tweets[userId]...)
for f := range t.follows[userId] {
candidates = append(candidates, t.tweets[f]...)
}
sort.Slice(candidates, func(i, j int) bool { return candidates[i].ts > candidates[j].ts })
if len(candidates) > 10 { candidates = candidates[:10] }
res := make([]int, len(candidates))
for i, c := range candidates { res[i] = c.id }
return res
}
func (t *Twitter) Follow(follower, followee int) {
t.mu.Lock(); defer t.mu.Unlock()
if t.follows[follower] == nil { t.follows[follower] = map[int]bool{} }
t.follows[follower][followee] = true
}
func (t *Twitter) Unfollow(follower, followee int) {
t.mu.Lock(); defer t.mu.Unlock()
delete(t.follows[follower], followee)
}

Оптимизация: use heap для top-10 без full sort, O(K log N).


См. файл 43, section 2. Здесь — Go skeleton client:

type Ring struct {
nodes []uint64
mp map[uint64]string
}
type DCache struct {
ring *Ring
pool map[string]*redis.Client
}
func (c *DCache) Get(ctx context.Context, key string) (string, error) {
node := c.ring.GetNode(key)
return c.pool[node].Get(ctx, key).Result()
}

type Stream struct {
mu sync.RWMutex
subs map[string]chan Tweet
}
func (s *Stream) Subscribe(id string) <-chan Tweet {
s.mu.Lock(); defer s.mu.Unlock()
ch := make(chan Tweet, 100) // buffered
s.subs[id] = ch
return ch
}
func (s *Stream) Unsubscribe(id string) {
s.mu.Lock(); defer s.mu.Unlock()
if ch, ok := s.subs[id]; ok { close(ch); delete(s.subs, id) }
}
func (s *Stream) Publish(t Tweet) {
s.mu.RLock(); defer s.mu.RUnlock()
for _, ch := range s.subs {
select {
case ch <- t:
default:
// slow consumer — drop
}
}
}

type TokenBucket struct {
mu sync.Mutex
capacity int
tokens float64
rate float64 // tokens/sec
last time.Time
}
func (t *TokenBucket) Allow() bool {
t.mu.Lock(); defer t.mu.Unlock()
now := time.Now()
elapsed := now.Sub(t.last).Seconds()
t.tokens = math.Min(float64(t.capacity), t.tokens + elapsed * t.rate)
t.last = now
if t.tokens >= 1 {
t.tokens--
return true
}
return false
}
type LeakyBucket struct {
mu sync.Mutex
capacity int
queue int
rate float64
last time.Time
}
func (l *LeakyBucket) Allow() bool {
l.mu.Lock(); defer l.mu.Unlock()
now := time.Now()
elapsed := now.Sub(l.last).Seconds()
leaked := int(elapsed * l.rate)
l.queue = max(0, l.queue - leaked)
l.last = now
if l.queue < l.capacity {
l.queue++
return true
}
return false
}
type SlidingLog struct {
mu sync.Mutex
window time.Duration
limit int
log []time.Time
}
func (s *SlidingLog) Allow() bool {
s.mu.Lock(); defer s.mu.Unlock()
now := time.Now()
cutoff := now.Add(-s.window)
i := 0
for i < len(s.log) && s.log[i].Before(cutoff) { i++ }
s.log = s.log[i:]
if len(s.log) < s.limit {
s.log = append(s.log, now)
return true
}
return false
}
type SlidingCounter struct {
mu sync.Mutex
window time.Duration
limit int
curr, prev int
currStart time.Time
}
func (s *SlidingCounter) Allow() bool {
s.mu.Lock(); defer s.mu.Unlock()
now := time.Now()
if now.Sub(s.currStart) >= s.window {
if now.Sub(s.currStart) >= 2 * s.window {
s.prev = 0
} else {
s.prev = s.curr
}
s.curr = 0
s.currStart = s.currStart.Add(s.window)
}
elapsed := now.Sub(s.currStart).Seconds() / s.window.Seconds()
estimate := int(float64(s.prev) * (1 - elapsed)) + s.curr
if estimate < s.limit {
s.curr++
return true
}
return false
}
АлгоритмMemoryТочностьBurst
TokenO(1)highyes
LeakyO(1)high (smooth)no
Sliding logO(limit)perfectyes
Sliding counterO(1)approxyes

type MQ struct {
mu sync.RWMutex
topics map[string]*topic
}
type topic struct {
mu sync.Mutex
msgs [][]byte
offset int
subs map[string]int // sub_id -> last_consumed_offset
cond *sync.Cond
}
func (m *MQ) Publish(name string, msg []byte) {
m.mu.Lock()
t, ok := m.topics[name]
if !ok {
t = &topic{subs: make(map[string]int)}
t.cond = sync.NewCond(&t.mu)
m.topics[name] = t
}
m.mu.Unlock()
t.mu.Lock()
t.msgs = append(t.msgs, msg)
t.cond.Broadcast()
t.mu.Unlock()
}
func (m *MQ) Consume(name, subID string) []byte {
m.mu.RLock(); t := m.topics[name]; m.mu.RUnlock()
t.mu.Lock(); defer t.mu.Unlock()
for t.subs[subID] >= len(t.msgs) {
t.cond.Wait()
}
msg := t.msgs[t.subs[subID]]
t.subs[subID]++
return msg
}

type Pool struct {
jobs chan func()
wg sync.WaitGroup
}
func NewPool(workers int) *Pool {
p := &Pool{jobs: make(chan func(), workers*10)}
for i := 0; i < workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for job := range p.jobs { job() }
}()
}
return p
}
func (p *Pool) Submit(job func()) { p.jobs <- job }
func (p *Pool) Close() { close(p.jobs); p.wg.Wait() }

С context cancellation:

func (p *Pool) Run(ctx context.Context) {
for i := 0; i < p.size; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done(): return
case job, ok := <-p.jobs:
if !ok { return }
job()
}
}
}()
}
}

type Broker struct {
mu sync.RWMutex
topics map[string]map[string]chan interface{}
}
func New() *Broker { return &Broker{topics: map[string]map[string]chan interface{}{}} }
func (b *Broker) Subscribe(topic, id string) <-chan interface{} {
b.mu.Lock(); defer b.mu.Unlock()
if b.topics[topic] == nil { b.topics[topic] = map[string]chan interface{}{} }
ch := make(chan interface{}, 100)
b.topics[topic][id] = ch
return ch
}
func (b *Broker) Unsubscribe(topic, id string) {
b.mu.Lock(); defer b.mu.Unlock()
if ch, ok := b.topics[topic][id]; ok {
close(ch)
delete(b.topics[topic], id)
}
}
func (b *Broker) Publish(topic string, msg interface{}) {
b.mu.RLock(); defer b.mu.RUnlock()
for _, ch := range b.topics[topic] {
select {
case ch <- msg:
default:
// drop on full
}
}
}

⚠️ Forgetting defer wg.Done() в goroutine — Wait never returns.

⚠️ wg.Add(1) после go — race. Делать Add до go.

⚠️ Close на receive side — паника при следующей отправке. Close — sender’s responsibility.

⚠️ Reading from closed channel — возвращает zero value + ok=false. Не паника.

⚠️ Writing to closed channel — panic.

⚠️ sync.Cond.Wait без loop — spurious wakeup. Используй for !condition { cond.Wait() }.

⚠️ Mutex copygo vet ловит. Mutex value = state, копия = новый.

⚠️ map под mutex для всех операций — даже read. Иначе fatal error: concurrent map read and map write.

⚠️ Goroutine leak в select { case <-ctx.Done(): } без всех путей выхода.

⚠️ Buffered channel == bounded queue — не “fire and forget”. Если consumer медленный — заблокируется sender.

⚠️ time.After в цикле — leak, timer не освобождается. Используй time.NewTimer + Reset.

⚠️ Race в LRU.Get — Get модифицирует list (move-to-front), нужен Lock не RLock.

⚠️ RWMutex для частых writers — медленнее Mutex. Mfeлкие write — простой Mutex.


  1. Чем sync.Mutex отличается от sync.RWMutex?
  2. Когда RWMutex хуже обычного Mutex?
  3. Что такое sync.Cond и когда применять?
  4. Почему for !cond { cond.Wait() }, а не if?
  5. Что такое select default branch?
  6. Как реализовать timeout на канале?
  7. time.After vs time.NewTimer — отличия и leaks?
  8. Что произойдёт при close(ch) дважды?
  9. Чем sync.Once отличается от sync.Mutex-based check?
  10. Как реализовать bounded blocking queue в Go?
  11. Чем worker pool отличается от errgroup?
  12. Зачем context.Context в concurrent code?
  13. Как обнаружить goroutine leak?
  14. Что такое runtime.NumGoroutine и зачем мониторить?
  15. sync.Map vs map+RWMutex — когда что?

  1. “Concurrency in Go” — Katherine Cox-Buday
  2. Go memory model: https://go.dev/ref/mem
  3. Go sync docs
  4. Effective Go
  5. Dave Cheney blog
  6. Rob Pike “Concurrency is not parallelism”
  7. uber-go style guide
  8. “100 Go Mistakes” — Teiva Harsanyi
  9. LeetCode Concurrency tag
  10. Mikhail Pinkhasik (rsc) on go memory model
  11. golang-nuts archive
  12. “Designing Data-Intensive Applications” — concurrency chapter
  13. The Little Book of Semaphores — Downey
  14. Go runtime scheduler talks
  15. github.com/uber-go/atomic