Конкурентность - одна из ключевых причин популярности Go. В отличие от большинства языков, где конкурентность добавлена через библиотеки или фреймворки, в Go она встроена в сам язык. Горутины, каналы и оператор select - примитивы первого класса, которые делают написание конкурентного кода естественным. Эта глава охватывает всё от базовых горутин до продвинутых паттернов, применяемых в production-системах.
Что вы изучите в этой главе
Разницу между конкурентностью и параллелизмом
Горутины и внутреннее устройство планировщика Go (GMP модель)
Роб Пайк в своём знаменитом докладе “Concurrency is not Parallelism” чётко разделил эти понятия:
Определения
Конкурентность (Concurrency) — это структура программы. Способ организации кода, при котором несколько задач могут находиться в процессе выполнения одновременно, но не обязательно исполняются параллельно.
Параллелизм (Parallelism) — это исполнение. Физическое одновременное выполнение нескольких операций на разных ядрах процессора.
Аналогия Роба Пайка: представьте одного сусликов (gopher), который носит книги из одной кучи в другую. Это последовательное выполнение. Теперь представьте двух сусликов с двумя тележками - они работают параллельно. Но можно организовать работу и иначе: один суслик загружает книги в тележку, другой разгружает. Это конкурентная структура - задачи разделены, но могут выполняться и одним процессором.
// Конкурентность: две задачи СТРУКТУРИРОВАНЫ для независимого выполнения// Но на одном ядре они будут чередоваться, а не выполняться параллельноpackage mainimport ( "fmt" "runtime")func main() { // Ограничиваем количество потоков ОС одним — параллелизма нет, // но конкурентность остаётся: горутины по-прежнему чередуются runtime.GOMAXPROCS(1) go func() { for i := 0; i < 5; i++ { fmt.Println("Горутина A:", i) } }() go func() { for i := 0; i < 5; i++ { fmt.Println("Горутина B:", i) } }() // Без ожидания main завершится раньше горутин // (пока используем Sleep, позже узнаем о WaitGroup) select {}}
Go намеренно делает конкурентность дешёвой. Создание горутины стоит ~2 КБ памяти на стек (который растёт по мере необходимости), тогда как создание потока ОС требует ~1 МБ стека и системного вызова. Это позволяет запускать сотни тысяч горутин в одном процессе.
Подход Go к конкурентности
“Don’t communicate by sharing memory; share memory by communicating.” — Go Proverb
Вместо блокировок и разделяемой памяти (как в C++ или Java), Go предлагает передавать данные между горутинами через каналы. Это делает конкурентный код безопаснее и проще для понимания.
// GOMAXPROCS управляет количеством потоков ОС,// на которых исполняются горутиныpackage mainimport ( "fmt" "runtime")func main() { // По умолчанию равен количеству логических ядер CPU fmt.Println("Количество ядер:", runtime.NumCPU()) fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 0 = запросить текущее значение // Установить вручную (обычно не нужно) runtime.GOMAXPROCS(4) // Количество активных горутин fmt.Println("Горутин:", runtime.NumGoroutine())}
🏠 Домашнее задание
Напишите программу, которая запускает 3 горутины, каждая печатает числа от 1 до 10 с указанием своего номера. Запустите с GOMAXPROCS(1) и GOMAXPROCS(4) — сравните порядок вывода.
Используя runtime.NumGoroutine(), покажите, как количество горутин меняется при их запуске и завершении.
Прочитайте доклад Rob Pike “Concurrency is not Parallelism” и напишите своими словами 3 ключевых вывода.
2. Горутины
Горутина — это легковесный поток выполнения, управляемый runtime Go, а не операционной системой. Для запуска горутины используется ключевое слово go перед вызовом функции.
package mainimport ( "fmt" "time")// greet — обычная функция, которую мы запустим как горутинуfunc greet(name string) { for i := 0; i < 3; i++ { fmt.Printf("Привет, %s! (итерация %d)\n", name, i) time.Sleep(100 * time.Millisecond) }}func main() { // Запуск горутин — ключевое слово go перед вызовом функции go greet("Алиса") // горутина 1 go greet("Боб") // горутина 2 // Анонимная горутина go func() { fmt.Println("Анонимная горутина выполняется") }() // ВАЖНО: main() не ждёт завершения горутин! // Если main завершится, все горутины будут убиты time.Sleep(500 * time.Millisecond) fmt.Println("main завершается")}
main не ждёт горутины
Когда функция main() завершается, программа завершается целиком, убивая все запущенные горутины. Позже мы рассмотрим sync.WaitGroup и каналы для корректного ожидания.
Сколько горутин можно запустить?
Горутины невероятно дешёвые. Начальный стек горутины — всего ~2 КБ (он автоматически растёт до максимума, по умолчанию 1 ГБ). Сравните: поток ОС обычно получает 1-8 МБ стека.
package mainimport ( "fmt" "runtime" "sync")func main() { const numGoroutines = 100_000 var wg sync.WaitGroup wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() // Каждая горутина делает минимальную работу _ = id * id }(i) } wg.Wait() // Проверяем пиковое потребление памяти var m runtime.MemStats runtime.ReadMemStats(&m) fmt.Printf("Запущено горутин: %d\n", numGoroutines) fmt.Printf("Потребление памяти: %.2f МБ\n", float64(m.TotalAlloc)/1024/1024)}
Характеристики горутин
Характеристика
Горутина
Поток ОС
Начальный стек
~2 КБ
~1-8 МБ
Создание
~0.3 мкс
~10-30 мкс
Переключение контекста
~0.2 мкс
~1-10 мкс
Управление
Go runtime
ОС
Количество
сотни тысяч
тысячи
🏠 Домашнее задание
Запустите 1 миллион горутин и измерьте потребление памяти с помощью runtime.MemStats.
Напишите функцию, которая запускает N горутин, каждая из которых выполняет вычисление (например, сумму чисел до 1000). Измерьте время выполнения для N = 1, 100, 10000, 100000.
Объясните, почему этот код не выводит ничего, и исправьте его:
func main() { go fmt.Println("Привет из горутины!")}
3. GMP модель планировщика
Планировщик Go — один из ключевых компонентов runtime. Он управляет распределением горутин по потокам ОС, используя модель, известную как GMP.
Компоненты GMP
G (Goroutine) — горутина. Содержит стек, указатель инструкции, ссылку на канал, по которому заблокирована, и другие метаданные.
M (Machine) — поток ОС. Реальный поток, который исполняет код. M всегда привязан к одному P.
P (Processor) — логический процессор. Контекст выполнения, содержащий локальную очередь горутин. Количество P = GOMAXPROCS.
Алгоритм выбора следующей горутины для M:
1. Проверить локальную очередь P (Local Run Queue)
└── Есть G? → Взять и выполнить
2. Проверить глобальную очередь (Global Run Queue)
└── Есть G? → Забрать пачку в локальную очередь
3. Проверить netpoller (готовые I/O операции)
└── Есть готовые G? → Взять
4. Work Stealing: украсть половину G из очереди другого P
└── Есть G у соседа? → Забрать половину
5. Если ничего нет → M паркуется (засыпает)
Work Stealing
Когда локальная очередь P пуста, планировщик крадёт половину горутин из очереди другого P. Это обеспечивает балансировку нагрузки:
До work stealing:
P0: [G1, G2, G3, G4, G5, G6] P1: []
После work stealing:
P0: [G1, G2, G3] P1: [G4, G5, G6]
Вытесняющее планирование (Go 1.14+)
До Go 1.14 горутина, выполняющая tight loop без вызовов функций, могла монополизировать поток:
// До Go 1.14 этот код блокировал M навсегда// С Go 1.14 планировщик использует сигналы ОС (SIGURG)// для вытеснения таких горутинgo func() { for { // tight loop без вызовов функций // до 1.14: никогда не отдаст управление // с 1.14: будет вытеснена через ~10мс }}()
Когда горутина отдаёт управление планировщику
Операции с каналами — отправка/получение из канала
Системные вызовы — файловый I/O, сетевые операции
Вызовы функций — при входе в функцию проверяется, нужен ли рост стека (stack check)
runtime.Gosched() — явная передача управления
Блокировка на mutex — sync.Mutex, sync.RWMutex
time.Sleep — горутина паркуется до срабатывания таймера
Вытесняющий сигнал (Go 1.14+) — SIGURG каждые ~10мс для tight loops
Системные вызовы и M
Когда горутина выполняет блокирующий системный вызов (например, файловый I/O), поток M блокируется вместе с ней. Планировщик отсоединяет P от заблокированного M и привязывает к свободному (или создаёт новый M):
До syscall: Во время syscall: После syscall:
P ─── M P ─── M_new P ─── M
│ │ │
G (работает) G' (работает) G (продолжает)
M_old ← заблокирован
│
G (ждёт syscall)
🏠 Домашнее задание
Используя runtime.GOMAXPROCS(), runtime.NumGoroutine(), runtime.NumCPU() напишите программу, которая выводит информацию о текущем состоянии планировщика.
Запустите программу с GODEBUG=schedtrace=1000 и проанализируйте вывод планировщика.
Напишите tight loop горутину и убедитесь, что она вытесняется (Go 1.14+), добавив runtime.Gosched() для версий до 1.14.
4. Каналы
Каналы — основной механизм коммуникации между горутинами в Go. Канал — это типизированный трубопровод, через который можно отправлять и получать значения.
Небуферизованные каналы
Небуферизованный канал — это канал без буфера. Отправка блокирует горутину, пока другая горутина не прочитает значение, и наоборот. Это обеспечивает синхронный обмен данными.
package mainimport "fmt"func main() { // Создание небуферизованного канала ch := make(chan string) // Горутина-отправитель go func() { fmt.Println("Отправитель: готовлю данные...") ch <- "Привет из горутины!" // блокируется до получения fmt.Println("Отправитель: данные отправлены") }() // Получатель в main (блокируется до отправки) msg := <-ch fmt.Println("Получатель:", msg)}
Визуализация небуферизованного канала:
Горутина A Канал Горутина B
│ │ │
├─── ch <- value ──────► │ (A блокируется) │
│ │ │
│ │ ◄────── val := <-ch ───┤
│ │ (B получает) │
├─── (A разблокирована) ◄──┤ │
│ │ │
Буферизованные каналы
Буферизованный канал имеет внутренний буфер. Отправка блокируется только когда буфер полон. Получение блокируется только когда буфер пуст.
package mainimport "fmt"func main() { // Буферизованный канал на 3 элемента ch := make(chan int, 3) // Можно отправить 3 значения без блокировки ch <- 10 // буфер: [10] ch <- 20 // буфер: [10, 20] ch <- 30 // буфер: [10, 20, 30] // ch <- 40 // ЗАБЛОКИРУЕТСЯ! Буфер полон // Чтение из буфера — FIFO порядок fmt.Println(<-ch) // 10, буфер: [20, 30] fmt.Println(<-ch) // 20, буфер: [30] fmt.Println(<-ch) // 30, буфер: [] // <-ch // ЗАБЛОКИРУЕТСЯ! Буфер пуст // Длина и ёмкость канала ch <- 42 fmt.Println("Длина:", len(ch)) // 1 — текущее количество элементов fmt.Println("Ёмкость:", cap(ch)) // 3 — размер буфера}
Когда использовать буферизованные каналы
Небуферизованные — когда нужна синхронизация (гарантия, что отправитель дождётся получателя)
Буферизованные — когда отправитель не должен ждать получателя (асинхронный режим), или для ограничения конкурентности (семафор)
Направленные каналы
В сигнатурах функций можно указывать направление канала — только для отправки или только для получения:
package mainimport "fmt"// producer принимает канал только для отправкиfunc producer(ch chan<- int) { for i := 0; i < 5; i++ { ch <- i * i } close(ch) // закрываем канал, когда данные закончились}// consumer принимает канал только для чтенияfunc consumer(ch <-chan int) { for val := range ch { // range читает до закрытия канала fmt.Println("Получено:", val) }}func main() { ch := make(chan int) // двунаправленный канал // Go автоматически преобразует двунаправленный канал // в однонаправленный при передаче в функцию go producer(ch) // chan int → chan<- int consumer(ch) // chan int → <-chan int}
Закрытие каналов
package mainimport "fmt"func main() { ch := make(chan int, 5) // Отправляем данные ch <- 1 ch <- 2 ch <- 3 // Закрываем канал — больше отправлять нельзя close(ch) // ch <- 4 // PANIC: send on closed channel // Чтение из закрытого канала возвращает zero value fmt.Println(<-ch) // 1 (из буфера) fmt.Println(<-ch) // 2 (из буфера) fmt.Println(<-ch) // 3 (из буфера) fmt.Println(<-ch) // 0 (канал закрыт, буфер пуст) fmt.Println(<-ch) // 0 (повторное чтение — снова zero value) // Comma-ok идиома — проверяем, открыт ли канал ch2 := make(chan string, 1) ch2 <- "данные" close(ch2) val, ok := <-ch2 fmt.Println(val, ok) // "данные" true val, ok = <-ch2 fmt.Println(val, ok) // "" false — канал закрыт и пуст}
Правила закрытия каналов
Закрывать канал должен только отправитель, никогда получатель
Закрытие уже закрытого канала вызывает panic
Отправка в закрытый канал вызывает panic
Чтение из закрытого канала возвращает zero value (используйте comma-ok для проверки)
Range по каналу
package mainimport "fmt"func fibonacci(n int, ch chan<- int) { a, b := 0, 1 for i := 0; i < n; i++ { ch <- a a, b = b, a+b } close(ch) // ОБЯЗАТЕЛЬНО закрыть, иначе range заблокируется навсегда}func main() { ch := make(chan int) go fibonacci(10, ch) // range автоматически читает до закрытия канала for num := range ch { fmt.Print(num, " ") } // Вывод: 0 1 1 2 3 5 8 13 21 34}
nil канал
Чтение или запись в nil канал блокирует горутину навсегда. Это полезно в select для отключения ветвей:
package mainimport "fmt"func main() { var ch chan int // nil канал // Это заблокирует навсегда (deadlock если нет других горутин): // ch <- 1 // блокировка навсегда // <-ch // блокировка навсегда // Полезное применение — отключение case в select a := make(chan int, 1) a <- 42 var b chan int // nil — этот case никогда не выполнится select { case val := <-a: fmt.Println("Из a:", val) // всегда выберется этот case case val := <-b: fmt.Println("Из b:", val) // никогда — b == nil }}
🏠 Домашнее задание
Реализуйте пинг-понг между двумя горутинами через небуферизованный канал. Горутины обмениваются сообщениями 10 раз.
Создайте генератор простых чисел через канал: горутина отправляет простые числа, main читает первые 20.
Напишите функцию merge, которая принимает два канала <-chan int и возвращает один канал, объединяющий значения из обоих.
Объясните разницу между ch := make(chan int) и ch := make(chan int, 1) — приведите пример, где они ведут себя по-разному.
5. Select
Оператор select позволяет горутине ожидать несколько операций с каналами одновременно. Он похож на switch, но каждый case — это операция отправки или получения через канал.
package mainimport ( "fmt" "time")func main() { ch1 := make(chan string) ch2 := make(chan string) // Горутина, отвечающая через 1 секунду go func() { time.Sleep(1 * time.Second) ch1 <- "Ответ от сервиса A" }() // Горутина, отвечающая через 2 секунды go func() { time.Sleep(2 * time.Second) ch2 <- "Ответ от сервиса B" }() // select ждёт первый готовый канал for i := 0; i < 2; i++ { select { case msg := <-ch1: fmt.Println("ch1:", msg) case msg := <-ch2: fmt.Println("ch2:", msg) } }}
Таймаут с time.After
package mainimport ( "fmt" "time")func slowOperation() <-chan string { ch := make(chan string) go func() { time.Sleep(3 * time.Second) // долгая операция ch <- "результат" }() return ch}func main() { result := slowOperation() select { case val := <-result: fmt.Println("Получен результат:", val) case <-time.After(2 * time.Second): fmt.Println("Таймаут! Операция заняла слишком много времени") }}
time.After создаёт утечку
В цикле time.After создаёт новый таймер на каждой итерации, который не будет собран GC до истечения. Для циклов используйте time.NewTimer с Reset():
timer := time.NewTimer(timeout)defer timer.Stop()for { timer.Reset(timeout) select { case <-ch: // обработка case <-timer.C: // таймаут }}
package mainimport ( "fmt" "sync")// merge объединяет несколько каналов в одинfunc merge(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup // Для каждого входного канала запускаем горутину-читатель for _, ch := range channels { wg.Add(1) go func(c <-chan int) { defer wg.Done() for val := range c { out <- val } }(ch) } // Закрываем выходной канал, когда все входные закрыты go func() { wg.Wait() close(out) }() return out}func generate(start, count int) <-chan int { ch := make(chan int) go func() { for i := start; i < start+count; i++ { ch <- i } close(ch) }() return ch}func main() { ch1 := generate(0, 5) // 0, 1, 2, 3, 4 ch2 := generate(10, 5) // 10, 11, 12, 13, 14 for val := range merge(ch1, ch2) { fmt.Print(val, " ") } fmt.Println()}
Отключение case через nil канал
package mainimport "fmt"func main() { ch1 := make(chan int, 3) ch2 := make(chan int, 3) // Наполняем каналы ch1 <- 1 ch1 <- 2 close(ch1) ch2 <- 10 ch2 <- 20 close(ch2) // Читаем из обоих каналов, отключая закрытые for ch1 != nil || ch2 != nil { select { case val, ok := <-ch1: if !ok { ch1 = nil // присваиваем nil → этот case больше не выбирается fmt.Println("ch1 закрыт") continue } fmt.Println("ch1:", val) case val, ok := <-ch2: if !ok { ch2 = nil // отключаем этот case fmt.Println("ch2 закрыт") continue } fmt.Println("ch2:", val) } } fmt.Println("Оба канала закрыты")}
🏠 Домашнее задание
Напишите функцию first, которая запускает N горутин с разными задачами и возвращает результат первой завершившейся (остальные отменяются через context).
Реализуйте heartbeat: горутина каждые 500мс отправляет сигнал в канал. Если сигнал не приходит 2 секунды — выведите предупреждение.
Реализуйте rate limiter через time.Ticker и select — обрабатывайте не более 5 запросов в секунду.
6. Пакет sync
Пакет sync предоставляет низкоуровневые примитивы синхронизации. Хотя Go продвигает каналы как основной механизм конкурентности, бывают случаи, когда мьютексы и другие примитивы sync проще и эффективнее.
sync.WaitGroup
WaitGroup ожидает завершения группы горутин. Это самый частый способ дождаться завершения работы.
package mainimport ( "fmt" "sync" "time")func worker(id int, wg *sync.WaitGroup) { defer wg.Done() // ВСЕГДА в defer — гарантия вызова даже при panic fmt.Printf("Воркер %d начал работу\n", id) time.Sleep(time.Duration(id) * 100 * time.Millisecond) fmt.Printf("Воркер %d завершил работу\n", id)}func main() { var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) // ВАЖНО: Add вызывается ДО запуска горутины go worker(i, &wg) } wg.Wait() // блокируется до wg.Counter == 0 fmt.Println("Все воркеры завершены")}
Типичные ошибки с WaitGroup
// ОШИБКА 1: Add после go (гонка!)go func() { wg.Add(1) // может выполниться после Wait! defer wg.Done() // ...}()// ОШИБКА 2: передача по значению (копия, не оригинал)go worker(i, wg) // wg — копия, Done() не уменьшит оригинал// ПРАВИЛЬНО: передача по указателюgo worker(i, &wg)
sync.Mutex
Мьютекс обеспечивает исключительный доступ к разделяемому ресурсу. Только одна горутина может удерживать блокировку в любой момент.
package mainimport ( "fmt" "sync")// SafeCounter — потокобезопасный счётчикtype SafeCounter struct { mu sync.Mutex count int}func (c *SafeCounter) Increment() { c.mu.Lock() defer c.mu.Unlock() // ВСЕГДА defer Unlock — защита от panic c.count++}func (c *SafeCounter) Value() int { c.mu.Lock() defer c.mu.Unlock() return c.count}func main() { counter := &SafeCounter{} var wg sync.WaitGroup // 1000 горутин инкрементируют счётчик for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter.Increment() }() } wg.Wait() fmt.Println("Счётчик:", counter.Value()) // всегда 1000}
sync.RWMutex
RWMutex позволяет нескольким горутинам читать одновременно, но запись блокирует всех. Идеален для read-heavy нагрузок.
package mainimport ( "fmt" "sync" "time")// Cache — кеш с поддержкой конкурентного чтенияtype Cache struct { mu sync.RWMutex data map[string]string}func NewCache() *Cache { return &Cache{data: make(map[string]string)}}// Get — читатели не блокируют друг другаfunc (c *Cache) Get(key string) (string, bool) { c.mu.RLock() // блокировка на чтение — множественная defer c.mu.RUnlock() val, ok := c.data[key] return val, ok}// Set — писатель блокирует всех (и читателей, и других писателей)func (c *Cache) Set(key, value string) { c.mu.Lock() // блокировка на запись — исключительная defer c.mu.Unlock() c.data[key] = value}func main() { cache := NewCache() var wg sync.WaitGroup // Писатель wg.Add(1) go func() { defer wg.Done() for i := 0; i < 10; i++ { key := fmt.Sprintf("key_%d", i) cache.Set(key, fmt.Sprintf("value_%d", i)) time.Sleep(10 * time.Millisecond) } }() // Множество читателей for r := 0; r < 5; r++ { wg.Add(1) go func(readerID int) { defer wg.Done() for i := 0; i < 10; i++ { key := fmt.Sprintf("key_%d", i) if val, ok := cache.Get(key); ok { _ = val // используем значение } _ = readerID } }(r) } wg.Wait() fmt.Println("Кеш заполнен")}
sync.Once
Once гарантирует, что функция выполнится ровно один раз, даже при конкурентных вызовах. Идеален для ленивой инициализации.
package mainimport ( "fmt" "sync")type Database struct { connection string}var ( dbInstance *Database dbOnce sync.Once)// GetDB — ленивый синглтон: подключение создаётся при первом вызовеfunc GetDB() *Database { dbOnce.Do(func() { fmt.Println("Подключение к БД... (вызывается только один раз)") dbInstance = &Database{connection: "postgres://localhost:5432"} }) return dbInstance}func main() { var wg sync.WaitGroup // 10 горутин одновременно запрашивают подключение for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() db := GetDB() _ = db }() } wg.Wait() // "Подключение к БД..." выведется только ОДИН раз}
sync.Map — конкурентно-безопасная map, оптимизированная для двух сценариев: (1) ключ записывается один раз и читается многократно; (2) множество горутин работают с непересекающимися наборами ключей.
package mainimport ( "fmt" "sync")func main() { var m sync.Map // Store — сохранить значение m.Store("name", "Go") m.Store("version", 1.22) // Load — прочитать значение if val, ok := m.Load("name"); ok { fmt.Println("name:", val) // name: Go } // LoadOrStore — загрузить или сохранить если отсутствует actual, loaded := m.LoadOrStore("name", "Rust") fmt.Println(actual, loaded) // Go true (уже был) actual, loaded = m.LoadOrStore("lang", "Go") fmt.Println(actual, loaded) // Go false (был сохранён) // LoadAndDelete — прочитать и удалить атомарно val, loaded := m.LoadAndDelete("lang") fmt.Println(val, loaded) // Go true // Range — итерация (порядок не гарантирован) m.Range(func(key, value any) bool { fmt.Printf("%v: %v\n", key, value) return true // false прерывает итерацию })}
sync.Map vs map + Mutex
Используйте sync.Map когда:
Ключи стабильные (запись одного раза, чтение многократное)
Горутины работают с разными ключами (не пересекаются)
Используйте map + sync.RWMutex когда:
Нужен типизированный доступ (sync.Map хранит any)
Частая запись в одни и те же ключи
Нужна итерация с гарантией целостности снимка
sync.Pool
sync.Pool — пул переиспользуемых объектов для снижения нагрузки на GC. Объекты могут быть удалены GC в любой момент.
package mainimport ( "bytes" "fmt" "sync")// Пул буферов — избегаем частого выделения памятиvar bufPool = sync.Pool{ New: func() any { fmt.Println("Создаём новый буфер") return new(bytes.Buffer) },}func processRequest(data string) string { // Берём буфер из пула (или создаём новый) buf := bufPool.Get().(*bytes.Buffer) // ОБЯЗАТЕЛЬНО очищаем перед использованием buf.Reset() // Используем буфер buf.WriteString("Обработано: ") buf.WriteString(data) result := buf.String() // Возвращаем буфер в пул для переиспользования bufPool.Put(buf) return result}func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() result := processRequest(fmt.Sprintf("запрос_%d", id)) fmt.Println(result) }(i) } wg.Wait() // "Создаём новый буфер" выведется гораздо меньше 10 раз}
sync.Cond
sync.Cond — условная переменная для ожидания наступления условия. Используется редко, обычно каналы удобнее.
package mainimport ( "fmt" "sync" "time")func main() { var mu sync.Mutex cond := sync.NewCond(&mu) ready := false // Ожидающая горутина go func() { mu.Lock() for !ready { // ВСЕГДА проверяйте условие в цикле (spurious wakeup) cond.Wait() // атомарно: Unlock → wait → Lock } fmt.Println("Данные готовы, обрабатываем!") mu.Unlock() }() // Подготовка данных time.Sleep(1 * time.Second) mu.Lock() ready = true mu.Unlock() cond.Signal() // разбудить одну горутину // cond.Broadcast() // разбудить все ожидающие горутины time.Sleep(100 * time.Millisecond)}
🏠 Домашнее задание
Реализуйте потокобезопасный Set[T comparable] с методами Add, Contains, Remove, Len используя sync.RWMutex.
Создайте пул HTTP-клиентов с помощью sync.Pool, измерьте разницу в аллокациях с testing.B бенчмарком.
Реализуйте ленивую инициализацию конфигурации с помощью sync.OnceValues, где конфигурация может возвращать ошибку.
Напишите rate-limited logger, который использует sync.Mutex для защиты буфера и сбрасывает его раз в секунду.
7. sync/atomic
Пакет sync/atomic предоставляет low-level атомарные операции. Они быстрее мьютексов для простых операций (инкремент, загрузка, сохранение).
package mainimport ( "fmt" "sync" "sync/atomic")func main() { // Атомарные типы (Go 1.19+) — удобный API var counter atomic.Int64 var flag atomic.Bool var name atomic.Value // для произвольных типов var wg sync.WaitGroup // 1000 горутин атомарно инкрементируют счётчик for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter.Add(1) // атомарный инкремент }() } wg.Wait() fmt.Println("Счётчик:", counter.Load()) // 1000 — всегда точно // atomic.Bool flag.Store(true) fmt.Println("Флаг:", flag.Load()) // true // Swap — установить новое значение, получить старое old := flag.Swap(false) fmt.Println("Старое:", old, "Новое:", flag.Load()) // CompareAndSwap — установить если текущее значение совпадает // CAS — основа lock-free алгоритмов swapped := counter.CompareAndSwap(1000, 0) // если 1000, то заменить на 0 fmt.Println("CAS успех:", swapped) fmt.Println("Счётчик после CAS:", counter.Load()) // atomic.Value — для произвольных типов (конфигурация, состояние) type Config struct { Host string Port int } name.Store(Config{Host: "localhost", Port: 8080}) cfg := name.Load().(Config) fmt.Printf("Конфиг: %s:%d\n", cfg.Host, cfg.Port)}
atomic vs Mutex
Используйте atomic когда:
Одна переменная (счётчик, флаг, указатель)
Простые операции (Load, Store, Add, CompareAndSwap)
Используйте Mutex когда:
Нужно защитить группу связанных переменных
Нужна сложная логика внутри критической секции
🏠 Домашнее задание
Реализуйте потокобезопасный счётчик тремя способами: Mutex, atomic.Int64, канал. Сравните производительность с бенчмарком.
Используя atomic.Value, реализуйте горячую перезагрузку конфигурации: одна горутина периодически обновляет конфиг, остальные читают.
Реализуйте spin-lock с помощью atomic.CompareAndSwap (для учебных целей, не для production).
8. Паттерны конкурентности
Worker Pool
Worker Pool — фиксированное количество воркеров, обрабатывающих задачи из общего канала. Один из самых востребованных паттернов.
package mainimport ( "fmt" "sync" "time")// Job описывает задачу для обработкиtype Job struct { ID int Payload string}// Result описывает результат обработкиtype Result struct { JobID int Output string}// worker — горутина-обработчик, читает задачи из jobs, пишет результаты в resultsfunc worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) { defer wg.Done() for job := range jobs { // range завершится, когда jobs закроется fmt.Printf("Воркер %d обрабатывает задачу %d\n", id, job.ID) time.Sleep(100 * time.Millisecond) // имитация работы results <- Result{ JobID: job.ID, Output: fmt.Sprintf("Результат задачи %d: %s обработано", job.ID, job.Payload), } }}func main() { const numWorkers = 3 const numJobs = 10 jobs := make(chan Job, numJobs) // буфер для всех задач results := make(chan Result, numJobs) // буфер для всех результатов // Запускаем пул воркеров var wg sync.WaitGroup for w := 1; w <= numWorkers; w++ { wg.Add(1) go worker(w, jobs, results, &wg) } // Отправляем задачи for j := 1; j <= numJobs; j++ { jobs <- Job{ID: j, Payload: fmt.Sprintf("данные_%d", j)} } close(jobs) // сигнализируем воркерам, что задач больше не будет // Ждём завершения всех воркеров и закрываем results go func() { wg.Wait() close(results) }() // Собираем результаты for result := range results { fmt.Println(result.Output) }}
Fan-out / Fan-in
Fan-out — распределение работы от одного источника к нескольким воркерам. Fan-in — объединение результатов от нескольких источников в один канал.
package mainimport ( "fmt" "math/rand" "sync" "time")// source — генератор данных (один источник)func source(count int) <-chan int { out := make(chan int) go func() { defer close(out) for i := 0; i < count; i++ { out <- rand.Intn(100) } }() return out}// fanOut — запускает n воркеров, каждый читает из in и пишет в свой outfunc fanOut(in <-chan int, n int) []<-chan int { channels := make([]<-chan int, n) for i := 0; i < n; i++ { ch := make(chan int) channels[i] = ch go func() { defer close(ch) for val := range in { // Тяжёлая обработка time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) ch <- val * val // возводим в квадрат } }() } return channels}// fanIn — объединяет несколько каналов в одинfunc fanIn(channels ...<-chan int) <-chan int { out := make(chan int) var wg sync.WaitGroup for _, ch := range channels { wg.Add(1) go func(c <-chan int) { defer wg.Done() for val := range c { out <- val } }(ch) } go func() { wg.Wait() close(out) }() return out}func main() { // Источник: 20 чисел data := source(20) // Fan-out: 4 воркера обрабатывают данные workers := fanOut(data, 4) // Fan-in: объединяем результаты results := fanIn(workers...) // Собираем все результаты var total int for val := range results { total += val } fmt.Println("Сумма квадратов:", total)}
Pipeline
Pipeline — цепочка стадий обработки, соединённых каналами. Каждая стадия — горутина, которая читает из входного канала и пишет в выходной.
package mainimport "fmt"// generate — первая стадия: генерирует числаfunc generate(nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, n := range nums { out <- n } }() return out}// square — вторая стадия: возводит в квадратfunc square(in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { out <- n * n } }() return out}// filter — третья стадия: пропускает только значения > thresholdfunc filter(in <-chan int, threshold int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { if n > threshold { out <- n } } }() return out}// sum — финальная стадия: суммирует все значенияfunc sum(in <-chan int) int { total := 0 for n := range in { total += n } return total}func main() { // Строим пайплайн: generate → square → filter → sum // // generate(1,2,3,4,5) → square → filter(>5) → sum // 1,2,3,4,5 → 1,4,9,16,25 → 9,16,25 → 50 nums := generate(1, 2, 3, 4, 5) squared := square(nums) filtered := filter(squared, 5) result := sum(filtered) fmt.Println("Результат пайплайна:", result) // 50}
Семафор через буферизованный канал
Буферизованный канал — простейший способ ограничить конкурентность.
package mainimport ( "fmt" "sync" "time")func main() { const maxConcurrent = 3 // максимум 3 одновременных операции // Семафор — буферизованный канал ёмкостью maxConcurrent sem := make(chan struct{}, maxConcurrent) var wg sync.WaitGroup for i := 1; i <= 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() sem <- struct{}{} // захватываем слот (блокируемся если все заняты) defer func() { <-sem }() // освобождаем слот fmt.Printf("Задача %d начала работу (активных: %d)\n", id, len(sem)) time.Sleep(500 * time.Millisecond) fmt.Printf("Задача %d завершена\n", id) }(i) } wg.Wait()}
Rate Limiter
Ограничение частоты операций — критически важный паттерн для API и внешних сервисов.
package mainimport ( "context" "fmt" "time" "golang.org/x/time/rate")func main() { // Создаём rate limiter: 5 запросов в секунду, burst до 10 // rate.Every(200*time.Millisecond) = 5 запросов в секунду limiter := rate.NewLimiter(rate.Every(200*time.Millisecond), 10) ctx := context.Background() // Имитация 20 запросов for i := 1; i <= 20; i++ { // Wait блокирует до получения токена if err := limiter.Wait(ctx); err != nil { fmt.Printf("Запрос %d: ошибка rate limit: %v\n", i, err) continue } fmt.Printf("Запрос %d обработан в %s\n", i, time.Now().Format("15:04:05.000")) }}// Простой rate limiter через time.Ticker (без внешних зависимостей)func simpleLimiter() { // 5 запросов в секунду ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() requests := make(chan int, 100) for i := 1; i <= 20; i++ { requests <- i } close(requests) for req := range requests { <-ticker.C // ждём тик — ограничиваем частоту fmt.Printf("Обработан запрос %d\n", req) }}
Circuit Breaker
Circuit Breaker предотвращает каскадные отказы, прерывая вызовы к неработающему сервису.
package mainimport ( "errors" "fmt" "sync" "time")// Состояния Circuit Breakertype State intconst ( StateClosed State = iota // нормальная работа, запросы проходят StateOpen // сервис недоступен, запросы отклоняются StateHalfOpen // пробный запрос для проверки восстановления)func (s State) String() string { switch s { case StateClosed: return "CLOSED" case StateOpen: return "OPEN" case StateHalfOpen: return "HALF-OPEN" } return "UNKNOWN"}// CircuitBreaker — реализация паттернаtype CircuitBreaker struct { mu sync.Mutex state State failures int // счётчик последовательных ошибок maxFailures int // порог для открытия resetTimeout time.Duration // время до перехода в half-open lastFailureTime time.Time // время последней ошибки}var ErrCircuitOpen = errors.New("circuit breaker is open")func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker { return &CircuitBreaker{ state: StateClosed, maxFailures: maxFailures, resetTimeout: resetTimeout, }}// Execute выполняет функцию через circuit breakerfunc (cb *CircuitBreaker) Execute(fn func() error) error { cb.mu.Lock() switch cb.state { case StateOpen: // Проверяем, не пора ли перейти в half-open if time.Since(cb.lastFailureTime) > cb.resetTimeout { cb.state = StateHalfOpen fmt.Println("[CB] Переход в HALF-OPEN") } else { cb.mu.Unlock() return ErrCircuitOpen } } cb.mu.Unlock() // Выполняем функцию err := fn() cb.mu.Lock() defer cb.mu.Unlock() if err != nil { cb.failures++ cb.lastFailureTime = time.Now() if cb.failures >= cb.maxFailures { cb.state = StateOpen fmt.Printf("[CB] Переход в OPEN (ошибок: %d)\n", cb.failures) } return err } // Успех — сбрасываем счётчик cb.failures = 0 if cb.state == StateHalfOpen { cb.state = StateClosed fmt.Println("[CB] Переход в CLOSED (сервис восстановлен)") } return nil}func main() { cb := NewCircuitBreaker(3, 2*time.Second) // Имитация вызовов к нестабильному сервису callCount := 0 unstableService := func() error { callCount++ if callCount <= 5 { // первые 5 вызовов — ошибка return errors.New("service unavailable") } return nil // потом восстанавливается } for i := 1; i <= 10; i++ { err := cb.Execute(unstableService) fmt.Printf("Вызов %d: err=%v\n", i, err) time.Sleep(500 * time.Millisecond) }}
Готовые решения Circuit Breaker
В production рекомендуется использовать библиотеку sony/gobreaker, которая предоставляет полноценную реализацию с настройками, метриками и callback-функциями.
Or-done канал
Паттерн для безопасного чтения из канала с поддержкой отмены через context:
package mainimport ( "context" "fmt" "time")// orDone оборачивает чтение из канала с поддержкой отменыfunc orDone(ctx context.Context, ch <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for { select { case <-ctx.Done(): return // контекст отменён case val, ok := <-ch: if !ok { return // канал закрыт } // Двойной select: отправляем с проверкой отмены select { case out <- val: case <-ctx.Done(): return } } } }() return out}func main() { // Бесконечный генератор gen := make(chan int) go func() { i := 0 for { gen <- i i++ time.Sleep(100 * time.Millisecond) } }() // Читаем с таймаутом 500мс ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() for val := range orDone(ctx, gen) { fmt.Println("Значение:", val) } fmt.Println("Завершено по таймауту")}
🏠 Домашнее задание
Реализуйте Worker Pool с возможностью динамического масштабирования — добавление и удаление воркеров без остановки.
Создайте pipeline для обработки текстовых файлов: чтение строк → фильтрация → трансформация → запись результата.
Реализуйте Circuit Breaker с экспоненциальным backoff для retry в состоянии half-open.
Напишите per-key rate limiter: каждый уникальный ключ (например, IP-адрес) имеет свой лимит.
9. Context
Пакет context — стандартный механизм Go для управления жизненным циклом операций: отмена, таймауты, передача метаданных. Контекст пронизывает весь стек вызовов.
Правила использования context
Context — всегда первый параметр функции: func DoSomething(ctx context.Context, ...)
Не храните контекст в структурах (кроме исключительных случаев)
Всегда вызывайте cancel() через defer
Не передавайте nil context — используйте context.TODO() если не уверены
context.Background() и context.TODO()
package mainimport "context"func main() { // Background — корневой контекст, никогда не отменяется // Используется как отправная точка в main(), init(), тестах ctx := context.Background() // TODO — заглушка "я добавлю правильный контекст позже" // Используется при рефакторинге, когда пока неясно какой контекст нужен ctxTodo := context.TODO() _ = ctx _ = ctxTodo}
WithCancel — ручная отмена
package mainimport ( "context" "fmt" "time")func worker(ctx context.Context, id int) { for { select { case <-ctx.Done(): // канал закрывается при отмене fmt.Printf("Воркер %d остановлен: %v\n", id, ctx.Err()) return default: fmt.Printf("Воркер %d работает...\n", id) time.Sleep(200 * time.Millisecond) } }}func main() { // WithCancel возвращает дочерний контекст и функцию отмены ctx, cancel := context.WithCancel(context.Background()) // Запускаем воркеров for i := 1; i <= 3; i++ { go worker(ctx, i) } time.Sleep(1 * time.Second) // Отменяем контекст — ВСЕ дочерние горутины получат сигнал cancel() // ВСЕГДА вызывайте cancel(), обычно через defer time.Sleep(100 * time.Millisecond) // даём горутинам время завершиться fmt.Println("Все воркеры остановлены")}
WithTimeout и WithDeadline
package mainimport ( "context" "fmt" "time")// slowOperation имитирует долгую операциюfunc slowOperation(ctx context.Context) (string, error) { select { case <-time.After(3 * time.Second): // операция занимает 3 секунды return "результат", nil case <-ctx.Done(): return "", ctx.Err() // context.DeadlineExceeded или context.Canceled }}func main() { // WithTimeout — отменяется автоматически через указанное время ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() // ВСЕГДА defer cancel(), даже если таймаут сработает раньше result, err := slowOperation(ctx) if err != nil { fmt.Println("Ошибка:", err) // context deadline exceeded return } fmt.Println("Результат:", result) // WithDeadline — отменяется в конкретный момент времени deadline := time.Now().Add(2 * time.Second) ctxDl, cancelDl := context.WithDeadline(context.Background(), deadline) defer cancelDl() // Проверяем дедлайн if dl, ok := ctxDl.Deadline(); ok { fmt.Println("Дедлайн:", dl.Format(time.RFC3339)) }}
WithValue — передача данных
package mainimport ( "context" "fmt" "net/http")// ПРАВИЛЬНО: используйте пользовательский тип для ключей,// чтобы избежать конфликтов между пакетамиtype contextKey stringconst ( requestIDKey contextKey = "request_id" userIDKey contextKey = "user_id")func middleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Добавляем request ID в контекст ctx := context.WithValue(r.Context(), requestIDKey, "req-12345") ctx = context.WithValue(ctx, userIDKey, 42) // Передаём обогащённый контекст дальше next.ServeHTTP(w, r.WithContext(ctx)) })}func handler(w http.ResponseWriter, r *http.Request) { // Извлекаем значения из контекста reqID, _ := r.Context().Value(requestIDKey).(string) userID, _ := r.Context().Value(userIDKey).(int) fmt.Fprintf(w, "Request: %s, User: %d\n", reqID, userID)}func main() { mux := http.NewServeMux() mux.Handle("/", middleware(http.HandlerFunc(handler))) // http.ListenAndServe(":8080", mux)}
Не злоупотребляйте WithValue
WithValue предназначен для request-scoped данных: request ID, trace ID, аутентифицированный пользователь.
НЕ используйте его для:
Передачи обязательных параметров функции (используйте аргументы)
Конфигурации или зависимостей (используйте dependency injection)
Больших объектов (значения копируются при создании дочерних контекстов)
WithoutCancel (Go 1.21) — для фоновых задач
package mainimport ( "context" "fmt" "time")func main() { // Создаём контекст запроса с таймаутом reqCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() // WithoutCancel создаёт контекст с теми же значениями, // но НЕ отменяемый при отмене родителя // Полезно для фоновых задач: логирование, метрики, аудит bgCtx := context.WithoutCancel(reqCtx) // Запускаем фоновую задачу, которая переживёт запрос go func() { // bgCtx не отменится, когда reqCtx будет отменён time.Sleep(2 * time.Second) fmt.Println("Фоновая задача завершена, ctx.Err():", bgCtx.Err()) // nil }() time.Sleep(3 * time.Second)}
context.AfterFunc (Go 1.21) — callback при отмене
package mainimport ( "context" "fmt" "time")func main() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) // AfterFunc вызывает функцию в отдельной горутине при отмене контекста stop := context.AfterFunc(ctx, func() { fmt.Println("Контекст отменён! Выполняем очистку...") }) // stop() можно вызвать, чтобы отменить callback до срабатывания _ = stop time.Sleep(2 * time.Second) cancel() // можно вызвать и после таймаута для порядка}
Context в HTTP-обработчиках
package mainimport ( "context" "fmt" "net/http" "time")func fetchData(ctx context.Context, url string) (string, error) { // Создаём HTTP-запрос с контекстом req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return "", err } resp, err := http.DefaultClient.Do(req) if err != nil { return "", err // если контекст отменён, ошибка будет context.Canceled } defer resp.Body.Close() return fmt.Sprintf("Status: %d", resp.StatusCode), nil}func apiHandler(w http.ResponseWriter, r *http.Request) { // r.Context() автоматически отменяется, когда клиент отключается ctx := r.Context() // Добавляем свой таймаут поверх (но не больше оставшегося) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() result, err := fetchData(ctx, "https://api.example.com/data") if err != nil { http.Error(w, err.Error(), http.StatusGatewayTimeout) return } fmt.Fprint(w, result)}
Напишите функцию, которая делает HTTP-запрос с таймаутом через context. Если запрос не завершился за 2 секунды, верните ошибку.
Реализуйте цепочку контекстов: parent → child1 (таймаут 5с) → child2 (таймаут 2с). Покажите, как отмена parent отменяет всех потомков.
Используя context.AfterFunc, реализуйте уведомление (запись в лог) при отмене контекста запроса.
Реализуйте middleware, добавляющее request-ID и trace-ID в контекст, и обработчик, который логирует эти значения.
10. errgroup
Пакет golang.org/x/sync/errgroup — удобная обёртка над sync.WaitGroup с поддержкой ошибок и автоматической отменой контекста при первой ошибке.
package mainimport ( "context" "fmt" "io" "net/http" "time" "golang.org/x/sync/errgroup")// fetchURL загружает URL и возвращает размер ответаfunc fetchURL(ctx context.Context, url string) (int, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return 0, fmt.Errorf("создание запроса для %s: %w", url, err) } resp, err := http.DefaultClient.Do(req) if err != nil { return 0, fmt.Errorf("загрузка %s: %w", url, err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return 0, fmt.Errorf("чтение %s: %w", url, err) } return len(body), nil}func main() { urls := []string{ "https://go.dev", "https://pkg.go.dev", "https://blog.golang.org", } // WithContext: при первой ошибке контекст отменяется, // что отменит остальные HTTP-запросы g, ctx := errgroup.WithContext(context.Background()) // Результаты (потокобезопасный доступ, т.к. каждый индекс уникален) results := make([]int, len(urls)) for i, url := range urls { i, url := i, url // копия для замыкания (до Go 1.22) g.Go(func() error { size, err := fetchURL(ctx, url) if err != nil { return err } results[i] = size return nil }) } // Wait ожидает все горутины и возвращает первую ошибку if err := g.Wait(); err != nil { fmt.Println("Ошибка:", err) return } for i, url := range urls { fmt.Printf("%s: %d байт\n", url, results[i]) }}
SetLimit — ограничение конкурентности (Go 1.20+)
package mainimport ( "context" "fmt" "time" "golang.org/x/sync/errgroup")func main() { g, ctx := errgroup.WithContext(context.Background()) // Ограничиваем до 3 одновременных горутин g.SetLimit(3) for i := 1; i <= 10; i++ { i := i // g.Go автоматически блокируется, если лимит достигнут g.Go(func() error { select { case <-ctx.Done(): return ctx.Err() default: } fmt.Printf("Задача %d начата\n", i) time.Sleep(500 * time.Millisecond) fmt.Printf("Задача %d завершена\n", i) return nil }) } if err := g.Wait(); err != nil { fmt.Println("Ошибка:", err) }}
errgroup vs sync.WaitGroup
Используйте errgroup когда:
Горутины возвращают ошибки, и вы хотите получить первую ошибку
Нужна автоматическая отмена остальных горутин при ошибке
Нужен лимит конкурентности (SetLimit)
Используйте sync.WaitGroup когда:
Горутины не возвращают ошибок (fire-and-forget)
Не нужна автоматическая отмена
🏠 Домашнее задание
Используя errgroup, напишите параллельный загрузчик, который скачивает 5 URL одновременно и возвращает суммарный размер. При ошибке в любом запросе остальные должны отмениться.
Добавьте SetLimit(2) и сравните время выполнения с неограниченной версией.
Реализуйте параллельную проверку здоровья нескольких сервисов с таймаутом 3 секунды.
11. singleflight
Пакет golang.org/x/sync/singleflight гарантирует, что для одного и того же ключа одновременно выполняется только один вызов функции. Все остальные горутины ждут и получают тот же результат. Идеален для предотвращения cache stampede.
package mainimport ( "fmt" "sync" "time" "golang.org/x/sync/singleflight")// Имитация медленного запроса к БДfunc fetchFromDB(key string) (string, error) { fmt.Printf("[DB] Загрузка данных для ключа: %s\n", key) time.Sleep(1 * time.Second) // долгий запрос return fmt.Sprintf("данные_для_%s", key), nil}type CacheService struct { cache map[string]string mu sync.RWMutex group singleflight.Group}func NewCacheService() *CacheService { return &CacheService{ cache: make(map[string]string), }}func (s *CacheService) Get(key string) (string, error) { // Шаг 1: проверяем кеш s.mu.RLock() if val, ok := s.cache[key]; ok { s.mu.RUnlock() return val, nil } s.mu.RUnlock() // Шаг 2: кеш промах — используем singleflight // Все горутины с одинаковым key будут ждать один запрос к БД val, err, shared := s.group.Do(key, func() (any, error) { result, err := fetchFromDB(key) if err != nil { return nil, err } // Обновляем кеш s.mu.Lock() s.cache[key] = result s.mu.Unlock() return result, nil }) if err != nil { return "", err } fmt.Printf("[Cache] key=%s shared=%v\n", key, shared) // shared=true если результат разделён return val.(string), nil}func main() { service := NewCacheService() var wg sync.WaitGroup // 10 горутин одновременно запрашивают один ключ for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() val, err := service.Get("user:123") if err != nil { fmt.Printf("Горутина %d: ошибка %v\n", id, err) return } fmt.Printf("Горутина %d: %s\n", id, val) }(i) } wg.Wait() // "[DB] Загрузка данных для ключа: user:123" выведется ТОЛЬКО ОДИН РАЗ!}
Cache stampede (стемпид кеша)
Когда запись в кеше истекает, и 1000 горутин одновременно обнаруживают промах кеша, все 1000 пойдут в БД. Это может перегрузить базу данных.
singleflight решает эту проблему: только одна горутина пойдёт в БД, остальные 999 дождутся её результата.
🏠 Домашнее задание
Реализуйте кеш с singleflight и TTL (время жизни записи). Когда TTL истекает, первый запрос обновляет кеш, остальные ждут.
Используйте метод DoChan вместо Do для неблокирующего вызова с поддержкой context.
Объясните, почему singleflight не подходит для мутирующих операций (запись, удаление).
12. Ошибки в горутинах
panic в горутине убивает всю программу
package mainimport ( "fmt" "time")func main() { go func() { panic("что-то пошло не так!") // убьёт ВСЮ программу, не только горутину }() time.Sleep(1 * time.Second) // не доберёмся сюда fmt.Println("Этот код не выполнится")}
Panic в горутине
panic в горутине, если не перехвачен через recover, убивает весь процесс, а не только горутину. recover() работает только в deferred-функции той же горутины, где произошёл panic.
Безопасный запуск горутин
package mainimport ( "fmt" "log" "sync")// safeGo — обёртка для безопасного запуска горутиныfunc safeGo(fn func()) { go func() { defer func() { if r := recover(); r != nil { // Логируем panic вместо падения всей программы log.Printf("RECOVERED panic в горутине: %v", r) } }() fn() }()}// safeGoWithError — обёртка с возвратом ошибки через каналfunc safeGoWithError(fn func() error) <-chan error { errCh := make(chan error, 1) go func() { defer func() { if r := recover(); r != nil { errCh <- fmt.Errorf("panic: %v", r) } }() errCh <- fn() }() return errCh}func main() { var wg sync.WaitGroup // Безопасный запуск — panic не убьёт программу wg.Add(1) safeGo(func() { defer wg.Done() panic("ой!") }) // Запуск с возвратом ошибки errCh := safeGoWithError(func() error { panic("паника в функции с ошибкой!") }) wg.Wait() if err := <-errCh; err != nil { fmt.Println("Получена ошибка:", err) } fmt.Println("Программа продолжает работать!")}
recover только в defer той же горутины
package mainimport "fmt"func main() { // ОШИБКА: recover в main не поймает panic в другой горутине defer func() { if r := recover(); r != nil { fmt.Println("Поймали:", r) // НЕ СРАБОТАЕТ для panic в горутине } }() go func() { panic("panic в горутине") // убьёт программу }() select {} // ждём}
🏠 Домашнее задание
Напишите SafeRunner, который запускает набор функций в горутинах, перехватывает panic каждой, и возвращает slice ошибок.
Реализуйте retry-обёртку: если горутина паникует, перезапустить её до 3 раз.
Объясните, почему defer recover() (без анонимной функции) не работает.
13. Race conditions
Race condition (состояние гонки) — ситуация, когда результат программы зависит от порядка выполнения горутин. Go предоставляет встроенный детектор гонок.
package mainimport ( "fmt" "sync")func main() { // ПРИМЕР ГОНКИ: несколько горутин пишут в одну переменную counter := 0 var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter++ // DATA RACE! Чтение и запись без синхронизации }() } wg.Wait() fmt.Println("Счётчик:", counter) // НЕ гарантировано 1000}
Детектор гонок
# Запуск с детектором гонокgo run -race main.go# Тесты с детекторомgo test -race ./...# Сборка с детектором (для тестовых окружений)go build -race -o myapp .
Детектор гонок в production
Детектор гонок замедляет программу в 5-10 раз и увеличивает потребление памяти в 5-10 раз. Используйте его в тестах и CI/CD, но НИКОГДА в production.
Типичные примеры гонок и исправления
package mainimport ( "fmt" "sync" "sync/atomic")func main() { // ПРИМЕР 1: Гонка при чтении/записи map // НЕПРАВИЛЬНО: // m := make(map[int]int) // go func() { m[1] = 1 }() // go func() { m[2] = 2 }() // PANIC: concurrent map writes // ПРАВИЛЬНО: sync.RWMutex var mu sync.RWMutex m := make(map[int]int) var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(n int) { defer wg.Done() mu.Lock() m[n] = n * n mu.Unlock() }(i) } wg.Wait() fmt.Println("Map size:", len(m)) // ПРИМЕР 2: Гонка при чтении bool-флага // НЕПРАВИЛЬНО: // done := false // go func() { done = true }() // for !done {} // busy wait — data race + может не увидеть изменение // ПРАВИЛЬНО: atomic.Bool var done atomic.Bool wg.Add(1) go func() { defer wg.Done() done.Store(true) }() wg.Wait() fmt.Println("Done:", done.Load())}
🏠 Домашнее задание
Напишите программу с намеренной гонкой данных и запустите с -race. Проанализируйте вывод детектора.
Исправьте гонку тремя способами: mutex, atomic, канал. Сравните производительность.
Запустите свои тесты из предыдущих глав с -race и исправьте найденные гонки.
14. Частые ошибки
Утечка горутин (goroutine leak)
Горутина, заблокированная навсегда — это утечка. Она потребляет память и никогда не освободит ресурсы.
package mainimport ( "context" "fmt" "runtime" "time")func leakyFunction() { ch := make(chan int) // небуферизованный канал go func() { val := <-ch // НАВСЕГДА заблокируется — никто не отправит в ch fmt.Println(val) }() // Функция возвращается, но горутина живёт вечно}func fixedWithBuffer() { ch := make(chan int, 1) // буферизованный канал go func() { select { case val := <-ch: fmt.Println(val) case <-time.After(5 * time.Second): fmt.Println("Таймаут, горутина завершается") return } }()}func fixedWithContext() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() ch := make(chan int) go func() { select { case val := <-ch: fmt.Println(val) case <-ctx.Done(): fmt.Println("Контекст отменён, горутина завершается") return } }()}func main() { fmt.Println("Горутин до утечки:", runtime.NumGoroutine()) for i := 0; i < 100; i++ { leakyFunction() } time.Sleep(100 * time.Millisecond) fmt.Println("Горутин после утечки:", runtime.NumGoroutine()) // ~101 — утечка!}
Захват переменной цикла (до Go 1.22)
package mainimport ( "fmt" "sync")func main() { var wg sync.WaitGroup // ОШИБКА (до Go 1.22): все горутины видят последнее значение i // for i := 0; i < 5; i++ { // wg.Add(1) // go func() { // defer wg.Done() // fmt.Println(i) // все выведут 5 // }() // } // ИСПРАВЛЕНИЕ 1: передать через параметр for i := 0; i < 5; i++ { wg.Add(1) go func(n int) { defer wg.Done() fmt.Println("Вариант 1:", n) // правильные значения }(i) } // ИСПРАВЛЕНИЕ 2: локальная копия for i := 0; i < 5; i++ { i := i // создаём новую переменную i в каждой итерации wg.Add(1) go func() { defer wg.Done() fmt.Println("Вариант 2:", i) // правильные значения }() } wg.Wait() // Начиная с Go 1.22, переменная цикла создаётся заново // на каждой итерации — проблема решена на уровне языка}
Go 1.22 исправил захват переменных цикла
С Go 1.22 каждая итерация for создаёт новую переменную. Старый код теперь работает корректно. Но для совместимости с Go < 1.22 рекомендуется явно копировать переменную.
Забытый cancel — утечка контекста
package mainimport ( "context" "fmt" "runtime")func main() { // ОШИБКА: забыли вызвать cancel for i := 0; i < 1000; i++ { _, _ = context.WithCancel(context.Background()) // cancel() НЕ вызван — утечка горутин таймеров } fmt.Println("Горутин:", runtime.NumGoroutine()) // может быть больше ожидаемого // ПРАВИЛЬНО: всегда defer cancel() ctx, cancel := context.WithTimeout(context.Background(), 0) defer cancel() _ = ctx}
Конкурентная запись в map — panic (не data race!)
package mainimport "sync"func main() { m := make(map[int]int) var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(n int) { defer wg.Done() // PANIC: concurrent map writes // Это не data race (детектор может не поймать), // а runtime panic — Go проверяет конкурентный доступ к map m[n] = n }(i) } wg.Wait()}// ИСПРАВЛЕНИЕ: sync.RWMutex или sync.Mapfunc fixed() { var mu sync.Mutex m := make(map[int]int) var wg sync.WaitGroup for i := 0; i < 100; i++ { wg.Add(1) go func(n int) { defer wg.Done() mu.Lock() m[n] = n mu.Unlock() }(i) } wg.Wait()}
🏠 Домашнее задание
Напишите функцию detectLeaks(), которая сравнивает runtime.NumGoroutine() до и после выполнения кода и предупреждает об утечке.
Найдите и исправьте все goroutine leaks в следующем коде:
func process(urls []string) { for _, url := range urls { ch := make(chan string) go func() { ch <- fetch(url) }() }}
Напишите unit-тест, который проверяет отсутствие утечек горутин после вызова вашей функции.
15. Модель памяти Go
Модель памяти Go определяет, при каких условиях чтение переменной в одной горутине гарантированно увидит значение, записанное в другой горутине.
Happens-before
Отношение happens-before определяет порядок видимости операций между горутинами. Если операция A happens-before операции B, то эффекты A гарантированно видны B.
Гарантии happens-before
package mainimport ( "fmt" "sync")func main() { // 1. Операции с каналами — синхронизация ch := make(chan int) x := 0 go func() { x = 42 // запись в x ch <- 1 // отправка happens-before получения }() <-ch // получение fmt.Println(x) // ГАРАНТИРОВАННО 42 // 2. sync.Mutex — Lock/Unlock как точка синхронизации var mu sync.Mutex y := 0 go func() { mu.Lock() y = 100 // запись под мьютексом mu.Unlock() // Unlock happens-before следующего Lock }() mu.Lock() // ждёт Unlock в горутине выше fmt.Println(y) // ГАРАНТИРОВАННО 100 mu.Unlock() // 3. sync.WaitGroup — Done happens-before Wait возвращает var wg sync.WaitGroup z := 0 wg.Add(1) go func() { z = 200 wg.Done() // Done happens-before Wait return }() wg.Wait() fmt.Println(z) // ГАРАНТИРОВАННО 200}
НЕПРАВИЛЬНО: busy wait без синхронизации
package mainimport "fmt"func main() { done := false value := 0 go func() { value = 42 done = true // БЕЗ синхронизации: main может НИКОГДА не увидеть true }() // НЕПРАВИЛЬНО: busy wait на bool без синхронизации // 1. Компилятор может оптимизировать цикл (done никогда не меняется с точки зрения main) // 2. CPU может кешировать done, не видя изменения из другого потока // 3. Нет гарантии happens-before для value for !done { } fmt.Println(value) // НЕ ГАРАНТИРОВАННО 42!}
ПРАВИЛЬНО: канал как точка синхронизации
package mainimport "fmt"func main() { done := make(chan struct{}) value := 0 go func() { value = 42 close(done) // close happens-before receive из закрытого канала }() <-done // ждёт закрытия канала fmt.Println(value) // ГАРАНТИРОВАННО 42}
Правило: нет синхронизации — нет гарантий
Без явной синхронизации (канал, мьютекс, atomic) нет никаких гарантий относительно порядка видимости изменений между горутинами. Даже time.Sleep НЕ является синхронизацией!
🏠 Домашнее задание
Объясните, почему time.Sleep(1*time.Second) не является достаточной синхронизацией.
Нарисуйте граф happens-before для программы с двумя горутинами, каналом и мьютексом.
16. Профилирование конкурентного кода
pprof: профиль горутин
package mainimport ( "fmt" "log" "net/http" _ "net/http/pprof" // регистрирует обработчики pprof "time")func leakyWorker(id int) { ch := make(chan int) // навсегда заблокируется <-ch _ = id}func main() { // Запускаем pprof HTTP-сервер go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }() // Создаём утечку горутин для демонстрации for i := 0; i < 50; i++ { go leakyWorker(i) } fmt.Println("pprof доступен на http://localhost:6060/debug/pprof/") fmt.Println("Профиль горутин: http://localhost:6060/debug/pprof/goroutine?debug=1") time.Sleep(1 * time.Hour)}
Команды для анализа
# Профиль горутин — показывает стеки всех горутинgo tool pprof http://localhost:6060/debug/pprof/goroutine# В интерактивном режиме pprof:# top — топ функций по количеству горутин# web — граф вызовов в браузере# list func — исходный код с аннотациями# Детектирование утечек горутин# Сравниваем два снимка:curl http://localhost:6060/debug/pprof/goroutine?debug=1 > goroutines_before.txt# ... подождать ...curl http://localhost:6060/debug/pprof/goroutine?debug=1 > goroutines_after.txtdiff goroutines_before.txt goroutines_after.txt
GODEBUG трассировка
# Трассировка планировщика (каждую секунду)GODEBUG=schedtrace=1000 ./myapp# Вывод:# SCHED 0ms: gomaxprocs=4 idleprocs=2 threads=5 spinningthreads=1# idlethreads=0 runqueue=0 [0 0 0 0]## gomaxprocs — количество P# idleprocs — P без работы# threads — общее количество M# runqueue — глобальная очередь# [0 0 0 0] — локальные очереди каждого P# Расширенная трассировка GCGODEBUG=gctrace=1 ./myapp
Тест на утечки горутин
package mainimport ( "runtime" "testing" "time")func TestNoGoroutineLeak(t *testing.T) { before := runtime.NumGoroutine() // Вызываем тестируемую функцию doWork() // Даём горутинам время завершиться time.Sleep(100 * time.Millisecond) after := runtime.NumGoroutine() if after > before+1 { // допускаем +1 на runtime горутины t.Errorf("Утечка горутин! До: %d, После: %d", before, after) }}func doWork() { // тестируемая функция}
Библиотека goleak от Uber
Для автоматического обнаружения утечек горутин в тестах используйте go.uber.org/goleak: