Go стал одним из самых популярных языков для построения микросервисной архитектуры. Быстрая компиляция в единый бинарный файл, встроенная конкурентность, минимальное потребление ресурсов и богатая стандартная библиотека делают его идеальным выбором для создания высоконагруженных распределённых систем. В этой главе мы рассмотрим архитектурные паттерны, способы межсервисного взаимодействия, работу с брокерами сообщений и построение полноценного микросервисного приложения.
1. Архитектура микросервисов
От монолита к микросервисам
Монолитная архитектура — это когда всё приложение представляет собой единый развёртываемый артефакт. Все модули живут в одном процессе, используют одну базу данных и деплоятся вместе. Для небольших команд и стартапов это отличный выбор — простота разработки, отладки и развёртывания.
Проблемы начинаются, когда проект растёт:
Масштабирование — нужно масштабировать всё приложение, даже если нагрузка растёт только на один модуль
Развёртывание — изменение в одной строке требует пересборки и деплоя всего приложения
Надёжность — ошибка в одном модуле может уронить весь сервис
Команда — десятки разработчиков работают в одном репозитории, конфликты при мерже, сложный code review
Закон Конвея (Conway's Law)
Организации проектируют системы, которые копируют структуру коммуникации этой организации. Если у вас три команды — вы получите три сервиса. Микросервисная архитектура работает лучше всего, когда каждый сервис принадлежит одной команде.
Микросервисная архитектура — это подход, при котором приложение разбивается на набор небольших, независимо развёртываемых сервисов. Каждый сервис:
Отвечает за одну бизнес-функцию (Single Responsibility)
Имеет собственную базу данных (Database per Service)
Общается с другими сервисами через сеть (API, сообщения)
Развёртывается независимо от других
Подробнее о принципах проектирования распределённых систем — в System Design.
Когда переходить на микросервисы
Не стоит начинать проект сразу с микросервисов. Рекомендуемый путь:
Начните с монолита — быстрая разработка, простое тестирование
Выделите чёткие модули — хорошая модульная архитектура внутри монолита
Определите границы — когда модули стабильны, их можно выделять в отдельные сервисы
Разделяйте постепенно — по одному сервису за раз, начиная с самых независимых
Когда НЕ нужны микросервисы
Маленькая команда (до 5-7 человек)
MVP и прототипы — скорость важнее архитектуры
Нет DevOps-культуры и инфраструктуры (CI/CD, контейнеры, оркестрация)
Домен недостаточно изучен — вы не знаете, где проводить границы
Преимущества
Преимущество
Описание
Независимый деплой
Каждый сервис деплоится отдельно, без влияния на другие
Масштабирование
Масштабируем только те сервисы, которые под нагрузкой
Технологическое разнообразие
Разные сервисы могут использовать разные языки и БД
Отказоустойчивость
Падение одного сервиса не роняет всю систему
Организационная гибкость
Каждая команда владеет своими сервисами
Вызовы
Вызов
Описание
Распределённые транзакции
Нет единой ACID-транзакции между сервисами
Сетевые сбои
Сеть ненадёжна, сервисы могут быть недоступны
Согласованность данных
Eventual consistency вместо strong consistency
Наблюдаемость
Сложнее отследить запрос через десяток сервисов
Операционная сложность
Больше сервисов = больше мониторинга, деплоев, конфигов
Нарисуйте диаграмму текущего или учебного проекта в виде монолита. Определите, какие модули можно было бы выделить в отдельные сервисы.
Для каждого потенциального сервиса определите: какие данные ему принадлежат, через какой API он общается с другими, какие зависимости у него есть.
Опишите, какие вызовы (из таблицы выше) возникнут при разделении, и предложите подходы к их решению.
2. Паттерны микросервисов
API Gateway
API Gateway — единая точка входа для всех внешних клиентов. Вместо того чтобы клиент обращался к десяткам сервисов напрямую, он обращается к одному Gateway, который маршрутизирует запросы.
Обязанности API Gateway:
Маршрутизация — направление запросов к нужным сервисам
Аутентификация — проверка токенов, JWT-валидация
Rate Limiting — ограничение частоты запросов
Агрегация — объединение ответов от нескольких сервисов
Кеширование — кеширование часто запрашиваемых данных
package mainimport ( "log/slog" "net/http" "net/http/httputil" "net/url" "strings" "sync" "time")// RateLimiter — простой rate limiter на основе token buckettype RateLimiter struct { mu sync.Mutex tokens map[string]int // количество оставшихся токенов по IP lastTime map[string]time.Time rate int // максимум запросов interval time.Duration // за какой период}// NewRateLimiter создаёт новый rate limiterfunc NewRateLimiter(rate int, interval time.Duration) *RateLimiter { return &RateLimiter{ tokens: make(map[string]int), lastTime: make(map[string]time.Time), rate: rate, interval: interval, }}// Allow проверяет, можно ли пропустить запрос от данного IPfunc (rl *RateLimiter) Allow(ip string) bool { rl.mu.Lock() defer rl.mu.Unlock() now := time.Now() last, exists := rl.lastTime[ip] // Если прошло достаточно времени — сбрасываем счётчик if !exists || now.Sub(last) > rl.interval { rl.tokens[ip] = rl.rate rl.lastTime[ip] = now } if rl.tokens[ip] > 0 { rl.tokens[ip]-- return true } return false}// APIGateway — простой API Gateway с маршрутизацией и middlewaretype APIGateway struct { routes map[string]*httputil.ReverseProxy // маршруты к сервисам limiter *RateLimiter logger *slog.Logger}// NewAPIGateway создаёт Gateway с настроенными маршрутамиfunc NewAPIGateway(logger *slog.Logger) *APIGateway { gw := &APIGateway{ routes: make(map[string]*httputil.ReverseProxy), limiter: NewRateLimiter(100, time.Minute), // 100 запросов в минуту logger: logger, } // Регистрация маршрутов к внутренним сервисам gw.registerRoute("/api/users", "http://user-service:8081") gw.registerRoute("/api/orders", "http://order-service:8082") gw.registerRoute("/api/products", "http://product-service:8083") return gw}// registerRoute добавляет маршрут к конкретному сервисуfunc (gw *APIGateway) registerRoute(prefix, target string) { targetURL, err := url.Parse(target) if err != nil { gw.logger.Error("не удалось распарсить URL сервиса", "target", target, "error", err) return } // httputil.ReverseProxy — стандартный обратный прокси Go proxy := httputil.NewSingleHostReverseProxy(targetURL) gw.routes[prefix] = proxy}// ServeHTTP обрабатывает все входящие запросыfunc (gw *APIGateway) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 1. Rate Limiting clientIP := r.RemoteAddr if !gw.limiter.Allow(clientIP) { http.Error(w, `{"error": "rate limit exceeded"}`, http.StatusTooManyRequests) return } // 2. Аутентификация (упрощённая проверка JWT) token := r.Header.Get("Authorization") if token == "" && !strings.HasPrefix(r.URL.Path, "/api/auth") { http.Error(w, `{"error": "unauthorized"}`, http.StatusUnauthorized) return } // 3. Маршрутизация к нужному сервису for prefix, proxy := range gw.routes { if strings.HasPrefix(r.URL.Path, prefix) { gw.logger.Info("проксирование запроса", "path", r.URL.Path, "target", prefix, "method", r.Method, ) proxy.ServeHTTP(w, r) return } } // Сервис не найден http.Error(w, `{"error": "service not found"}`, http.StatusNotFound)}func main() { logger := slog.Default() gateway := NewAPIGateway(logger) server := &http.Server{ Addr: ":8080", Handler: gateway, ReadTimeout: 10 * time.Second, WriteTimeout: 30 * time.Second, } logger.Info("API Gateway запущен", "addr", ":8080") if err := server.ListenAndServe(); err != nil { logger.Error("ошибка сервера", "error", err) }}
Service Discovery
В микросервисной архитектуре сервисы создаются и уничтожаются динамически. Жёсткое указание адресов (hardcoded URLs) не работает. Service Discovery решает задачу: как один сервис находит другой?
Client-side discovery — клиент сам запрашивает реестр и выбирает инстанс:
package discoveryimport ( "fmt" "math/rand" "sync")// ServiceInstance — информация об одном экземпляре сервисаtype ServiceInstance struct { ID string // уникальный идентификатор инстанса Name string // имя сервиса (например, "user-service") Host string // адрес хоста Port int // порт Healthy bool // флаг здоровья}// Addr возвращает полный адрес инстансаfunc (s ServiceInstance) Addr() string { return fmt.Sprintf("http://%s:%d", s.Host, s.Port)}// Registry — простой реестр сервисов (in-memory)// В продакшене используют Consul, etcd или DNS-based discoverytype Registry struct { mu sync.RWMutex services map[string][]ServiceInstance // имя сервиса → список инстансов}// NewRegistry создаёт пустой реестрfunc NewRegistry() *Registry { return &Registry{ services: make(map[string][]ServiceInstance), }}// Register добавляет инстанс сервиса в реестрfunc (r *Registry) Register(instance ServiceInstance) { r.mu.Lock() defer r.mu.Unlock() r.services[instance.Name] = append(r.services[instance.Name], instance)}// Deregister удаляет инстанс из реестра (при остановке сервиса)func (r *Registry) Deregister(serviceName, instanceID string) { r.mu.Lock() defer r.mu.Unlock() instances := r.services[serviceName] for i, inst := range instances { if inst.ID == instanceID { // Удаляем элемент из слайса r.services[serviceName] = append(instances[:i], instances[i+1:]...) return } }}// Discover возвращает случайный здоровый инстанс (простая балансировка)func (r *Registry) Discover(serviceName string) (ServiceInstance, error) { r.mu.RLock() defer r.mu.RUnlock() instances := r.services[serviceName] // Фильтруем только здоровые инстансы var healthy []ServiceInstance for _, inst := range instances { if inst.Healthy { healthy = append(healthy, inst) } } if len(healthy) == 0 { return ServiceInstance{}, fmt.Errorf( "нет доступных инстансов сервиса %s", serviceName, ) } // Случайный выбор — простейшая форма балансировки return healthy[rand.Intn(len(healthy))], nil}// DiscoverAll возвращает все здоровые инстансы сервисаfunc (r *Registry) DiscoverAll(serviceName string) []ServiceInstance { r.mu.RLock() defer r.mu.RUnlock() var healthy []ServiceInstance for _, inst := range r.services[serviceName] { if inst.Healthy { healthy = append(healthy, inst) } } return healthy}
Server-side discovery — используется балансировщик нагрузки (Kubernetes Service, AWS ELB), который сам знает о доступных инстансах. Клиент обращается к фиксированному адресу, балансировщик маршрутизирует запрос.
В Kubernetes
В Kubernetes каждый Service получает DNS-имя вида <service-name>.<namespace>.svc.cluster.local. Это server-side discovery из коробки — не нужен отдельный реестр типа Consul.
Circuit Breaker
Circuit Breaker — паттерн, предотвращающий каскадные сбои. Если сервис, к которому мы обращаемся, постоянно отвечает ошибками, Circuit Breaker «размыкает цепь» и перестаёт отправлять запросы, давая сервису время на восстановление.
Три состояния:
Closed — запросы проходят нормально, ошибки считаются
Open — все запросы немедленно отклоняются без вызова сервиса
Half-Open — пропускается один тестовый запрос; если успешен — переход в Closed, если нет — обратно в Open
Подробнее о реализации конкурентных паттернов — в 05-concurrency.
package circuitbreakerimport ( "errors" "sync" "time")// State — состояние Circuit Breakertype State intconst ( StateClosed State = iota // Нормальная работа StateOpen // Запросы блокируются StateHalfOpen // Тестовый режим)// String возвращает строковое представление состоянияfunc (s State) String() string { switch s { case StateClosed: return "CLOSED" case StateOpen: return "OPEN" case StateHalfOpen: return "HALF-OPEN" default: return "UNKNOWN" }}var ( ErrCircuitOpen = errors.New("circuit breaker is open"))// CircuitBreaker реализует паттерн Circuit Breakertype CircuitBreaker struct { mu sync.Mutex state State failureCount int // текущее количество последовательных ошибок failureThreshold int // порог для перехода в Open successCount int // количество успехов в Half-Open successThreshold int // порог для возврата в Closed timeout time.Duration // время, через которое Open → Half-Open lastFailure time.Time // время последней ошибки}// NewCircuitBreaker создаёт Circuit Breaker с заданными параметрамиfunc NewCircuitBreaker(failureThreshold, successThreshold int, timeout time.Duration) *CircuitBreaker { return &CircuitBreaker{ state: StateClosed, failureThreshold: failureThreshold, successThreshold: successThreshold, timeout: timeout, }}// Execute выполняет функцию через Circuit Breakerfunc (cb *CircuitBreaker) Execute(fn func() error) error { cb.mu.Lock() switch cb.state { case StateOpen: // Проверяем, не пора ли перейти в Half-Open if time.Since(cb.lastFailure) > cb.timeout { cb.state = StateHalfOpen cb.successCount = 0 cb.mu.Unlock() // Пропускаем один тестовый запрос return cb.doRequest(fn) } cb.mu.Unlock() return ErrCircuitOpen case StateHalfOpen: cb.mu.Unlock() return cb.doRequest(fn) default: // StateClosed cb.mu.Unlock() return cb.doRequest(fn) }}// doRequest выполняет запрос и обновляет состояниеfunc (cb *CircuitBreaker) doRequest(fn func() error) error { err := fn() cb.mu.Lock() defer cb.mu.Unlock() if err != nil { cb.failureCount++ cb.lastFailure = time.Now() // Если ошибок слишком много — размыкаем цепь if cb.failureCount >= cb.failureThreshold { cb.state = StateOpen } return err } // Запрос успешен if cb.state == StateHalfOpen { cb.successCount++ // Если достаточно успешных запросов — замыкаем цепь if cb.successCount >= cb.successThreshold { cb.state = StateClosed cb.failureCount = 0 } } else { // В Closed-состоянии сбрасываем счётчик ошибок при успехе cb.failureCount = 0 } return nil}// State возвращает текущее состояниеfunc (cb *CircuitBreaker) GetState() State { cb.mu.Lock() defer cb.mu.Unlock() return cb.state}
Библиотека sony/gobreaker
В продакшене рекомендуется использовать готовую библиотеку github.com/sony/gobreaker, которая предоставляет настраиваемый Circuit Breaker с поддержкой колбэков и метрик.
Retry с экспоненциальной задержкой и jitter
При временных сбоях (таймаут сети, 503) имеет смысл повторить запрос. Но делать это нужно с увеличивающимися интервалами и добавлением случайного отклонения (jitter), чтобы избежать «thundering herd» — когда все клиенты повторяют запросы одновременно.
package retryimport ( "context" "fmt" "math" "math/rand" "time")// Config — настройки политики повторовtype Config struct { MaxRetries int // максимальное количество попыток BaseDelay time.Duration // начальная задержка MaxDelay time.Duration // максимальная задержка Multiplier float64 // множитель для экспоненциального роста JitterRatio float64 // доля случайного отклонения (0.0–1.0)}// DefaultConfig возвращает стандартную конфигурациюfunc DefaultConfig() Config { return Config{ MaxRetries: 3, BaseDelay: 100 * time.Millisecond, MaxDelay: 10 * time.Second, Multiplier: 2.0, JitterRatio: 0.3, // ±30% к вычисленной задержке }}// Do выполняет функцию с повторами при ошибкеfunc Do(ctx context.Context, cfg Config, fn func() error) error { var lastErr error for attempt := 0; attempt <= cfg.MaxRetries; attempt++ { // Выполняем функцию lastErr = fn() if lastErr == nil { return nil // Успех — выходим } // Если это последняя попытка — не ждём if attempt == cfg.MaxRetries { break } // Вычисляем задержку: base * multiplier^attempt delay := float64(cfg.BaseDelay) * math.Pow(cfg.Multiplier, float64(attempt)) if delay > float64(cfg.MaxDelay) { delay = float64(cfg.MaxDelay) } // Добавляем jitter — случайное отклонение jitter := delay * cfg.JitterRatio * (rand.Float64()*2 - 1) actualDelay := time.Duration(delay + jitter) fmt.Printf("Попытка %d/%d не удалась: %v. Повтор через %v\n", attempt+1, cfg.MaxRetries+1, lastErr, actualDelay) // Ждём с учётом контекста (можно отменить извне) select { case <-ctx.Done(): return fmt.Errorf("повторы отменены: %w", ctx.Err()) case <-time.After(actualDelay): // Продолжаем следующую попытку } } return fmt.Errorf("все %d попыток исчерпаны: %w", cfg.MaxRetries+1, lastErr)}
Bulkhead
Паттерн Bulkhead изолирует разные части системы, чтобы отказ одной не затронул другие. Аналогия с переборками на корабле — если одна секция затоплена, остальные защищены.
В Go это реализуется через ограничение конкурентных запросов с помощью семафоров:
package bulkheadimport ( "context" "errors" "time")var ErrBulkheadFull = errors.New("bulkhead: все слоты заняты")// Bulkhead ограничивает количество параллельных запросов к ресурсуtype Bulkhead struct { semaphore chan struct{} // буферизированный канал как семафор timeout time.Duration}// NewBulkhead создаёт Bulkhead с ограничением на maxConcurrent запросовfunc NewBulkhead(maxConcurrent int, timeout time.Duration) *Bulkhead { return &Bulkhead{ semaphore: make(chan struct{}, maxConcurrent), timeout: timeout, }}// Execute выполняет функцию, если есть свободный слотfunc (b *Bulkhead) Execute(ctx context.Context, fn func() error) error { // Пытаемся занять слот с таймаутом select { case b.semaphore <- struct{}{}: // Слот получен — выполняем функцию defer func() { <-b.semaphore }() // освобождаем слот после выполнения return fn() case <-time.After(b.timeout): return ErrBulkheadFull case <-ctx.Done(): return ctx.Err() }}// Пример использования: разные Bulkhead для разных сервисов// userBulkhead := bulkhead.NewBulkhead(20, 5*time.Second) // макс 20 запросов к user-service// orderBulkhead := bulkhead.NewBulkhead(10, 5*time.Second) // макс 10 запросов к order-service//// Если order-service тормозит, он займёт только свои 10 слотов,// а user-service продолжит работать с 20 свободными слотами.
Sidecar-паттерн
Sidecar — это вспомогательный процесс, который развёртывается рядом с основным сервисом (в одном Pod в Kubernetes). Он берёт на себя сквозные задачи: логирование, мониторинг, mTLS, прокси. Пример — Envoy proxy в Istio service mesh. Основной сервис общается с sidecar по localhost, а sidecar уже маршрутизирует трафик, проверяет сертификаты и собирает метрики.
Service Mesh
Service Mesh (Istio, Linkerd) — это инфраструктурный слой, который реализует Sidecar-паттерн для всех сервисов. Он обеспечивает: mTLS между сервисами, маршрутизацию трафика, circuit breakers, retry, наблюдаемость — без изменения кода самих сервисов.
🏠 Домашнее задание
Реализуйте обёртку HTTP-клиента, которая комбинирует Circuit Breaker, Retry и Bulkhead. При вызове внешнего сервиса должны последовательно применяться все три паттерна.
Добавьте к Circuit Breaker колбэк OnStateChange(from, to State), который логирует переходы между состояниями.
Реализуйте Round-Robin балансировку в Registry.Discover() вместо случайного выбора.
3. Межсервисная коммуникация
Сервисы общаются друг с другом двумя основными способами: синхронно и асинхронно. Выбор зависит от требований к задержке, надёжности и связанности сервисов.
Синхронная коммуникация
При синхронном вызове клиент отправляет запрос и ждёт ответа. Это просто, но создаёт временнУю связанность — если вызываемый сервис медленный или недоступен, вызывающий заблокирован.
REST (HTTP/JSON) — самый простой и распространённый способ. Широко поддерживается, легко отлаживается (curl, Postman), человекочитаемый формат.
gRPC (HTTP/2 + Protocol Buffers) — бинарный протокол с типизированными контрактами. Быстрее REST за счёт бинарной сериализации, поддержка стриминга, автогенерация клиентов. Подробнее о gRPC и REST — в 03-networking.
Асинхронная коммуникация
При асинхронном взаимодействии отправитель не ждёт ответа. Он публикует сообщение в брокер, а получатель забирает его когда готов. Это обеспечивает:
Развязанность — сервисы не знают друг о друге
Надёжность — сообщения сохраняются в брокере при недоступности получателя
Обработка пиков — брокер буферизирует нагрузку
Сравнительная таблица
Характеристика
REST
gRPC
Брокер сообщений
Тип связи
Синхронная
Синхронная
Асинхронная
Формат
JSON (текст)
Protobuf (бинарный)
Любой
Производительность
Средняя
Высокая
Зависит от брокера
Типизация
Нет (OpenAPI)
Строгая (.proto)
Нужна схема
Стриминг
Нет (WebSocket)
Да
Да
Связанность
Высокая
Высокая
Низкая
Отладка
Простая
Нужны инструменты
Нужны инструменты
Когда использовать
CRUD API, простые запросы
Внутренняя связь, streaming
События, фоновые задачи
Правило выбора
Нужен немедленный ответ → синхронный вызов (REST/gRPC)
Можно обработать позже → асинхронный (брокер сообщений)
Внутренняя коммуникация с высокими требованиями к скорости → gRPC
Публичный API → REST
Событийная архитектура → брокер сообщений
🏠 Домашнее задание
Реализуйте простой HTTP-клиент для вызова другого сервиса с retry (3 попытки), таймаутом (5 секунд) и логированием. Используйте context.Context для управления отменой.
Подумайте, какие вызовы в вашем проекте можно сделать асинхронными (отправка email, генерация отчётов, обновление кеша).
4. RabbitMQ (amqp091-go)
RabbitMQ — это классический брокер сообщений, реализующий протокол AMQP. Он отлично подходит для задач, где важна гарантия доставки, маршрутизация сообщений по правилам и управление очередями. Подробнее о работе с сетевыми протоколами — в 03-networking.
Основные концепции
Connection — TCP-соединение с брокером (тяжёлый ресурс, одно на приложение)
Channel — виртуальное соединение внутри Connection (лёгкий, один на горутину)
Exchange — маршрутизатор сообщений. Принимает сообщение и направляет в очереди по правилам
Queue — буфер, хранящий сообщения до обработки потребителем
Binding — правило, связывающее Exchange и Queue
Routing Key — ключ, по которому Exchange решает, в какую очередь направить сообщение
Типы Exchange
Тип
Маршрутизация
direct
Точное совпадение routing key с binding key
topic
Паттерн с подстановочными символами (* — одно слово, # — ноль и более слов)
fanout
Отправка во все привязанные очереди (broadcast)
headers
Маршрутизация по заголовкам сообщения
Подключение и настройка
package rabbitmqimport ( "context" "encoding/json" "fmt" "log/slog" "time" amqp "github.com/rabbitmq/amqp091-go")// Connection — обёртка над AMQP-соединением с переподключениемtype Connection struct { conn *amqp.Connection channel *amqp.Channel logger *slog.Logger url string}// NewConnection устанавливает соединение с RabbitMQfunc NewConnection(url string, logger *slog.Logger) (*Connection, error) { // Устанавливаем TCP-соединение conn, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("не удалось подключиться к RabbitMQ: %w", err) } // Открываем канал — лёгкое виртуальное соединение ch, err := conn.Channel() if err != nil { conn.Close() return nil, fmt.Errorf("не удалось открыть канал: %w", err) } // Устанавливаем prefetch — сколько сообщений брокер отправит // потребителю до получения подтверждения (ack) // Это контролирует нагрузку на потребителя err = ch.Qos( 10, // prefetch count — не более 10 неподтверждённых сообщений 0, // prefetch size — без ограничения по размеру false, // global — применяется к каналу, а не ко всему соединению ) if err != nil { return nil, fmt.Errorf("не удалось установить QoS: %w", err) } logger.Info("подключение к RabbitMQ установлено", "url", url) return &Connection{ conn: conn, channel: ch, logger: logger, url: url, }, nil}// DeclareExchange создаёт exchange, если он не существуетfunc (c *Connection) DeclareExchange(name, kind string) error { return c.channel.ExchangeDeclare( name, // имя exchange kind, // тип: "direct", "topic", "fanout", "headers" true, // durable — exchange переживёт перезапуск RabbitMQ false, // autoDelete — не удалять, когда нет привязанных очередей false, // internal — можно публиковать напрямую false, // noWait — ждать подтверждения от сервера nil, // args — дополнительные параметры )}// DeclareQueue создаёт очередь и возвращает её имяfunc (c *Connection) DeclareQueue(name string, args amqp.Table) (amqp.Queue, error) { return c.channel.QueueDeclare( name, // имя очереди (пустая строка — сервер сгенерирует уникальное) true, // durable — очередь переживёт перезапуск false, // autoDelete — не удалять, когда нет потребителей false, // exclusive — не эксклюзивная (доступна из других соединений) false, // noWait — ждать подтверждения args, // аргументы (например, для Dead Letter Queue) )}// BindQueue привязывает очередь к exchange с routing keyfunc (c *Connection) BindQueue(queueName, routingKey, exchangeName string) error { return c.channel.QueueBind( queueName, // имя очереди routingKey, // routing key для маршрутизации exchangeName, // имя exchange false, // noWait nil, // args )}// Close закрывает канал и соединениеfunc (c *Connection) Close() { if c.channel != nil { c.channel.Close() } if c.conn != nil { c.conn.Close() } c.logger.Info("соединение с RabbitMQ закрыто")}
Публикация сообщений
// Message — структура сообщения для отправкиtype Message struct { Type string `json:"type"` // тип события Payload interface{} `json:"payload"` // данные Timestamp time.Time `json:"timestamp"` // время создания MessageID string `json:"message_id"` // уникальный идентификатор}// Publish отправляет сообщение в exchange с указанным routing keyfunc (c *Connection) Publish(ctx context.Context, exchange, routingKey string, msg Message) error { // Сериализуем сообщение в JSON body, err := json.Marshal(msg) if err != nil { return fmt.Errorf("ошибка сериализации сообщения: %w", err) } // Публикуем сообщение err = c.channel.PublishWithContext(ctx, exchange, // exchange — куда отправляем routingKey, // routing key — по какому ключу маршрутизировать false, // mandatory — вернуть сообщение, если не удалось маршрутизировать false, // immediate — доставить немедленно или вернуть amqp.Publishing{ ContentType: "application/json", DeliveryMode: amqp.Persistent, // сообщение сохраняется на диск MessageId: msg.MessageID, // для дедупликации Timestamp: msg.Timestamp, Body: body, }, ) if err != nil { return fmt.Errorf("ошибка публикации сообщения: %w", err) } c.logger.Info("сообщение опубликовано", "exchange", exchange, "routing_key", routingKey, "message_id", msg.MessageID, ) return nil}
Потребление сообщений
// MessageHandler — функция-обработчик входящих сообщенийtype MessageHandler func(ctx context.Context, msg Message) error// Consume начинает потребление сообщений из очередиfunc (c *Connection) Consume(ctx context.Context, queueName string, handler MessageHandler) error { // Запускаем потребление deliveries, err := c.channel.Consume( queueName, // имя очереди "", // consumer tag (пустой — сервер сгенерирует) false, // autoAck — НЕ автоматическое подтверждение false, // exclusive false, // noLocal false, // noWait nil, // args ) if err != nil { return fmt.Errorf("ошибка подписки на очередь: %w", err) } c.logger.Info("начато потребление сообщений", "queue", queueName) // Обрабатываем сообщения в цикле for { select { case <-ctx.Done(): c.logger.Info("потребление остановлено", "queue", queueName) return nil case delivery, ok := <-deliveries: if !ok { return fmt.Errorf("канал доставки закрыт") } // Десериализуем сообщение var msg Message if err := json.Unmarshal(delivery.Body, &msg); err != nil { c.logger.Error("ошибка десериализации", "error", err, "body", string(delivery.Body), ) // Nack без requeue — сообщение уйдёт в Dead Letter Queue delivery.Nack(false, false) continue } // Обрабатываем сообщение if err := handler(ctx, msg); err != nil { c.logger.Error("ошибка обработки сообщения", "error", err, "message_id", msg.MessageID, ) // Nack с requeue — сообщение вернётся в очередь // для повторной обработки delivery.Nack(false, true) continue } // Подтверждаем успешную обработку delivery.Ack(false) c.logger.Debug("сообщение обработано", "message_id", msg.MessageID, ) } }}
Dead Letter Queue
Dead Letter Queue (DLQ) — это очередь для «мёртвых» сообщений, которые не удалось обработать. Это позволяет не терять проблемные сообщения и анализировать их позже.
// SetupWithDLQ создаёт основную очередь с привязкой к Dead Letter Queuefunc (c *Connection) SetupWithDLQ(exchangeName, queueName, routingKey string) error { // 1. Создаём DLQ exchange и очередь dlxName := exchangeName + ".dlx" dlqName := queueName + ".dlq" if err := c.DeclareExchange(dlxName, "direct"); err != nil { return fmt.Errorf("ошибка создания DLX: %w", err) } if _, err := c.DeclareQueue(dlqName, nil); err != nil { return fmt.Errorf("ошибка создания DLQ: %w", err) } if err := c.BindQueue(dlqName, routingKey, dlxName); err != nil { return fmt.Errorf("ошибка привязки DLQ: %w", err) } // 2. Создаём основной exchange if err := c.DeclareExchange(exchangeName, "direct"); err != nil { return fmt.Errorf("ошибка создания exchange: %w", err) } // 3. Создаём основную очередь с привязкой к DLX // При Nack (без requeue) сообщение автоматически уйдёт в DLQ args := amqp.Table{ "x-dead-letter-exchange": dlxName, // куда отправлять отклонённые "x-dead-letter-routing-key": routingKey, // с каким routing key "x-message-ttl": int32(60000), // TTL в миллисекундах (60 сек) } if _, err := c.DeclareQueue(queueName, args); err != nil { return fmt.Errorf("ошибка создания очереди: %w", err) } if err := c.BindQueue(queueName, routingKey, exchangeName); err != nil { return fmt.Errorf("ошибка привязки очереди: %w", err) } c.logger.Info("очередь с DLQ настроена", "queue", queueName, "dlq", dlqName, "exchange", exchangeName, ) return nil}
Никогда не используйте autoAck: true в продакшене. Если потребитель упадёт после получения сообщения, но до его обработки — сообщение будет потеряно навсегда. Всегда подтверждайте (Ack) вручную после успешной обработки и отклоняйте (Nack) при ошибке.
🏠 Домашнее задание
Реализуйте паттерн с topic exchange: публикатор отправляет события order.created, order.paid, order.shipped; один потребитель подписан на order.*, другой — только на order.paid.
Добавьте обработку DLQ: отдельный потребитель читает сообщения из Dead Letter Queue и логирует их с деталями ошибки.
Реализуйте механизм повторных попыток с задержкой через TTL и DLQ: при ошибке обработки сообщение уходит в DLQ с TTL=30s, после чего возвращается в основную очередь.
5. Apache Kafka (segmentio/kafka-go)
Apache Kafka — это распределённая платформа для потоковой обработки данных. В отличие от RabbitMQ, Kafka хранит все сообщения на диске и позволяет перечитывать их. Это делает Kafka идеальным для event sourcing, аналитики и потоковой обработки.
Основные концепции
Topic — именованный канал для сообщений (аналог таблицы в БД). Подробнее о хранении данных — в 04-databases.
Partition — раздел внутри топика. Сообщения внутри одной партиции строго упорядочены.
Offset — порядковый номер сообщения внутри партиции. Потребитель отслеживает, до какого offset он прочитал.
Consumer Group — группа потребителей, которая делит партиции между собой. Каждая партиция обрабатывается ровно одним потребителем в группе.
Broker — один сервер Kafka. Кластер состоит из нескольких брокеров.
Replication Factor — сколько копий каждой партиции хранится на разных брокерах (для отказоустойчивости).
Topic: "orders" (3 партиции)
Partition 0: [msg0, msg1, msg2, msg3, msg4, ...] ← Consumer A
Partition 1: [msg0, msg1, msg2, ...] ← Consumer B
Partition 2: [msg0, msg1, msg2, msg3, ...] ← Consumer C
Consumer Group "order-processor" — каждый потребитель
читает из своей партиции
Producer (Писатель)
package kafkaimport ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/segmentio/kafka-go")// Event — структура события для Kafkatype Event struct { Type string `json:"type"` // тип события Key string `json:"key"` // ключ для партиционирования Payload interface{} `json:"payload"` // данные Timestamp time.Time `json:"timestamp"` // время создания EventID string `json:"event_id"` // уникальный идентификатор}// Producer — писатель сообщений в Kafkatype Producer struct { writer *kafka.Writer logger *slog.Logger}// NewProducer создаёт нового продюсера для указанного топикаfunc NewProducer(brokers []string, topic string, logger *slog.Logger) *Producer { writer := &kafka.Writer{ Addr: kafka.TCP(brokers...), // адреса брокеров Topic: topic, // топик для записи // Стратегия партиционирования по ключу сообщения // Сообщения с одинаковым ключом попадут в одну партицию // Это гарантирует порядок для связанных событий Balancer: &kafka.Hash{}, // Батчинг — накапливаем сообщения перед отправкой BatchSize: 100, // максимум сообщений в батче BatchTimeout: 10 * time.Millisecond, // максимальная задержка // Подтверждения от брокеров // RequireAll — ждём подтверждения от всех реплик // RequireOne — только от лидера // RequireNone — не ждём (самый быстрый, но ненадёжный) RequiredAcks: kafka.RequireAll, // Сжатие сообщений — экономим трафик и место на диске Compression: kafka.Snappy, // Логирование Logger: kafka.LoggerFunc(func(msg string, args ...interface{}) { logger.Debug(fmt.Sprintf(msg, args...)) }), ErrorLogger: kafka.LoggerFunc(func(msg string, args ...interface{}) { logger.Error(fmt.Sprintf(msg, args...)) }), } return &Producer{ writer: writer, logger: logger, }}// Publish отправляет событие в Kafkafunc (p *Producer) Publish(ctx context.Context, event Event) error { // Сериализуем payload в JSON value, err := json.Marshal(event) if err != nil { return fmt.Errorf("ошибка сериализации события: %w", err) } // Создаём сообщение Kafka msg := kafka.Message{ Key: []byte(event.Key), // ключ определяет партицию Value: value, // данные сообщения Headers: []kafka.Header{ {Key: "event_type", Value: []byte(event.Type)}, {Key: "event_id", Value: []byte(event.EventID)}, }, Time: event.Timestamp, } // Отправляем сообщение if err := p.writer.WriteMessages(ctx, msg); err != nil { return fmt.Errorf("ошибка записи в Kafka: %w", err) } p.logger.Info("событие опубликовано", "topic", p.writer.Topic, "event_type", event.Type, "event_id", event.EventID, "key", event.Key, ) return nil}// PublishBatch отправляет несколько событий одним батчемfunc (p *Producer) PublishBatch(ctx context.Context, events []Event) error { messages := make([]kafka.Message, 0, len(events)) for _, event := range events { value, err := json.Marshal(event) if err != nil { return fmt.Errorf("ошибка сериализации: %w", err) } messages = append(messages, kafka.Message{ Key: []byte(event.Key), Value: value, Headers: []kafka.Header{ {Key: "event_type", Value: []byte(event.Type)}, {Key: "event_id", Value: []byte(event.EventID)}, }, Time: event.Timestamp, }) } // WriteMessages в батч-режиме — эффективнее, чем по одному if err := p.writer.WriteMessages(ctx, messages...); err != nil { return fmt.Errorf("ошибка батчевой записи: %w", err) } p.logger.Info("батч событий опубликован", "count", len(events)) return nil}// Close закрывает writer и дожидается отправки оставшихся сообщенийfunc (p *Producer) Close() error { return p.writer.Close()}
Consumer (Потребитель)
// Consumer — потребитель сообщений из Kafka с consumer grouptype Consumer struct { reader *kafka.Reader logger *slog.Logger}// NewConsumer создаёт нового потребителяfunc NewConsumer(brokers []string, topic, groupID string, logger *slog.Logger) *Consumer { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, GroupID: groupID, // Consumer Group — координация между потребителями // Стратегия начального чтения // FirstOffset — читаем с самого начала (если нет сохранённого offset) // LastOffset — читаем только новые сообщения StartOffset: kafka.FirstOffset, // Размер батча при чтении MinBytes: 1, // минимум 1 байт — получаем сразу MaxBytes: 10e6, // максимум 10 МБ за один fetch // Таймаут ожидания новых сообщений MaxWait: 1 * time.Second, // Автоматическая перебалансировка при добавлении/удалении потребителей // В consumer group каждая партиция назначается одному потребителю // При изменении состава группы — происходит ребалансировка }) return &Consumer{ reader: reader, logger: logger, }}// EventHandler — функция-обработчик событийtype EventHandler func(ctx context.Context, event Event) error// Consume запускает бесконечный цикл потребления сообщенийfunc (c *Consumer) Consume(ctx context.Context, handler EventHandler) error { c.logger.Info("потребитель запущен", "topic", c.reader.Config().Topic, "group", c.reader.Config().GroupID, ) for { // FetchMessage читает следующее сообщение, но НЕ коммитит offset // Это даёт нам at-least-once семантику — если потребитель упадёт // до коммита, сообщение будет прочитано повторно msg, err := c.reader.FetchMessage(ctx) if err != nil { if ctx.Err() != nil { return nil // Контекст отменён — штатное завершение } c.logger.Error("ошибка чтения сообщения", "error", err) continue } // Десериализуем событие var event Event if err := json.Unmarshal(msg.Value, &event); err != nil { c.logger.Error("ошибка десериализации", "error", err, "partition", msg.Partition, "offset", msg.Offset, ) // Коммитим offset даже для некорректных сообщений, // чтобы не застрять на одном месте c.reader.CommitMessages(ctx, msg) continue } // Обрабатываем событие if err := handler(ctx, event); err != nil { c.logger.Error("ошибка обработки события", "error", err, "event_id", event.EventID, "event_type", event.Type, ) // При ошибке можно: повторить, отправить в DLQ, или пропустить // В данном примере — пропускаем и коммитим // В продакшене стоит реализовать retry или DLQ } // Коммитим offset — подтверждаем обработку if err := c.reader.CommitMessages(ctx, msg); err != nil { c.logger.Error("ошибка коммита offset", "error", err) } c.logger.Debug("сообщение обработано", "topic", msg.Topic, "partition", msg.Partition, "offset", msg.Offset, "event_type", event.Type, ) }}// Close закрывает readerfunc (c *Consumer) Close() error { return c.reader.Close()}
Идемпотентная обработка сообщений
В Kafka гарантируется at-least-once доставка — это значит, что одно и то же сообщение может быть получено повторно (при ребалансировке, перезапуске). Нужно обеспечить идемпотентность обработки.
package dedupimport ( "context" "fmt" "time" "github.com/redis/go-redis/v9")// Deduplicator предотвращает повторную обработку сообщений// Использует Redis для хранения обработанных IDtype Deduplicator struct { client *redis.Client ttl time.Duration // время хранения записей prefix string // префикс ключа в Redis}// NewDeduplicator создаёт дедупликатор с Redis-бэкендомfunc NewDeduplicator(client *redis.Client, prefix string, ttl time.Duration) *Deduplicator { return &Deduplicator{ client: client, ttl: ttl, prefix: prefix, }}// IsProcessed проверяет, было ли сообщение уже обработаноfunc (d *Deduplicator) IsProcessed(ctx context.Context, messageID string) (bool, error) { key := fmt.Sprintf("%s:%s", d.prefix, messageID) // SETNX (SET if Not eXists) — атомарная операция // Возвращает true, если ключ был установлен (сообщение новое) // Возвращает false, если ключ уже существовал (дубликат) wasSet, err := d.client.SetNX(ctx, key, "1", d.ttl).Result() if err != nil { return false, fmt.Errorf("ошибка проверки дедупликации: %w", err) } // wasSet=true → ключ создан → сообщение НОВОЕ → NOT processed // wasSet=false → ключ уже был → сообщение ДУБЛИКАТ → processed return !wasSet, nil}// ProcessWithDedup оборачивает обработчик с проверкой на дубликатыfunc ProcessWithDedup( ctx context.Context, dedup *Deduplicator, messageID string, handler func() error,) error { processed, err := dedup.IsProcessed(ctx, messageID) if err != nil { return fmt.Errorf("ошибка дедупликации: %w", err) } if processed { // Сообщение уже обработано — пропускаем return nil } // Обрабатываем сообщение return handler()}
Когда использовать Kafka vs RabbitMQ vs NATS
Характеристика
Kafka
RabbitMQ
NATS
Модель
Лог (append-only)
Очередь (FIFO)
Pub/Sub + очереди
Хранение
На диске, configurable retention
До подтверждения
JetStream — на диске
Пропускная способность
Очень высокая (миллионы msg/s)
Высокая (десятки тысяч)
Очень высокая
Перечитывание
Да (по offset)
Нет
JetStream — да
Порядок
В пределах партиции
В пределах очереди
В пределах stream
Сложность
Высокая (ZooKeeper/KRaft)
Средняя
Низкая
Лучше всего для
Event sourcing, аналитика, стримы
Task queues, RPC, маршрутизация
Микросервисы, IoT, realtime
🏠 Домашнее задание
Реализуйте продюсера, который генерирует 10 000 событий с разными ключами. Проверьте, как распределяются сообщения по партициям.
Запустите 3 потребителя в одной consumer group и наблюдайте за распределением партиций. Остановите одного — что произойдёт при ребалансировке?
Реализуйте DLQ для Kafka: при ошибке обработки сообщение отправляется в отдельный топик <topic>.dlq.
Добавьте дедупликацию через Redis и проверьте, что при повторной обработке одного и того же сообщения обработчик не вызывается дважды.
6. NATS (nats.go)
NATS — это лёгкий, высокопроизводительный брокер сообщений, идеально подходящий для микросервисной архитектуры. Он прост в развёртывании (один бинарник), быстр (миллионы сообщений в секунду) и поддерживает несколько моделей коммуникации.
Core NATS: Publish/Subscribe
Базовый NATS — это «огонь и забудь» (fire-and-forget). Сообщения не сохраняются: если подписчика нет в момент публикации — сообщение потеряно.
package mainimport ( "context" "encoding/json" "fmt" "log/slog" "os" "os/signal" "time" "github.com/nats-io/nats.go")// Event — универсальная структура событияtype Event struct { Type string `json:"type"` Source string `json:"source"` // какой сервис отправил Data json.RawMessage `json:"data"` // данные в формате JSON EventID string `json:"event_id"` Timestamp time.Time `json:"timestamp"`}func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) // Подключаемся к NATS-серверу nc, err := nats.Connect( nats.DefaultURL, // nats://localhost:4222 nats.Name("my-service"), // имя клиента для мониторинга nats.ReconnectWait(2*time.Second), // интервал переподключения nats.MaxReconnects(60), // максимум попыток nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { logger.Warn("отключение от NATS", "error", err) }), nats.ReconnectHandler(func(nc *nats.Conn) { logger.Info("переподключение к NATS", "url", nc.ConnectedUrl()) }), ) if err != nil { logger.Error("ошибка подключения к NATS", "error", err) os.Exit(1) } defer nc.Close() logger.Info("подключено к NATS", "url", nc.ConnectedUrl()) // --- Подписка на события --- // Простая подписка — получаем все сообщения по субъекту sub, err := nc.Subscribe("orders.created", func(msg *nats.Msg) { var event Event if err := json.Unmarshal(msg.Data, &event); err != nil { logger.Error("ошибка десериализации", "error", err) return } logger.Info("получено событие", "type", event.Type, "event_id", event.EventID, ) }) if err != nil { logger.Error("ошибка подписки", "error", err) os.Exit(1) } defer sub.Unsubscribe() // Подписка с wildcard — * заменяет одно слово // orders.* → orders.created, orders.updated, orders.deleted nc.Subscribe("orders.*", func(msg *nats.Msg) { logger.Info("все события заказов", "subject", msg.Subject) }) // Подписка с wildcard — > заменяет одно или более слов // events.> → events.order.created, events.user.registered и т.д. nc.Subscribe("events.>", func(msg *nats.Msg) { logger.Debug("любое событие", "subject", msg.Subject) }) // --- Публикация события --- event := Event{ Type: "order.created", Source: "order-service", Data: json.RawMessage(`{"order_id":"ORD-001","amount":150.00}`), EventID: "evt-001", Timestamp: time.Now(), } data, _ := json.Marshal(event) if err := nc.Publish("orders.created", data); err != nil { logger.Error("ошибка публикации", "error", err) } // Ожидаем сигнал завершения ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() <-ctx.Done()}
Request/Reply (Запрос/Ответ)
NATS поддерживает паттерн «запрос-ответ» — аналог синхронного вызова, но через брокер. Это удобно для service-to-service RPC.
package mainimport ( "encoding/json" "fmt" "log/slog" "os" "time" "github.com/nats-io/nats.go")// UserRequest — запрос информации о пользователеtype UserRequest struct { UserID string `json:"user_id"`}// UserResponse — ответ с данными пользователяtype UserResponse struct { UserID string `json:"user_id"` Name string `json:"name"` Email string `json:"email"` Error string `json:"error,omitempty"` // пустое, если ошибки нет}func main() { logger := slog.Default() nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() // --- Сервер (обработчик запросов) --- nc.Subscribe("users.get", func(msg *nats.Msg) { var req UserRequest if err := json.Unmarshal(msg.Data, &req); err != nil { resp, _ := json.Marshal(UserResponse{Error: "invalid request"}) msg.Respond(resp) // Отвечаем с ошибкой return } // Имитация поиска пользователя в БД logger.Info("обработка запроса пользователя", "user_id", req.UserID) resp := UserResponse{ UserID: req.UserID, Name: "Иван Петров", Email: "ivan@example.com", } data, _ := json.Marshal(resp) // msg.Respond — отправляет ответ обратно запрашивающему // NATS автоматически создаёт уникальный reply-subject msg.Respond(data) }) // --- Клиент (отправитель запроса) --- // Формируем запрос reqData, _ := json.Marshal(UserRequest{UserID: "USR-001"}) // Request — отправляем запрос и ждём ответа с таймаутом reply, err := nc.Request("users.get", reqData, 5*time.Second) if err != nil { logger.Error("таймаут ожидания ответа", "error", err) os.Exit(1) } // Десериализуем ответ var userResp UserResponse json.Unmarshal(reply.Data, &userResp) fmt.Printf("Пользователь: %s (%s)\n", userResp.Name, userResp.Email)}
Queue Groups (Балансировка нагрузки)
Queue Group — это механизм распределения сообщений между подписчиками. Каждое сообщение доставляется ровно одному подписчику в группе (аналог consumer group в Kafka).
// Три экземпляра одного сервиса подписываются на одну группу// NATS автоматически распределяет сообщения между ними// Экземпляр 1nc.QueueSubscribe("orders.process", "order-workers", func(msg *nats.Msg) { fmt.Println("worker-1 обрабатывает:", string(msg.Data))})// Экземпляр 2nc.QueueSubscribe("orders.process", "order-workers", func(msg *nats.Msg) { fmt.Println("worker-2 обрабатывает:", string(msg.Data))})// Экземпляр 3nc.QueueSubscribe("orders.process", "order-workers", func(msg *nats.Msg) { fmt.Println("worker-3 обрабатывает:", string(msg.Data))})// Публикация — каждое сообщение попадёт только к ОДНОМУ workerfor i := 0; i < 9; i++ { nc.Publish("orders.process", []byte(fmt.Sprintf("order-%d", i)))}// Результат: worker-1 получит ~3, worker-2 ~3, worker-3 ~3
NATS JetStream (Персистентные сообщения)
JetStream — расширение NATS для гарантированной доставки. Сообщения сохраняются в потоках (streams) и могут быть перечитаны.
package mainimport ( "context" "encoding/json" "fmt" "log/slog" "os" "time" "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream")func main() { logger := slog.Default() // Подключаемся к NATS nc, err := nats.Connect(nats.DefaultURL) if err != nil { logger.Error("ошибка подключения", "error", err) os.Exit(1) } defer nc.Close() // Создаём JetStream-контекст js, err := jetstream.New(nc) if err != nil { logger.Error("ошибка JetStream", "error", err) os.Exit(1) } ctx := context.Background() // --- Настройка Stream --- // Stream — хранилище сообщений для одного или нескольких субъектов stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ Name: "ORDERS", // имя потока Subjects: []string{"orders.>"}, // какие субъекты захватывать Storage: jetstream.FileStorage, // хранение на диске (надёжнее) // Можно также jetstream.MemoryStorage — быстрее, но пропадёт при перезапуске Retention: jetstream.LimitsPolicy, // удаление по лимитам MaxAge: 7 * 24 * time.Hour, // хранить 7 дней MaxMsgs: 1_000_000, // максимум 1 млн сообщений MaxBytes: 1 << 30, // максимум 1 ГБ Replicas: 1, // количество реплик (для кластера) }) if err != nil { logger.Error("ошибка создания stream", "error", err) os.Exit(1) } logger.Info("stream создан", "name", stream.CachedInfo().Config.Name) // --- Публикация сообщений --- for i := 0; i < 5; i++ { event := map[string]interface{}{ "order_id": fmt.Sprintf("ORD-%03d", i+1), "amount": float64(i+1) * 99.99, "status": "created", } data, _ := json.Marshal(event) // Publish в JetStream — ждём подтверждения записи ack, err := js.Publish(ctx, "orders.created", data) if err != nil { logger.Error("ошибка публикации", "error", err) continue } logger.Info("опубликовано", "stream", ack.Stream, "sequence", ack.Sequence, ) } // --- Consumer (Pull-based) --- // Pull Consumer — потребитель сам запрашивает сообщения consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ Name: "order-processor", // имя потребителя Durable: "order-processor", // durable — сохраняет позицию между перезапусками AckPolicy: jetstream.AckExplicitPolicy, // явное подтверждение FilterSubject: "orders.created", // фильтр по субъекту MaxDeliver: 5, // максимум повторных доставок AckWait: 30 * time.Second, // время на обработку до повторной доставки }) if err != nil { logger.Error("ошибка создания consumer", "error", err) os.Exit(1) } // Потребляем сообщения через итератор msgs, err := consumer.Messages() if err != nil { logger.Error("ошибка получения сообщений", "error", err) os.Exit(1) } // Обработка в горутине go func() { for msg := range msgs.Messages() { logger.Info("получено сообщение", "subject", msg.Subject(), "data", string(msg.Data()), ) // Подтверждаем обработку msg.Ack() } }() // Ожидаем 5 секунд и завершаем (для демонстрации) time.Sleep(5 * time.Second) msgs.Stop()}
Push vs Pull Consumer
Pull Consumer — потребитель сам запрашивает сообщения. Лучше контролирует нагрузку.
Push Consumer — NATS сам отправляет сообщения потребителю. Проще в использовании, но нужно следить за backpressure.
Для микросервисов обычно предпочтительнее Pull Consumer.
🏠 Домашнее задание
Реализуйте систему уведомлений через NATS: один сервис публикует события notifications.email, notifications.sms, notifications.push; три разных потребителя обрабатывают свой тип уведомлений.
Используйте JetStream для гарантированной доставки: при остановке потребителя и последующем запуске — он должен получить все пропущенные сообщения.
Реализуйте Request/Reply для получения статуса заказа: клиент отправляет ID заказа, сервис отвечает текущим статусом из своей БД.
7. Saga-паттерн
Проблема распределённых транзакций
В монолите у нас есть ACID-транзакции: начали транзакцию, выполнили несколько операций, закоммитили — всё атомарно. В микросервисах каждый сервис имеет свою базу данных (подробнее в 04-databases), и единой транзакции нет.
Пример: оформление заказа требует:
Создать заказ (order-service)
Списать деньги (payment-service)
Зарезервировать товар (inventory-service)
Отправить уведомление (notification-service)
Если оплата прошла, но товара нет на складе — нужно вернуть деньги. Saga решает эту проблему.
Два подхода
Choreography (Хореография) — каждый сервис слушает события и реагирует. Нет центрального координатора.
Orchestration (Оркестрация) — центральный координатор (оркестратор) управляет шагами и компенсациями.
Orchestrator:
1. POST /orders → order-service
2. POST /payments → payment-service
3. POST /inventory/reserve → inventory-service
4. Если шаг 3 fail:
POST /payments/refund → payment-service
POST /orders/cancel → order-service
Хореография vs Оркестрация
Хореография — проще для 2-3 шагов, но сложнее отслеживать flow при большом количестве сервисов. Сервисы связаны через события.
Оркестрация — централизованная логика, проще отлаживать, но оркестратор — единая точка отказа. Для сложных саг (5+ шагов) обычно выбирают оркестрацию. Смотрите также System Design для паттернов распределённых систем.
Реализация хореографии через NATS
package sagaimport ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/google/uuid" "github.com/nats-io/nats.go")// SagaStatus — статус сагиtype SagaStatus stringconst ( SagaStarted SagaStatus = "started" SagaCompleted SagaStatus = "completed" SagaFailed SagaStatus = "failed" SagaCompensating SagaStatus = "compensating")// SagaEvent — событие сагиtype SagaEvent struct { SagaID string `json:"saga_id"` // ID саги для корреляции Step string `json:"step"` // текущий шаг Status SagaStatus `json:"status"` // статус Data json.RawMessage `json:"data"` // данные Error string `json:"error,omitempty"` Timestamp time.Time `json:"timestamp"`}// --- Order Service ---// OrderService — сервис заказовtype OrderService struct { nc *nats.Conn logger *slog.Logger}// NewOrderService создаёт сервис заказов и подписывается на событияfunc NewOrderService(nc *nats.Conn, logger *slog.Logger) *OrderService { svc := &OrderService{nc: nc, logger: logger} // Подписка на запрос компенсации — отмена заказа nc.Subscribe("saga.order.compensate", func(msg *nats.Msg) { var event SagaEvent json.Unmarshal(msg.Data, &event) svc.compensateOrder(event) }) return svc}// CreateOrder начинает сагу — создаёт заказ и публикует событиеfunc (s *OrderService) CreateOrder(ctx context.Context, orderData map[string]interface{}) error { sagaID := uuid.New().String() s.logger.Info("создание заказа", "saga_id", sagaID) // Создаём заказ в локальной БД orderData["order_id"] = uuid.New().String() orderData["status"] = "pending" data, _ := json.Marshal(orderData) // Публикуем событие — запускаем сагу event := SagaEvent{ SagaID: sagaID, Step: "order_created", Status: SagaStarted, Data: data, Timestamp: time.Now(), } eventData, _ := json.Marshal(event) return s.nc.Publish("saga.order.created", eventData)}// compensateOrder — компенсирующая транзакция: отмена заказаfunc (s *OrderService) compensateOrder(event SagaEvent) { s.logger.Warn("компенсация: отмена заказа", "saga_id", event.SagaID) // Обновляем статус заказа в БД на "cancelled" // UPDATE orders SET status = 'cancelled' WHERE saga_id = ?}// --- Payment Service ---// PaymentService — сервис оплатыtype PaymentService struct { nc *nats.Conn logger *slog.Logger}// NewPaymentService создаёт сервис оплаты и подписывается на событияfunc NewPaymentService(nc *nats.Conn, logger *slog.Logger) *PaymentService { svc := &PaymentService{nc: nc, logger: logger} // Слушаем событие создания заказа — пытаемся провести оплату nc.Subscribe("saga.order.created", func(msg *nats.Msg) { var event SagaEvent json.Unmarshal(msg.Data, &event) svc.processPayment(event) }) // Слушаем запрос компенсации — делаем возврат nc.Subscribe("saga.payment.compensate", func(msg *nats.Msg) { var event SagaEvent json.Unmarshal(msg.Data, &event) svc.compensatePayment(event) }) return svc}// processPayment — обработка оплатыfunc (s *PaymentService) processPayment(event SagaEvent) { s.logger.Info("обработка оплаты", "saga_id", event.SagaID) // Имитация обработки платежа // В реальности — вызов платёжного шлюза paymentSuccess := true // Изменить на false для тестирования компенсации if !paymentSuccess { s.logger.Error("оплата не удалась", "saga_id", event.SagaID) // Оплата не прошла — запускаем компенсацию заказа failEvent := SagaEvent{ SagaID: event.SagaID, Step: "payment_failed", Status: SagaFailed, Error: "insufficient funds", Timestamp: time.Now(), } data, _ := json.Marshal(failEvent) s.nc.Publish("saga.order.compensate", data) return } // Оплата прошла — публикуем событие для следующего шага successEvent := SagaEvent{ SagaID: event.SagaID, Step: "payment_completed", Status: SagaStarted, Data: event.Data, Timestamp: time.Now(), } data, _ := json.Marshal(successEvent) s.nc.Publish("saga.payment.completed", data)}// compensatePayment — компенсирующая транзакция: возврат средствfunc (s *PaymentService) compensatePayment(event SagaEvent) { s.logger.Warn("компенсация: возврат средств", "saga_id", event.SagaID) // Возвращаем деньги пользователю // В реальности — вызов API платёжного шлюза для refund // После возврата — компенсируем заказ compensateEvent := SagaEvent{ SagaID: event.SagaID, Step: "payment_refunded", Status: SagaCompensating, Timestamp: time.Now(), } data, _ := json.Marshal(compensateEvent) s.nc.Publish("saga.order.compensate", data)}// --- Notification Service ---// NotificationService — сервис уведомленийtype NotificationService struct { nc *nats.Conn logger *slog.Logger}// NewNotificationService создаёт сервис уведомленийfunc NewNotificationService(nc *nats.Conn, logger *slog.Logger) *NotificationService { svc := &NotificationService{nc: nc, logger: logger} // Слушаем успешную оплату — отправляем уведомление nc.Subscribe("saga.payment.completed", func(msg *nats.Msg) { var event SagaEvent json.Unmarshal(msg.Data, &event) svc.sendNotification(event) }) // Слушаем события компенсации — уведомляем об отмене nc.Subscribe("saga.order.compensate", func(msg *nats.Msg) { var event SagaEvent json.Unmarshal(msg.Data, &event) svc.sendCancellationNotification(event) }) return svc}// sendNotification — отправка уведомления об успехеfunc (s *NotificationService) sendNotification(event SagaEvent) { s.logger.Info("отправка уведомления: заказ оплачен", "saga_id", event.SagaID, ) // Отправляем email/SMS/push}// sendCancellationNotification — уведомление об отменеfunc (s *NotificationService) sendCancellationNotification(event SagaEvent) { s.logger.Warn("отправка уведомления: заказ отменён", "saga_id", event.SagaID, "reason", event.Error, )}// --- Запуск всех сервисов ---func main() { logger := slog.Default() nc, err := nats.Connect(nats.DefaultURL) if err != nil { logger.Error("ошибка подключения к NATS", "error", err) return } defer nc.Close() // Запускаем все сервисы orderSvc := NewOrderService(nc, logger) NewPaymentService(nc, logger) NewNotificationService(nc, logger) // Создаём тестовый заказ — запускаем сагу orderSvc.CreateOrder(context.Background(), map[string]interface{}{ "user_id": "USR-001", "amount": 299.99, "items": []string{"item-1", "item-2"}, }) // Ждём обработки fmt.Println("Нажмите Enter для выхода...") fmt.Scanln()}
🏠 Домашнее задание
Добавьте в сагу шаг inventory-service (резервирование товара) между payment и notification. Реализуйте компенсацию при нехватке товара.
Реализуйте Saga Orchestrator — центральный координатор, который последовательно вызывает шаги и при ошибке запускает компенсации в обратном порядке.
Добавьте персистентное хранение состояния саги (в PostgreSQL): saga_id, текущий шаг, статус, время создания. Это позволит восстановить незавершённые саги после перезапуска.
8. Outbox-паттерн
Проблема двойной записи (Dual Write Problem)
Представьте: сервис должен сохранить данные в БД и опубликовать событие в брокер сообщений. Если сначала записать в БД, а потом в брокер — брокер может быть недоступен, и событие потеряется. Если сначала в брокер, потом в БД — БД может упасть, и данные не сохранятся.
Это и есть проблема двойной записи — нет атомарной операции, которая одновременно запишет в два разных хранилища.
Решение: Outbox Pattern
Идея: пишем всё в одну БД в рамках одной транзакции — и данные, и событие (в специальную таблицу outbox). Отдельный процесс (relay/publisher) читает из outbox и отправляет в брокер.
1. BEGIN TRANSACTION
INSERT INTO orders (...)
INSERT INTO outbox (event_type, payload, ...)
COMMIT
2. Outbox Relay (отдельная горутина):
SELECT * FROM outbox WHERE published = false
→ Публикует в Kafka/NATS/RabbitMQ
→ UPDATE outbox SET published = true WHERE id = ?
Схема таблицы Outbox
-- Таблица для хранения исходящих событийCREATE TABLE outbox ( id BIGSERIAL PRIMARY KEY, -- Тип агрегата (например, "order", "user", "payment") aggregate_type VARCHAR(255) NOT NULL, -- ID агрегата (например, ID заказа) aggregate_id VARCHAR(255) NOT NULL, -- Тип события (например, "order.created") event_type VARCHAR(255) NOT NULL, -- Данные события в формате JSON payload JSONB NOT NULL, -- Метаданные (trace_id, user_id и т.д.) metadata JSONB DEFAULT '{}', -- Было ли событие опубликовано published BOOLEAN DEFAULT FALSE, -- Время создания created_at TIMESTAMPTZ DEFAULT NOW(), -- Время публикации published_at TIMESTAMPTZ);-- Индекс для быстрого поиска неопубликованных событийCREATE INDEX idx_outbox_unpublished ON outbox (created_at) WHERE published = FALSE;
Реализация на Go
package outboximport ( "context" "database/sql" "encoding/json" "fmt" "log/slog" "time")// OutboxEvent — событие в таблице outboxtype OutboxEvent struct { ID int64 `json:"id"` AggregateType string `json:"aggregate_type"` AggregateID string `json:"aggregate_id"` EventType string `json:"event_type"` Payload json.RawMessage `json:"payload"` Metadata json.RawMessage `json:"metadata"` Published bool `json:"published"` CreatedAt time.Time `json:"created_at"`}// Repository — работа с outbox через БДtype Repository struct { db *sql.DB logger *slog.Logger}// NewRepository создаёт репозиторий outboxfunc NewRepository(db *sql.DB, logger *slog.Logger) *Repository { return &Repository{db: db, logger: logger}}// SaveWithOutbox сохраняет данные и событие в одной транзакции// execFn — функция, выполняющая основную бизнес-логику внутри транзакцииfunc (r *Repository) SaveWithOutbox( ctx context.Context, execFn func(tx *sql.Tx) error, event OutboxEvent,) error { // Начинаем транзакцию tx, err := r.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("ошибка начала транзакции: %w", err) } defer tx.Rollback() // Rollback если не был Commit // Выполняем основную бизнес-логику (INSERT/UPDATE в бизнес-таблицу) if err := execFn(tx); err != nil { return fmt.Errorf("ошибка бизнес-операции: %w", err) } // Сохраняем событие в outbox — в той же транзакции! _, err = tx.ExecContext(ctx, ` INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload, metadata) VALUES ($1, $2, $3, $4, $5) `, event.AggregateType, event.AggregateID, event.EventType, event.Payload, event.Metadata) if err != nil { return fmt.Errorf("ошибка записи в outbox: %w", err) } // Коммитим — обе записи атомарно if err := tx.Commit(); err != nil { return fmt.Errorf("ошибка коммита: %w", err) } r.logger.Info("данные и событие сохранены", "aggregate_type", event.AggregateType, "aggregate_id", event.AggregateID, "event_type", event.EventType, ) return nil}// FetchUnpublished возвращает неопубликованные события (для relay)func (r *Repository) FetchUnpublished(ctx context.Context, limit int) ([]OutboxEvent, error) { rows, err := r.db.QueryContext(ctx, ` SELECT id, aggregate_type, aggregate_id, event_type, payload, metadata, created_at FROM outbox WHERE published = FALSE ORDER BY created_at ASC LIMIT $1 FOR UPDATE SKIP LOCKED `, limit) // FOR UPDATE SKIP LOCKED — блокируем строки, чтобы другие relay не взяли те же события // SKIP LOCKED — пропускаем уже заблокированные строки if err != nil { return nil, fmt.Errorf("ошибка чтения outbox: %w", err) } defer rows.Close() var events []OutboxEvent for rows.Next() { var e OutboxEvent err := rows.Scan(&e.ID, &e.AggregateType, &e.AggregateID, &e.EventType, &e.Payload, &e.Metadata, &e.CreatedAt) if err != nil { return nil, fmt.Errorf("ошибка сканирования: %w", err) } events = append(events, e) } return events, rows.Err()}// MarkPublished помечает событие как опубликованноеfunc (r *Repository) MarkPublished(ctx context.Context, id int64) error { _, err := r.db.ExecContext(ctx, ` UPDATE outbox SET published = TRUE, published_at = NOW() WHERE id = $1 `, id) return err}// Publisher — интерфейс публикации событий в брокерtype Publisher interface { Publish(ctx context.Context, event OutboxEvent) error}// Relay — процесс, который читает из outbox и публикует в брокерtype Relay struct { repo *Repository publisher Publisher logger *slog.Logger interval time.Duration // интервал опроса batchSize int // размер батча}// NewRelay создаёт outbox relayfunc NewRelay(repo *Repository, publisher Publisher, logger *slog.Logger) *Relay { return &Relay{ repo: repo, publisher: publisher, logger: logger, interval: 500 * time.Millisecond, // опрашиваем каждые 500мс batchSize: 100, }}// Start запускает бесконечный цикл публикацииfunc (r *Relay) Start(ctx context.Context) { ticker := time.NewTicker(r.interval) defer ticker.Stop() r.logger.Info("outbox relay запущен", "interval", r.interval) for { select { case <-ctx.Done(): r.logger.Info("outbox relay остановлен") return case <-ticker.C: r.processBatch(ctx) } }}// processBatch обрабатывает один батч неопубликованных событийfunc (r *Relay) processBatch(ctx context.Context) { events, err := r.repo.FetchUnpublished(ctx, r.batchSize) if err != nil { r.logger.Error("ошибка чтения из outbox", "error", err) return } for _, event := range events { // Публикуем в брокер if err := r.publisher.Publish(ctx, event); err != nil { r.logger.Error("ошибка публикации события", "error", err, "event_id", event.ID, "event_type", event.EventType, ) continue // Пропускаем — попробуем в следующем цикле } // Помечаем как опубликованное if err := r.repo.MarkPublished(ctx, event.ID); err != nil { r.logger.Error("ошибка пометки события", "error", err, "event_id", event.ID) // Это не критично — в худшем случае событие опубликуется повторно // Потребитель должен обеспечить идемпотентность } r.logger.Debug("событие опубликовано из outbox", "event_id", event.ID, "event_type", event.EventType, ) }}
Гарантии outbox-паттерна
Outbox обеспечивает at-least-once доставку. Если relay упадёт после публикации, но до пометки published=true, событие опубликуется повторно. Поэтому потребители обязаны обрабатывать сообщения идемпотентно (см. раздел дедупликации в секции Kafka).
🏠 Домашнее задание
Реализуйте полный цикл: сервис создаёт заказ с outbox, relay публикует в NATS, потребитель обрабатывает событие.
Добавьте очистку старых опубликованных событий: DELETE FROM outbox WHERE published = TRUE AND published_at < NOW() - INTERVAL '7 days'.
Замените polling (опрос по таймеру) на PostgreSQL LISTEN/NOTIFY: при вставке в outbox — отправлять уведомление, relay реагирует мгновенно.
9. CQRS и Event Sourcing основы
CQRS (Command Query Responsibility Segregation)
CQRS — это разделение модели для записи (Command) и модели для чтения (Query). Вместо одной модели, которая обслуживает и записи, и чтения, мы используем две.
Зачем:
Чтение обычно требует денормализованных данных (JOIN’ы тормозят)
Запись требует нормализованных данных (целостность, валидация)
Нагрузка на чтение часто в 10-100 раз выше, чем на запись
Разные модели можно масштабировать независимо
package cqrsimport ( "context" "encoding/json" "time")// --- Command Side (Запись) ---// CreateOrderCommand — команда на создание заказаtype CreateOrderCommand struct { UserID string `json:"user_id"` Items []OrderItem `json:"items"`}type OrderItem struct { ProductID string `json:"product_id"` Quantity int `json:"quantity"` Price float64 `json:"price"`}// CommandHandler обрабатывает команды записиtype CommandHandler struct { eventStore EventStore // хранилище событий publisher EventPublisher // публикация в брокер}// HandleCreateOrder обрабатывает команду создания заказаfunc (h *CommandHandler) HandleCreateOrder(ctx context.Context, cmd CreateOrderCommand) error { // Валидация if len(cmd.Items) == 0 { return ErrEmptyOrder } // Вычисляем сумму var total float64 for _, item := range cmd.Items { total += item.Price * float64(item.Quantity) } // Создаём событие (не записываем напрямую в «текущее состояние») event := Event{ Type: "OrderCreated", AggregateID: generateID(), Data: OrderCreatedData{ UserID: cmd.UserID, Items: cmd.Items, Total: total, }, Timestamp: time.Now(), } // Сохраняем событие if err := h.eventStore.Save(ctx, event); err != nil { return err } // Публикуем для обновления модели чтения return h.publisher.Publish(ctx, event)}// --- Query Side (Чтение) ---// OrderView — денормализованное представление заказа для чтенияtype OrderView struct { OrderID string `json:"order_id"` UserID string `json:"user_id"` UserName string `json:"user_name"` // денормализовано (не нужен JOIN) Items []string `json:"items"` // названия товаров Total float64 `json:"total"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"`}// QueryHandler обрабатывает запросы чтенияtype QueryHandler struct { readDB ReadDatabase // отдельная БД/таблица для чтения}// GetOrderByID — быстрое чтение без JOIN'овfunc (h *QueryHandler) GetOrderByID(ctx context.Context, orderID string) (*OrderView, error) { // Читаем из денормализованной модели — быстрый SELECT без JOIN return h.readDB.FindOrder(ctx, orderID)}// GetUserOrders — все заказы пользователяfunc (h *QueryHandler) GetUserOrders(ctx context.Context, userID string) ([]OrderView, error) { return h.readDB.FindOrdersByUser(ctx, userID)}// --- Проекция (синхронизация модели чтения) ---// Projector слушает события и обновляет модель чтенияtype Projector struct { readDB ReadDatabase}// HandleEvent обрабатывает событие и обновляет представлениеfunc (p *Projector) HandleEvent(ctx context.Context, event Event) error { switch event.Type { case "OrderCreated": var data OrderCreatedData json.Unmarshal(event.Data.(json.RawMessage), &data) // Создаём денормализованное представление view := OrderView{ OrderID: event.AggregateID, UserID: data.UserID, Total: data.Total, Status: "created", CreatedAt: event.Timestamp, } return p.readDB.SaveOrder(ctx, view) case "OrderPaid": return p.readDB.UpdateOrderStatus(ctx, event.AggregateID, "paid") case "OrderShipped": return p.readDB.UpdateOrderStatus(ctx, event.AggregateID, "shipped") default: return nil // Неизвестное событие — игнорируем }}
Eventual Consistency
В CQRS модель чтения обновляется асинхронно после записи. Это значит, что после создания заказа при немедленном чтении вы можете не увидеть его. Это нормально — «eventual consistency». Данные станут согласованными через короткий промежуток времени.
Event Sourcing
Event Sourcing — это хранение событий вместо текущего состояния. Текущее состояние объекта восстанавливается путём «проигрывания» всех его событий с начала.
package eventsourcingimport ( "context" "fmt" "time")// Event — базовое событиеtype Event struct { ID string `json:"id"` Type string `json:"type"` AggregateID string `json:"aggregate_id"` Data interface{} `json:"data"` Version int `json:"version"` // версия для оптимистичной блокировки Timestamp time.Time `json:"timestamp"`}// --- Агрегат Order ---// Order — агрегат заказа. Текущее состояние восстанавливается из событийtype Order struct { ID string UserID string Items []OrderItem Total float64 Status string Version int // текущая версия для конкурентного контроля}// Apply применяет событие к агрегату, обновляя его состояниеfunc (o *Order) Apply(event Event) { switch event.Type { case "OrderCreated": data := event.Data.(OrderCreatedData) o.ID = event.AggregateID o.UserID = data.UserID o.Items = data.Items o.Total = data.Total o.Status = "created" case "OrderPaid": o.Status = "paid" case "OrderShipped": o.Status = "shipped" case "OrderCancelled": o.Status = "cancelled" } o.Version = event.Version}// --- Event Store ---// EventStore — хранилище событийtype EventStore interface { // Save сохраняет новые события для агрегата Save(ctx context.Context, events []Event) error // Load загружает все события для агрегата Load(ctx context.Context, aggregateID string) ([]Event, error)}// LoadOrder восстанавливает состояние заказа из событийfunc LoadOrder(ctx context.Context, store EventStore, orderID string) (*Order, error) { // Загружаем все события для этого заказа events, err := store.Load(ctx, orderID) if err != nil { return nil, fmt.Errorf("ошибка загрузки событий: %w", err) } if len(events) == 0 { return nil, fmt.Errorf("заказ %s не найден", orderID) } // Создаём пустой агрегат и проигрываем все события order := &Order{} for _, event := range events { order.Apply(event) } return order, nil}// Пример последовательности событий для заказа ORD-001://// Version 1: OrderCreated { user_id: "U1", items: [...], total: 500 }// Version 2: OrderPaid { payment_id: "PAY-001" }// Version 3: OrderShipped { tracking: "TRACK-001" }//// LoadOrder("ORD-001") → Order{ Status: "shipped", Total: 500, ... }//// Преимущества:// - Полная история изменений (аудит)// - Можно восстановить состояние на любой момент времени// - Можно добавить новые проекции задним числом//// Недостатки:// - Сложнее реализовать// - Загрузка длинной цепочки событий может быть медленной (решается снэпшотами)// - Eventual consistency
🏠 Домашнее задание
Реализуйте простой EventStore на PostgreSQL: таблица events с полями id, aggregate_id, event_type, data (JSONB), version, created_at.
Добавьте механизм снэпшотов: после каждых 100 событий сохраняйте текущее состояние агрегата. При загрузке — начинайте с последнего снэпшота.
Реализуйте две проекции для одного потока событий: одна для REST API (денормализованная), другая для отчётов (агрегированная).
10. Observability (Наблюдаемость)
Observability — это способность понять внутреннее состояние системы по её внешним выходам. В микросервисной архитектуре это критически важно, потому что один пользовательский запрос проходит через множество сервисов.
Три столпа наблюдаемости: Логирование, Трейсинг, Метрики.
Структурированное логирование
Используем log/slog из стандартной библиотеки Go (с версии 1.21). Структурированные логи в формате JSON легко парсить и индексировать (ELK, Loki).
package loggingimport ( "context" "log/slog" "os")// contextKey — тип для ключей контекстаtype contextKey stringconst ( requestIDKey contextKey = "request_id" traceIDKey contextKey = "trace_id" userIDKey contextKey = "user_id")// SetupLogger создаёт настроенный логгерfunc SetupLogger(env string) *slog.Logger { var handler slog.Handler switch env { case "production": // JSON-формат для продакшена — парсится ELK/Loki handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, // Добавляем source (файл и строку) для отладки AddSource: true, }) default: // Текстовый формат для разработки — читается человеком handler = slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug, }) } return slog.New(handler)}// WithRequestID добавляет request_id в контекстfunc WithRequestID(ctx context.Context, requestID string) context.Context { return context.WithValue(ctx, requestIDKey, requestID)}// WithTraceID добавляет trace_id в контекстfunc WithTraceID(ctx context.Context, traceID string) context.Context { return context.WithValue(ctx, traceIDKey, traceID)}// LoggerFromContext создаёт логгер с полями из контекста// Каждое лог-сообщение будет автоматически содержать request_id, trace_idfunc LoggerFromContext(ctx context.Context, base *slog.Logger) *slog.Logger { logger := base if requestID, ok := ctx.Value(requestIDKey).(string); ok { logger = logger.With("request_id", requestID) } if traceID, ok := ctx.Value(traceIDKey).(string); ok { logger = logger.With("trace_id", traceID) } if userID, ok := ctx.Value(userIDKey).(string); ok { logger = logger.With("user_id", userID) } return logger}// LoggerWithTrace создаёт логгер с trace_id для корреляции// логов, трейсов и метрикfunc LoggerWithTrace(base *slog.Logger, traceID, spanID string) *slog.Logger { return base.With( "trace_id", traceID, "span_id", spanID, )}// Пример использования://// logger := SetupLogger("production")// ctx = WithRequestID(ctx, "req-123")// ctx = WithTraceID(ctx, "trace-abc")//// log := LoggerFromContext(ctx, logger)// log.Info("обработка заказа",// "order_id", "ORD-001",// "amount", 299.99,// )//// Результат в JSON:// {"time":"2024-01-15T10:30:00Z","level":"INFO","msg":"обработка заказа",// "request_id":"req-123","trace_id":"trace-abc",// "order_id":"ORD-001","amount":299.99}
Альтернативы slog
Для высоконагруженных систем можно использовать zerolog (zero allocation) или zap (от Uber). Они быстрее slog, но slog — стандарт, и для большинства проектов его достаточно.
Трейсинг (OpenTelemetry)
Distributed Tracing позволяет отследить путь запроса через все сервисы. OpenTelemetry — стандарт для сбора трейсов.
package tracingimport ( "context" "fmt" "net/http" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" "go.opentelemetry.io/otel/trace")// InitTracer инициализирует OpenTelemetry tracerfunc InitTracer(ctx context.Context, serviceName, collectorURL string) (func(), error) { // Экспортёр — куда отправлять трейсы (Jaeger, Tempo, Zipkin) exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(collectorURL), otlptracegrpc.WithInsecure(), // для локальной разработки ) if err != nil { return nil, fmt.Errorf("ошибка создания экспортёра: %w", err) } // Ресурс — метаданные о сервисе res, err := resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, semconv.ServiceNameKey.String(serviceName), attribute.String("environment", "production"), ), ) if err != nil { return nil, fmt.Errorf("ошибка создания ресурса: %w", err) } // TracerProvider — фабрика трейсеров tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), // батч-отправка трейсов sdktrace.WithResource(res), sdktrace.WithSampler(sdktrace.AlwaysSample()), // сэмплируем все запросы ) // Устанавливаем глобальный TracerProvider otel.SetTracerProvider(tp) // Propagator — распространяет контекст трейса между сервисами // через HTTP-заголовки (traceparent, tracestate) otel.SetTextMapPropagator( propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, propagation.Baggage{}, ), ) // Функция завершения — вызвать при остановке сервиса shutdown := func() { tp.Shutdown(ctx) } return shutdown, nil}// TracingMiddleware — middleware для HTTP-сервера, создаёт span для каждого запросаfunc TracingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { tracer := otel.Tracer("http-server") // Извлекаем trace context из входящих заголовков ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header)) // Создаём span для этого запроса ctx, span := tracer.Start(ctx, fmt.Sprintf("%s %s", r.Method, r.URL.Path), trace.WithAttributes( semconv.HTTPMethodKey.String(r.Method), semconv.HTTPURLKey.String(r.URL.String()), attribute.String("http.client_ip", r.RemoteAddr), ), ) defer span.End() // Передаём контекст с трейсом дальше next.ServeHTTP(w, r.WithContext(ctx)) })}// PropagateTrace добавляет trace context в исходящие HTTP-заголовки// Вызывается при запросе к другому сервисуfunc PropagateTrace(ctx context.Context, req *http.Request) { otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))}// Пример: создание span внутри обработчика//// func (h *Handler) CreateOrder(ctx context.Context, order Order) error {// tracer := otel.Tracer("order-service")//// // Span для бизнес-логики// ctx, span := tracer.Start(ctx, "CreateOrder")// defer span.End()//// span.SetAttributes(// attribute.String("order.user_id", order.UserID),// attribute.Float64("order.total", order.Total),// )//// // Вложенный span для запроса к БД// ctx, dbSpan := tracer.Start(ctx, "db.InsertOrder")// err := h.repo.Save(ctx, order)// dbSpan.End()//// if err != nil {// span.RecordError(err) // записываем ошибку в span// return err// }//// return nil// }
Метрики (Prometheus)
Prometheus — стандарт де-факто для сбора метрик в микросервисах. Сервис экспортирует метрики по HTTP, Prometheus периодически их собирает (pull-model).
RED-метод — три ключевые метрики для каждого сервиса:
Rate — количество запросов в секунду
Errors — процент ошибок
Duration — время обработки запросов
package metricsimport ( "net/http" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp")// Metrics — набор метрик для HTTP-сервисаtype Metrics struct { // Counter — счётчик запросов (только растёт) RequestsTotal *prometheus.CounterVec // Histogram — распределение времени ответа // Позволяет считать перцентили (p50, p95, p99) RequestDuration *prometheus.HistogramVec // Gauge — текущее значение (может расти и уменьшаться) ActiveConnections prometheus.Gauge // Counter ошибок ErrorsTotal *prometheus.CounterVec}// NewMetrics создаёт и регистрирует метрикиfunc NewMetrics(serviceName string) *Metrics { return &Metrics{ // Количество HTTP-запросов с разбивкой по методу, пути и коду ответа RequestsTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: serviceName + "_http_requests_total", Help: "Общее количество HTTP-запросов", }, []string{"method", "path", "status_code"}, ), // Время обработки запросов (гистограмма) // Бакеты определяют границы интервалов RequestDuration: promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: serviceName + "_http_request_duration_seconds", Help: "Время обработки HTTP-запросов в секундах", // Бакеты: 5мс, 10мс, 25мс, 50мс, 100мс, 250мс, 500мс, 1с, 2.5с, 5с, 10с Buckets: prometheus.DefBuckets, }, []string{"method", "path"}, ), // Текущее количество активных соединений ActiveConnections: promauto.NewGauge( prometheus.GaugeOpts{ Name: serviceName + "_active_connections", Help: "Количество активных соединений", }, ), // Счётчик ошибок ErrorsTotal: promauto.NewCounterVec( prometheus.CounterOpts{ Name: serviceName + "_errors_total", Help: "Общее количество ошибок", }, []string{"type"}, // db_error, timeout, validation и т.д. ), }}// MetricsMiddleware — middleware для автоматического сбора метрикfunc MetricsMiddleware(m *Metrics, next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() // Увеличиваем счётчик активных соединений m.ActiveConnections.Inc() defer m.ActiveConnections.Dec() // Оборачиваем ResponseWriter для перехвата status code rw := &responseWriter{ResponseWriter: w, statusCode: http.StatusOK} // Выполняем запрос next.ServeHTTP(rw, r) // Записываем метрики duration := time.Since(start).Seconds() statusCode := http.StatusText(rw.statusCode) m.RequestsTotal.WithLabelValues(r.Method, r.URL.Path, statusCode).Inc() m.RequestDuration.WithLabelValues(r.Method, r.URL.Path).Observe(duration) })}// responseWriter — обёртка для перехвата status codetype responseWriter struct { http.ResponseWriter statusCode int}func (rw *responseWriter) WriteHeader(code int) { rw.statusCode = code rw.ResponseWriter.WriteHeader(code)}// SetupMetricsEndpoint регистрирует endpoint для Prometheusfunc SetupMetricsEndpoint(mux *http.ServeMux) { // /metrics — эндпоинт, который Prometheus будет опрашивать mux.Handle("/metrics", promhttp.Handler())}// Пример prometheus.yml для сбора метрик://// scrape_configs:// - job_name: 'order-service'// scrape_interval: 15s// static_configs:// - targets: ['order-service:8080']//// - job_name: 'user-service'// scrape_interval: 15s// static_configs:// - targets: ['user-service:8081']
Корреляция логов, трейсов и метрик
Ключ к эффективной отладке — связать все три столпа через trace_id. Когда вы видите аномалию в метриках, вы находите соответствующие трейсы, а через trace_id — логи всех сервисов, участвовавших в обработке.
package observabilityimport ( "context" "log/slog" "go.opentelemetry.io/otel/trace")// LoggerWithTrace создаёт логгер с trace_id и span_id// Это позволяет искать логи по trace_id из трейсингаfunc LoggerWithTrace(ctx context.Context, logger *slog.Logger) *slog.Logger { spanCtx := trace.SpanContextFromContext(ctx) if spanCtx.HasTraceID() { logger = logger.With( "trace_id", spanCtx.TraceID().String(), "span_id", spanCtx.SpanID().String(), ) } return logger}// Пример использования в обработчике://// func (h *Handler) CreateOrder(w http.ResponseWriter, r *http.Request) {// ctx := r.Context()// log := LoggerWithTrace(ctx, h.logger)//// log.Info("создание заказа", "user_id", "USR-001")// // Вывод: {"trace_id":"abc123","span_id":"def456","msg":"создание заказа","user_id":"USR-001"}//// // Этот trace_id совпадает с трейсом в Jaeger/Tempo// // и с метрикой в Prometheus (если добавить exemplar)// }
Три столпа наблюдаемости
Логи — что произошло (детали). slog → JSON → Loki/ELK
Трейсы — как запрос прошёл через сервисы (путь). OpenTelemetry → Jaeger/Tempo
Метрики — как система работает в целом (здоровье). Prometheus → Grafana
Связывайте их через trace_id для быстрой отладки.
🏠 Домашнее задание
Настройте slog с JSON-выводом и добавьте middleware, которое логирует каждый запрос с method, path, status_code, duration, request_id.
Добавьте Prometheus-метрики в ваш сервис: requests_total, request_duration_seconds, active_connections. Визуализируйте в Grafana.
Настройте OpenTelemetry с экспортом в Jaeger. Создайте span для HTTP-обработчика и вложенный span для запроса к БД.
Реализуйте LoggerWithTrace и убедитесь, что trace_id из логов совпадает с трейсом в Jaeger.
11. Конфигурация микросервисов
12-Factor App: Конфигурация из окружения
Принцип 12-Factor App гласит: конфигурация должна храниться в переменных окружения, а не в коде. Это позволяет использовать один и тот же артефакт (Docker-образ) в разных средах (dev, staging, production).
package configimport ( "fmt" "log/slog" "time" "github.com/caarlos0/env/v10")// Config — конфигурация микросервиса// Все поля заполняются из переменных окруженияtype Config struct { // Сервер ServerHost string `env:"SERVER_HOST" envDefault:"0.0.0.0"` ServerPort int `env:"SERVER_PORT" envDefault:"8080"` ReadTimeout time.Duration `env:"SERVER_READ_TIMEOUT" envDefault:"10s"` WriteTimeout time.Duration `env:"SERVER_WRITE_TIMEOUT" envDefault:"30s"` // База данных DatabaseURL string `env:"DATABASE_URL,required"` // обязательное поле DatabaseMaxConns int `env:"DATABASE_MAX_CONNS" envDefault:"20"` DatabaseMinConns int `env:"DATABASE_MIN_CONNS" envDefault:"5"` // Redis RedisURL string `env:"REDIS_URL" envDefault:"redis://localhost:6379"` RedisTTL time.Duration `env:"REDIS_TTL" envDefault:"1h"` // NATS NatsURL string `env:"NATS_URL" envDefault:"nats://localhost:4222"` NatsClusterID string `env:"NATS_CLUSTER_ID" envDefault:"my-cluster"` // Kafka KafkaBrokers []string `env:"KAFKA_BROKERS" envSeparator:"," envDefault:"localhost:9092"` KafkaGroupID string `env:"KAFKA_GROUP_ID" envDefault:"my-service"` // Наблюдаемость LogLevel string `env:"LOG_LEVEL" envDefault:"info"` OtelEndpoint string `env:"OTEL_ENDPOINT" envDefault:"localhost:4317"` // Приложение Environment string `env:"ENVIRONMENT" envDefault:"development"` ServiceName string `env:"SERVICE_NAME" envDefault:"my-service"`}// Addr возвращает адрес для прослушиванияfunc (c Config) Addr() string { return fmt.Sprintf("%s:%d", c.ServerHost, c.ServerPort)}// Load загружает конфигурацию из переменных окруженияfunc Load() (*Config, error) { cfg := &Config{} // env.Parse заполняет структуру из переменных окружения // Если обязательное поле не найдено — возвращает ошибку if err := env.Parse(cfg); err != nil { return nil, fmt.Errorf("ошибка загрузки конфигурации: %w", err) } return cfg, nil}// LogConfig логирует текущую конфигурацию (без секретов)func LogConfig(logger *slog.Logger, cfg *Config) { logger.Info("конфигурация загружена", "server_addr", cfg.Addr(), "environment", cfg.Environment, "log_level", cfg.LogLevel, "db_max_conns", cfg.DatabaseMaxConns, "redis_ttl", cfg.RedisTTL, "nats_url", cfg.NatsURL, "kafka_brokers", cfg.KafkaBrokers, // НЕ логируем DATABASE_URL и другие секреты! )}
Безопасность конфигурации
Никогда не логируйте секреты (пароли, API-ключи, connection strings с паролями). В продакшене используйте Vault, AWS Secrets Manager или Kubernetes Secrets для хранения чувствительных данных.
🏠 Домашнее задание
Создайте конфигурацию для вашего сервиса с помощью caarlos0/env. Добавьте валидацию: порт должен быть в диапазоне 1024-65535, environment — только development, staging, production.
Реализуйте загрузку конфигурации из YAML-файла с помощью viper, с возможностью перезаписи значений через переменные окружения.
12. Health Checks
В микросервисной архитектуре (особенно в Kubernetes) критически важно, чтобы каждый сервис сообщал о своём состоянии. Kubernetes использует два типа проверок:
Liveness Probe (/health) — сервис жив? Если нет — Kubernetes перезапустит контейнер.
Readiness Probe (/ready) — сервис готов принимать трафик? Если нет — Kubernetes перестанет направлять запросы.
package healthimport ( "context" "database/sql" "encoding/json" "net/http" "sync" "time" "github.com/redis/go-redis/v9")// Status — статус здоровьяtype Status stringconst ( StatusUp Status = "UP" StatusDown Status = "DOWN")// HealthResponse — ответ на health checktype HealthResponse struct { Status Status `json:"status"` Checks map[string]CheckResult `json:"checks,omitempty"` Uptime string `json:"uptime"`}// CheckResult — результат проверки одной зависимостиtype CheckResult struct { Status Status `json:"status"` Message string `json:"message,omitempty"` Duration string `json:"duration"`}// Checker — интерфейс для проверки здоровья зависимостиtype Checker interface { Name() string Check(ctx context.Context) CheckResult}// HealthHandler — обработчик health check запросовtype HealthHandler struct { startTime time.Time checkers []Checker}// NewHealthHandler создаёт обработчик с набором проверокfunc NewHealthHandler(checkers ...Checker) *HealthHandler { return &HealthHandler{ startTime: time.Now(), checkers: checkers, }}// LivenessHandler — /health — сервис жив?// Простая проверка — если процесс отвечает, он живfunc (h *HealthHandler) LivenessHandler(w http.ResponseWriter, r *http.Request) { resp := HealthResponse{ Status: StatusUp, Uptime: time.Since(h.startTime).String(), } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(resp)}// ReadinessHandler — /ready — сервис готов принимать трафик?// Проверяет все зависимости (БД, Redis, брокер)func (h *HealthHandler) ReadinessHandler(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() checks := make(map[string]CheckResult) overallStatus := StatusUp // Параллельная проверка всех зависимостей var mu sync.Mutex var wg sync.WaitGroup for _, checker := range h.checkers { wg.Add(1) go func(c Checker) { defer wg.Done() result := c.Check(ctx) mu.Lock() checks[c.Name()] = result if result.Status == StatusDown { overallStatus = StatusDown } mu.Unlock() }(checker) } wg.Wait() resp := HealthResponse{ Status: overallStatus, Checks: checks, Uptime: time.Since(h.startTime).String(), } w.Header().Set("Content-Type", "application/json") if overallStatus == StatusDown { w.WriteHeader(http.StatusServiceUnavailable) // 503 } else { w.WriteHeader(http.StatusOK) // 200 } json.NewEncoder(w).Encode(resp)}// --- Реализации Checker ---// PostgresChecker проверяет подключение к PostgreSQLtype PostgresChecker struct { db *sql.DB}func NewPostgresChecker(db *sql.DB) *PostgresChecker { return &PostgresChecker{db: db}}func (c *PostgresChecker) Name() string { return "postgresql" }func (c *PostgresChecker) Check(ctx context.Context) CheckResult { start := time.Now() if err := c.db.PingContext(ctx); err != nil { return CheckResult{ Status: StatusDown, Message: err.Error(), Duration: time.Since(start).String(), } } return CheckResult{ Status: StatusUp, Duration: time.Since(start).String(), }}// RedisChecker проверяет подключение к Redistype RedisChecker struct { client *redis.Client}func NewRedisChecker(client *redis.Client) *RedisChecker { return &RedisChecker{client: client}}func (c *RedisChecker) Name() string { return "redis" }func (c *RedisChecker) Check(ctx context.Context) CheckResult { start := time.Now() if err := c.client.Ping(ctx).Err(); err != nil { return CheckResult{ Status: StatusDown, Message: err.Error(), Duration: time.Since(start).String(), } } return CheckResult{ Status: StatusUp, Duration: time.Since(start).String(), }}// Пример регистрации://// healthHandler := health.NewHealthHandler(// health.NewPostgresChecker(db),// health.NewRedisChecker(redisClient),// )//// mux.HandleFunc("/health", healthHandler.LivenessHandler)// mux.HandleFunc("/ready", healthHandler.ReadinessHandler)//// Ответ /ready:// {// "status": "UP",// "checks": {// "postgresql": { "status": "UP", "duration": "1.2ms" },// "redis": { "status": "UP", "duration": "0.5ms" }// },// "uptime": "2h30m15s"// }
Kubernetes Probes
В Kubernetes health checks настраиваются в манифесте деплоймента:
Подробнее о развёртывании в Kubernetes — в 09-deploy.
🏠 Домашнее задание
Добавьте NATSChecker, который проверяет подключение к NATS-серверу.
Реализуйте /health/detailed — эндпоинт, доступный только для внутренних запросов (проверка по IP или токену), который возвращает расширенную информацию: количество горутин, потребление памяти, время работы.
Добавьте graceful shutdown: при получении SIGTERM сервис отвечает 503 на /ready, дожидается завершения текущих запросов и только потом останавливается.
13. Сквозной проект: Todo-сервис на микросервисах
Разобьём Todo-приложение из предыдущих глав на два микросервиса, общающихся через NATS. Подробнее о работе с базами данных — в 04-databases, о конкурентности — в 05-concurrency, о тестировании микросервисов — в 07-testing.
# todo-service/Dockerfile# --- Этап сборки ---FROM golang:1.22-alpine AS builderWORKDIR /app# Копируем go.mod и go.sum для кеширования зависимостейCOPY go.mod go.sum ./RUN go mod download# Копируем исходный кодCOPY . .# Собираем бинарник# CGO_ENABLED=0 — статическая линковка (не нужен libc в alpine)RUN CGO_ENABLED=0 GOOS=linux go build -o /todo-service ./cmd/main.go# --- Этап запуска ---FROM alpine:3.19# Сертификаты для HTTPS-запросовRUN apk --no-cache add ca-certificatesWORKDIR /app# Копируем только бинарник из этапа сборкиCOPY --from=builder /todo-service .# Непривилегированный пользовательRUN adduser -D -g '' appuserUSER appuserEXPOSE 8080CMD ["./todo-service"]
Миграция БД
-- todo-service/migrations/001_create_todos.sqlCREATE TABLE IF NOT EXISTS todos ( id VARCHAR(36) PRIMARY KEY, title VARCHAR(255) NOT NULL, description TEXT DEFAULT '', completed BOOLEAN DEFAULT FALSE, user_id VARCHAR(36) NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW());-- Индекс для поиска по пользователюCREATE INDEX IF NOT EXISTS idx_todos_user_id ON todos (user_id);-- Индекс для фильтрации по статусуCREATE INDEX IF NOT EXISTS idx_todos_completed ON todos (completed);
Запуск проекта
# Запуск всей инфраструктуры и сервисовdocker-compose up --build# Тестирование APIcurl -X POST http://localhost:8080/api/todos \ -H "Content-Type: application/json" \ -d '{"title":"Изучить микросервисы","user_id":"USR-001"}'# В логах notification-service появится:# {"level":"INFO","msg":"УВЕДОМЛЕНИЕ: Создана новая задача","todo_id":"...","title":"Изучить микросервисы"}# Мониторинг NATScurl http://localhost:8222/connz
🏠 Домашнее задание
Реализуйте полностью рабочий проект из этого раздела. Убедитесь, что при создании/обновлении/удалении задачи в логах notification-service появляются соответствующие сообщения.
Добавьте третий сервис — analytics-service, который подсчитывает статистику: сколько задач создано, сколько завершено, среднее время выполнения. Используйте NATS JetStream для гарантированной доставки.
Добавьте Outbox-паттерн в todo-service: вместо прямой публикации в NATS — записывайте событие в таблицу outbox в одной транзакции с бизнес-данными.
Добавьте Prometheus-метрики в оба сервиса и настройте сбор через docker-compose (добавьте контейнер Prometheus + Grafana).
Реализуйте трейсинг через OpenTelemetry: установите Jaeger в docker-compose, добавьте middleware для трейсинга в todo-service и пробросьте trace context через NATS-события в notification-service.
Напишите интеграционные тесты с использованием testcontainers-go: поднимите PostgreSQL и NATS в контейнерах и проверьте полный цикл: создание задачи → публикация события → получение уведомления. Подробнее о тестировании — в 07-testing.