Что такое Temporal и зачем он нужен
Представь типичный бизнес-процесс: пользователь оформляет заказ. Нужно проверить данные, зарезервировать товар на складе, списать деньги с карты, отправить посылку и уведомить покупателя. Каждый шаг - это вызов отдельного сервиса или API, и на каждом шаге что-то может пойти не так. Сервер может упасть, сеть может отвалиться, платёжный шлюз может вернуть таймаут.
В обычном приложении придётся самостоятельно решать все эти проблемы: сохранять состояние процесса в базе данных, писать логику повторных попыток, реализовывать откат при ошибках, следить за таймаутами. Код бизнес-логики быстро обрастает инфраструктурным шумом и становится сложным для понимания.
Temporal - это движок durable execution, который берёт всю эту работу на себя. Он гарантирует, что бизнес-процесс дойдёт до конца, даже если по пути случатся сбои. Ты описываешь процесс как обычную функцию на Go, а Temporal обеспечивает его надёжное выполнение.
Ключевая идея Temporal - разделение бизнес-логики и инфраструктурной надёжности. Ты пишешь код так, как будто всё работает идеально: вызвал функцию валидации, вызвал функцию оплаты, вызвал функцию отправки. Temporal незаметно для тебя сохраняет состояние после каждого шага и при сбое восстанавливает процесс ровно с того места, где он остановился.
Чем Temporal отличается от очередей задач
Очереди вроде RabbitMQ, Kafka или asynq решают более узкую задачу - доставить сообщение от отправителя к получателю. Но они не знают про состояние процесса. Если worker обработал три шага из пяти и упал, очередь не может восстановить процесс с четвёртого шага. Придётся самому хранить состояние, реализовывать стейт-машину и писать код восстановления.
Temporal принципиально отличается:
- Состояние workflow хранится на сервере Temporal, а не в коде приложения. Не нужно сохранять промежуточные результаты в базу данных
- Таймеры, ретраи, саги, сигналы и запросы встроены в платформу. Не нужно изобретать их заново для каждого проекта
- Детерминистичное воспроизведение - при перезапуске worker Temporal восстанавливает состояние, “проигрывая” историю событий. Workflow продолжается с того шага, на котором остановился
- Видимость - в любой момент можно запросить текущее состояние workflow, узнать на каком он шаге и что произошло
Архитектура Temporal
Прежде чем писать код, важно понять, из каких частей состоит Temporal и как они взаимодействуют.
┌─────────────────────────────────────────────┐
│ Temporal Server │
│ ┌─────────┐ ┌──────────┐ ┌───────────┐ │
│ │ History │ │ Matching │ │ Frontend │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────────┘ └──────────┘ └───────────┘ │
│ ↕ Task Queues │
└─────────────────────────────────────────────┘
↕ ↕
┌───────────┐ ┌──────────┐
│ Worker │ │ Client │
│ (Workflow │ │ (запуск, │
│ Activity)│ │ сигналы,│
│ │ │ запросы)│
└───────────┘ └──────────┘
Temporal Server - центральный компонент, который хранит состояние всех workflow и координирует выполнение. Внутри него работают несколько сервисов. History Service записывает каждое событие в историю workflow. Matching Service распределяет задачи по worker-ам через task queue. Frontend Service предоставляет API для клиентов.
Worker - это процесс на твоей стороне, который подключается к Temporal Server и выполняет реальный код workflow и activities. Worker-ов может быть несколько, они могут работать на разных машинах. Если один worker упадёт, Temporal перенаправит задачу на другой.
Client - код, который запускает workflow, отправляет сигналы и делает запросы. Обычно это HTTP-хендлер API или CLI-утилита.
Task Queue - именованная очередь, через которую Temporal Server распределяет задачи по worker-ам. Worker при запуске указывает, какую task queue он слушает. Это позволяет маршрутизировать разные типы задач на разные группы worker-ов.
Ключевые понятия
Workflow - функция на Go, которая описывает бизнес-процесс. Она оркестрирует activities, обрабатывает сигналы, может “засыпать” на часы и дни. Workflow обязана быть детерминистичной - об этом подробно ниже.
Activity - единица реальной работы с побочными эффектами. HTTP-запрос к платёжному шлюзу, запись в базу данных, отправка email - всё это activities. Они могут автоматически повторяться при сбоях.
Workflow ID - уникальный бизнес-идентификатор процесса. Например, order-12345. По нему можно найти workflow, отправить ему сигнал или запросить состояние.
Run ID - технический UUID конкретного запуска workflow. У одного Workflow ID может быть несколько Run ID, если workflow перезапускался.
Workflows
Что такое workflow и почему детерминизм
Workflow - это обычная функция Go, которая принимает workflow.Context и входные данные, а возвращает результат и ошибку. Внешне она выглядит как обычный код, но внутри действуют строгие правила.
Главное правило - детерминизм. Это означает, что если запустить workflow с теми же входными данными дважды, он должен пройти ровно те же шаги в том же порядке. Зачем это нужно? Temporal восстанавливает состояние workflow через replay - он “проигрывает” историю событий, чтобы восстановить все локальные переменные и понять, на каком шаге процесс остановился. Если код workflow изменился между запусками, replay сломается, потому что история событий не будет совпадать с новым кодом.
Из-за требования детерминизма внутри workflow нельзя использовать операции, результат которых может отличаться при повторном выполнении:
import (
"time"
"go.temporal.io/sdk/workflow"
)
// Внутри workflow НЕЛЬЗЯ:
// - вызывать time.Now(), time.Sleep() -> использовать workflow.Now(), workflow.Sleep()
// - делать HTTP-запросы, обращаться к БД -> выносить в activities
// - использовать math/rand -> использовать workflow.SideEffect()
// - работать с файловой системой -> выносить в activities
// - использовать горутины Go -> использовать workflow.Go()
// - использовать каналы Go -> использовать workflow.ChannelКаждый пункт имеет свою причину. time.Now() при replay вернёт другое время, чем при первом запуске. HTTP-запрос может вернуть другой ответ. math/rand вернёт другое число. Всё это нарушит детерминизм. Для каждой из этих операций Temporal предоставляет свой аналог, результат которого записывается в историю и при replay возвращается из неё.
Простой workflow
Рассмотрим workflow обработки заказа. Он последовательно выполняет три шага: валидация, оплата, отправка.
func OrderWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("order workflow started", "orderID", order.ID)
// Настройка параметров для activities
// Эти параметры определяют таймауты и политику повторных попыток
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumAttempts: 3,
},
})
// Шаг 1: Валидация заказа
// ExecuteActivity запускает activity и возвращает Future
// .Get() блокирует workflow до получения результата
var validated OrderValidation
err := workflow.ExecuteActivity(actCtx, ValidateOrder, order).Get(ctx, &validated)
if err != nil {
return OrderResult{}, fmt.Errorf("validate: %w", err)
}
// Шаг 2: Списание оплаты
var payment PaymentResult
err = workflow.ExecuteActivity(actCtx, ChargePayment, order).Get(ctx, &payment)
if err != nil {
return OrderResult{}, fmt.Errorf("charge: %w", err)
}
// Шаг 3: Ожидание 24 часа перед отправкой
// workflow.Sleep не блокирует worker - Temporal записывает таймер
// в историю и "разбудит" workflow через 24 часа
workflow.Sleep(ctx, 24*time.Hour)
// Шаг 4: Отправка
var shipment ShipmentResult
err = workflow.ExecuteActivity(actCtx, ShipOrder, order).Get(ctx, &shipment)
if err != nil {
return OrderResult{}, fmt.Errorf("ship: %w", err)
}
return OrderResult{
PaymentID: payment.ID,
TrackingNo: shipment.TrackingNo,
}, nil
}Этот код выглядит как обычная последовательная программа, но за кулисами происходит магия. После каждого ExecuteActivity Temporal записывает результат в историю. Если worker упадёт после оплаты, но до отправки, Temporal запустит workflow на другом worker. При replay он прочитает из истории, что валидация и оплата уже выполнены, и продолжит с шага отправки. Activity оплаты не будет вызвана повторно - Temporal вернёт сохранённый результат.
Обрати внимание на workflow.Sleep(ctx, 24*time.Hour). В обычной программе это заблокировало бы поток на сутки. В Temporal это создаёт таймер на сервере. Worker свободен и может выполнять другие задачи. Через 24 часа Temporal “разбудит” workflow, и он продолжит выполнение.
Детерминизм workflow критически важен. Если код workflow изменился между запусками - например, добавился новый шаг между оплатой и отправкой - Temporal выдаст non-determinism error, потому что история событий не совпадает с текущим кодом. Для безопасных изменений используется версионирование через
workflow.GetVersion().
Версионирование workflow
Когда нужно изменить логику уже запущенных workflow, нельзя просто поменять код - это сломает replay. Temporal предоставляет механизм версионирования:
func OrderWorkflow(ctx workflow.Context, order Order) (OrderResult, error) {
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
})
// GetVersion возвращает номер версии для этого workflow
// Первый аргумент - идентификатор изменения
// Второй - минимальная поддерживаемая версия
// Третий - текущая версия
v := workflow.GetVersion(ctx, "add-fraud-check", workflow.DefaultVersion, 1)
var validated OrderValidation
err := workflow.ExecuteActivity(actCtx, ValidateOrder, order).Get(ctx, &validated)
if err != nil {
return OrderResult{}, err
}
// Новый шаг добавлен только для версии 1+
// Старые workflow (DefaultVersion) пропустят этот блок
if v >= 1 {
err = workflow.ExecuteActivity(actCtx, CheckFraud, order).Get(ctx, nil)
if err != nil {
return OrderResult{}, err
}
}
// остальные шаги...
return OrderResult{}, nil
}При replay старых workflow GetVersion вернёт DefaultVersion, и блок с CheckFraud будет пропущен. Новые workflow получат версию 1 и выполнят проверку на мошенничество. Так можно безопасно эволюционировать логику, не ломая уже запущенные процессы.
Activities
Что такое activity и зачем она нужна
Activity - это место для кода с побочными эффектами. Всё, что нельзя делать внутри workflow, выносится в activity: HTTP-запросы, работа с базой данных, отправка email, чтение файлов, вызовы внешних API.
В отличие от workflow, activity - это обычная Go-функция с обычным context.Context. Внутри неё можно делать всё что угодно. Temporal запускает activity на worker, и если она упадёт или вернёт ошибку, автоматически повторит попытку на этом или другом worker согласно настроенной RetryPolicy.
import (
"context"
"go.temporal.io/sdk/activity"
)
// Activity валидации - вызывает внешний сервис
func ValidateOrder(ctx context.Context, order Order) (OrderValidation, error) {
// Внутри activity можно писать обычный Go-код
// HTTP-вызовы, обращения к БД, работа с файлами - всё разрешено
resp, err := validationService.Check(ctx, order)
if err != nil {
return OrderValidation{}, err
}
return OrderValidation{Valid: resp.IsValid}, nil
}
// Activity оплаты - вызывает платёжный API
func ChargePayment(ctx context.Context, order Order) (PaymentResult, error) {
result, err := stripeClient.Charge(order.Amount, order.Currency)
if err != nil {
// Если Stripe вернул ошибку, Temporal повторит попытку
// согласно RetryPolicy (через 1 сек, потом через 2 сек, потом через 4 сек)
return PaymentResult{}, err
}
return PaymentResult{ID: result.ID}, nil
}Ключевая идея: activity изолирует побочные эффекты от детерминистичного кода workflow. Результат каждой activity записывается в историю Temporal. При replay workflow activity не выполняется повторно - Temporal возвращает сохранённый результат.
Heartbeat для долгих activity
Если activity выполняется долго - например, обрабатывает большой файл или ждёт внешний процесс - нужен механизм, который сообщит Temporal, что activity ещё жива. Для этого используется heartbeat.
func ProcessLargeFile(ctx context.Context, path string) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
if lineCount%1000 == 0 {
// RecordHeartbeat сообщает Temporal, что activity работает
// Аргумент - прогресс, который можно использовать при перезапуске
activity.RecordHeartbeat(ctx, lineCount)
}
// обработка строки...
}
return scanner.Err()
}Heartbeat решает две задачи. Во-первых, если worker упадёт, Temporal заметит отсутствие heartbeat и перезапустит activity на другом worker. Без heartbeat Temporal узнает об этом только когда истечёт StartToCloseTimeout, что может быть долго. Во-вторых, значение heartbeat (в примере - номер обработанной строки) сохраняется и доступно при перезапуске через activity.GetHeartbeatDetails(), что позволяет продолжить обработку с того места, где остановились.
Таймауты activity
Activity options контролируют поведение выполнения. Понимание каждого таймаута важно для корректной настройки:
- StartToCloseTimeout - максимальное время выполнения одной попытки activity. Если activity не завершилась за это время, Temporal считает её упавшей и запускает повторную попытку. Это основной таймаут, который нужно указывать всегда
- ScheduleToCloseTimeout - максимальное время от момента, когда Temporal поставил activity в очередь, до момента её успешного завершения. Включает время ожидания в очереди и все повторные попытки. Используется, когда нужно ограничить общее время
- HeartbeatTimeout - если heartbeat не приходит за это время, Temporal считает activity зависшей. Актуально только для activity, которые вызывают
RecordHeartbeat - ScheduleToStartTimeout - максимальное время ожидания в очереди до начала выполнения. Полезно для обнаружения ситуаций, когда worker-ов недостаточно
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
HeartbeatTimeout: 10 * time.Second,
ScheduleToCloseTimeout: 5 * time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumAttempts: 5,
},
})Повторные попытки (Retries)
Как работает RetryPolicy
Одно из главных преимуществ Temporal - автоматические повторные попытки. Не нужно писать циклы с time.Sleep и счётчиком попыток. Достаточно описать политику, и Temporal сделает всё сам.
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // первая задержка - 1 секунда
BackoffCoefficient: 2.0, // каждая следующая задержка в 2 раза больше
MaximumInterval: time.Minute, // но не больше 1 минуты
MaximumAttempts: 5, // максимум 5 попыток (0 = бесконечно)
NonRetryableErrorTypes: []string{ // эти ошибки не ретраить
"InvalidInput",
"InsufficientFunds",
},
}С такой политикой повторные попытки будут выполняться через 1 сек, 2 сек, 4 сек, 8 сек. Если activity каждый раз возвращает ошибку, после пятой попытки workflow получит ошибку и сможет её обработать.
Обрати внимание на NonRetryableErrorTypes. Не все ошибки имеет смысл повторять. Если пользователь указал невалидные данные или на счёте недостаточно средств, повторная попытка через 2 секунды ничего не изменит. Такие ошибки нужно пометить как non-retryable.
Маркировка ошибок как non-retryable
Внутри activity можно явно указать, что определённая ошибка не подлежит повторным попыткам:
func ChargePayment(ctx context.Context, order Order) error {
err := gateway.Charge(order.Amount)
if err == nil {
return nil
}
// Ошибка "недостаточно средств" - повторять бессмысленно
if errors.Is(err, ErrInsufficientFunds) {
return temporal.NewNonRetryableApplicationError(
"insufficient funds", "InsufficientFunds", err)
}
// Все остальные ошибки (сетевые, таймауты) будут повторяться
// согласно RetryPolicy
return err
}Это важный паттерн. Бизнес-ошибки (невалидные данные, нехватка средств, товар закончился) помечаются как non-retryable и сразу всплывают в workflow для обработки. Инфраструктурные ошибки (сеть, таймауты, временная недоступность сервиса) остаются retryable, и Temporal повторяет попытки.
Сигналы (Signals)
Зачем нужны сигналы
Сигналы решают задачу взаимодействия с работающим workflow извне. Представь: workflow обработки заказа дошёл до шага “ожидание одобрения менеджером”. Менеджер через веб-интерфейс нажимает кнопку “Одобрить”. Как передать это решение в работающий workflow?
Сигнал - это асинхронное сообщение, которое клиент отправляет в workflow. Workflow может ожидать сигнал, обрабатывать его и продолжать выполнение. Сигнал доставляется надёжно - даже если worker в момент отправки недоступен, сигнал будет доставлен, когда worker подключится.
Workflow с ожиданием сигнала
func ApprovalWorkflow(ctx workflow.Context, request Request) (string, error) {
// Создаём канал для приёма сигналов с именем "approval-signal"
// Имя - это строковый идентификатор, по которому клиент отправляет сигнал
approvalCh := workflow.GetSignalChannel(ctx, "approval-signal")
// Выполняем предварительную проверку
var checkResult CheckResult
err := workflow.ExecuteActivity(ctx, PreCheck, request).Get(ctx, &checkResult)
if err != nil {
return "", err
}
// Теперь нужно дождаться решения менеджера, но не вечно
// Selector - аналог select в Go, но для workflow
var approval ApprovalDecision
timerCtx, cancelTimer := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)
// Вариант 1: пришёл сигнал одобрения
selector.AddReceive(approvalCh, func(ch workflow.ReceiveChannel, more bool) {
ch.Receive(ctx, &approval)
cancelTimer() // отменяем таймер - сигнал уже получен
})
// Вариант 2: прошло 72 часа - таймаут
selector.AddFuture(workflow.NewTimer(timerCtx, 72*time.Hour), func(f workflow.Future) {
approval = ApprovalDecision{Approved: false, Reason: "timeout"}
})
// Select блокирует workflow до одного из событий
selector.Select(ctx)
if !approval.Approved {
return "", fmt.Errorf("rejected: %s", approval.Reason)
}
// Одобрено - продолжаем выполнение
err = workflow.ExecuteActivity(ctx, ExecuteRequest, request).Get(ctx, nil)
if err != nil {
return "", err
}
return "completed", nil
}Selector работает как select в Go, но для мира Temporal. Он ожидает либо сигнал из канала, либо срабатывание таймера. Какое событие произойдёт первым, то и обработается. Если менеджер одобрит заказ через час - workflow продолжит работу через час. Если за 72 часа никто не одобрит - workflow получит отказ по таймауту.
Отправка сигнала из клиента
Отправить сигнал в workflow можно из любого места - HTTP-хендлера, CLI-утилиты, другого workflow:
func sendApproval(c client.Client, workflowID string, decision ApprovalDecision) error {
// Первый аргумент - Workflow ID
// Второй - Run ID (пустая строка = текущий запуск)
// Третий - имя сигнала (должно совпадать с именем в GetSignalChannel)
// Четвёртый - данные сигнала
return c.SignalWorkflow(
context.Background(),
workflowID, "",
"approval-signal", decision,
)
}Запросы (Queries)
Зачем нужны запросы
Запросы позволяют прочитать текущее состояние workflow без побочных эффектов. Это нужно для отображения статуса в интерфейсе: на какой фазе находится заказ, какие шаги уже выполнены, какой трекинг-номер присвоен.
В отличие от сигналов, запросы не изменяют состояние workflow. Они просто читают переменные и возвращают результат. Запросы обрабатываются синхронно - клиент получает ответ сразу.
func OrderTrackingWorkflow(ctx workflow.Context, order Order) error {
// Локальные переменные хранят состояние
status := "created"
var trackingNo string
// Регистрируем query handler - функцию, которую клиент может вызвать
// "get-status" - имя запроса
err := workflow.SetQueryHandler(ctx, "get-status", func() (OrderStatus, error) {
return OrderStatus{
Status: status,
TrackingNo: trackingNo,
}, nil
})
if err != nil {
return err
}
// По мере выполнения workflow переменные обновляются
status = "processing"
err = workflow.ExecuteActivity(ctx, ProcessOrder, order).Get(ctx, nil)
if err != nil {
return err
}
status = "shipped"
err = workflow.ExecuteActivity(ctx, ShipOrder, order).Get(ctx, &trackingNo)
if err != nil {
return err
}
status = "delivered"
return nil
}Когда клиент делает query, Temporal вызывает зарегистрированный handler, который читает текущие значения переменных status и trackingNo. Если workflow находится на шаге “shipping”, query вернёт status: "shipped" и номер отслеживания.
// Клиентский код для запроса статуса
resp, err := c.QueryWorkflow(ctx, workflowID, "", "get-status")
if err != nil {
return OrderStatus{}, err
}
var status OrderStatus
err = resp.Get(&status)Query handler должен быть чистой функцией - только чтение переменных, никаких побочных эффектов. Нельзя запускать activities, писать в каналы или изменять состояние workflow внутри query handler.
Таймеры и ожидание
Почему таймеры Temporal отличаются от time.Sleep
В обычном Go-коде time.Sleep(24 * time.Hour) заблокирует горутину на сутки. Если процесс перезапустится, таймер потеряется. В Temporal таймеры работают иначе: workflow.Sleep создаёт запись в истории событий на сервере Temporal. Worker при этом свободен - он может выполнять другие задачи. Через указанное время Temporal отправит задачу worker-у, и workflow продолжит работу.
Это позволяет создавать workflow, которые длятся месяцами. Например, подписка с ежемесячным списанием или процесс онбординга нового клиента с напоминаниями через 3, 7 и 30 дней.
// workflow.Sleep - простое ожидание
workflow.Sleep(ctx, 24*time.Hour)
// workflow.NewTimer - создание таймера для использования в selector
// Полезно, когда нужно ждать таймер ИЛИ сигнал - что придёт первым
timer := workflow.NewTimer(ctx, 5*time.Minute)
// workflow.Await - ожидание условия
// Блокирует workflow до тех пор, пока функция-условие не вернёт true
// Проверяется каждый раз, когда workflow получает событие (сигнал, результат activity)
var approved bool
workflow.Await(ctx, func() bool { return approved })Selector - мультиплексирование событий
Selector - мощный инструмент, который позволяет ожидать сразу несколько событий и реагировать на первое из них. Это аналог конструкции select в Go, но работающий в мире Temporal.
selector := workflow.NewSelector(ctx)
// Ожидаем сигнал из канала
selector.AddReceive(signalCh, func(ch workflow.ReceiveChannel, more bool) {
var data SignalData
ch.Receive(ctx, &data)
// обработка сигнала
})
// Одновременно ожидаем таймер
selector.AddFuture(workflow.NewTimer(ctx, 10*time.Minute), func(f workflow.Future) {
// таймаут - сигнал не пришёл за 10 минут
})
// Select блокирует workflow до первого из событий
selector.Select(ctx)Selector часто используется в паттерне “ожидание с таймаутом”: workflow ждёт внешнее событие (сигнал, результат child workflow), но если событие не приходит за определённое время, выполняется альтернативная логика.
Дочерние Workflows
Когда использовать дочерние workflow
Иногда бизнес-процесс слишком сложен для одного workflow, или один и тот же подпроцесс используется в нескольких местах. В таких случаях workflow может запустить другие workflow как дочерние.
Типичные сценарии для дочерних workflow:
- Параллельная обработка нескольких заказов в рамках одного батча
- Выделение переиспользуемой логики (процесс оплаты используется в нескольких бизнес-процессах)
- Ограничение размера истории событий - для очень длинных процессов история может стать слишком большой, и её стоит разделить на несколько workflow
func ParentWorkflow(ctx workflow.Context, orders []Order) error {
// Запускаем дочерний workflow для каждого заказа
var futures []workflow.ChildWorkflowFuture
for _, order := range orders {
childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
// Workflow ID дочернего workflow - уникальный для каждого заказа
WorkflowID: fmt.Sprintf("order-%d", order.ID),
// Что делать с дочерним workflow, если родитель завершится
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_TERMINATE,
})
// ExecuteChildWorkflow запускает дочерний workflow асинхронно
// и возвращает Future - обещание результата
future := workflow.ExecuteChildWorkflow(childCtx, OrderWorkflow, order)
futures = append(futures, future)
}
// Ожидаем завершения всех дочерних workflows
for i, f := range futures {
var result OrderResult
if err := f.Get(ctx, &result); err != nil {
workflow.GetLogger(ctx).Error("child workflow failed",
"orderID", orders[i].ID, "err", err)
continue // ошибка одного заказа не останавливает остальные
}
}
return nil
}Все дочерние workflow запускаются параллельно - ExecuteChildWorkflow не блокирует выполнение, а возвращает Future. Блокировка происходит только в цикле .Get(), где мы ожидаем результаты.
ParentClosePolicy определяет, что происходит с дочерним workflow при завершении родителя:
- TERMINATE - немедленно завершить дочерний workflow. Используется, когда дочерний workflow не имеет смысла без родителя
- ABANDON - оставить дочерний workflow работать независимо. Используется, когда дочерний процесс должен завершиться в любом случае
- REQUEST_CANCEL - запросить отмену дочернего workflow. Даёт дочернему workflow шанс корректно завершиться
Saga Pattern
Проблема распределённых транзакций
В монолитной системе можно обернуть несколько операций в одну транзакцию БД: если что-то пошло не так, всё откатится. В распределённой системе такой возможности нет. Резервирование товара на складе, списание денег и создание доставки - это вызовы разных сервисов, и нет единой транзакции, которая их объединяет.
Saga решает эту проблему через компенсационные действия. Каждый успешный шаг регистрирует операцию отката. Если последующий шаг падает, все предыдущие шаги откатываются в обратном порядке.
func OrderSagaWorkflow(ctx workflow.Context, order Order) error {
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3},
})
// Список компенсаций - функций отката
// При ошибке они выполнятся в обратном порядке (стек)
var compensations []func(workflow.Context) error
// Шаг 1: Резервирование товара
var reservation ReservationResult
err := workflow.ExecuteActivity(actCtx, ReserveInventory, order).Get(ctx, &reservation)
if err != nil {
// Первый шаг - компенсировать нечего
return fmt.Errorf("reserve: %w", err)
}
// Регистрируем компенсацию: если следующие шаги упадут, отпустим резерв
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(actCtx, ReleaseInventory, reservation.ID).Get(ctx, nil)
})
// Шаг 2: Списание оплаты
var payment PaymentResult
err = workflow.ExecuteActivity(actCtx, ChargePayment, order).Get(ctx, &payment)
if err != nil {
// Оплата не прошла - откатываем шаг 1 (отпускаем резерв)
compensate(ctx, compensations)
return fmt.Errorf("charge: %w", err)
}
// Регистрируем компенсацию: возврат денег
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(actCtx, RefundPayment, payment.ID).Get(ctx, nil)
})
// Шаг 3: Отправка
err = workflow.ExecuteActivity(actCtx, ShipOrder, order).Get(ctx, nil)
if err != nil {
// Отправка не прошла - откатываем шаги 2 и 1
// RefundPayment, затем ReleaseInventory
compensate(ctx, compensations)
return fmt.Errorf("ship: %w", err)
}
// Шаг 4: Уведомление (не критично)
_ = workflow.ExecuteActivity(actCtx, NotifyCustomer, order).Get(ctx, nil)
return nil
}
// compensate выполняет компенсации в обратном порядке
func compensate(ctx workflow.Context, compensations []func(workflow.Context) error) {
for i := len(compensations) - 1; i >= 0; i-- {
if err := compensations[i](ctx); err != nil {
// Логируем ошибку компенсации, но продолжаем
// Temporal обеспечит ретраи для самих activities компенсации
workflow.GetLogger(ctx).Error("compensation failed", "err", err)
}
}
}Разберём сценарий ошибки на шаге 3. К этому моменту товар зарезервирован (шаг 1) и деньги списаны (шаг 2). Отправка провалилась. Функция compensate выполнит компенсации в обратном порядке: сначала RefundPayment (вернёт деньги), затем ReleaseInventory (отпустит резерв). Каждая компенсация - это activity, которая тоже может ретраиться при сбоях.
Компенсация шага 4 (NotifyCustomer) не регистрируется, потому что уведомление - некритичный шаг. Его ошибка игнорируется оператором
_. Если уведомление не отправилось, откатывать предыдущие шаги не нужно.
Worker
Что делает worker и как его настроить
Worker - это Go-процесс, который подключается к Temporal Server и говорит: “Я готов выполнять задачи из очереди order-processing”. Temporal Server раздаёт задачи worker-ам, а worker выполняет зарегистрированные workflow и activity функции.
Worker не хранит состояние - он stateless. Можно запустить несколько worker-ов на разных машинах, и Temporal будет распределять задачи между ними. Если один worker упадёт, задачи автоматически перенаправятся на другие.
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
// Подключение к Temporal Server
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
if err != nil {
log.Fatal("unable to create client", err)
}
defer c.Close()
// Создание worker для конкретной task queue
// Все workflow и activities регистрируются на одной task queue
w := worker.New(c, "order-processing", worker.Options{
// Максимум 10 activities выполняются одновременно на этом worker
MaxConcurrentActivityExecutionSize: 10,
// Максимум 5 workflow tasks обрабатываются одновременно
MaxConcurrentWorkflowTaskExecutionSize: 5,
})
// Регистрация - worker узнаёт какие функции он умеет выполнять
w.RegisterWorkflow(OrderWorkflow)
w.RegisterWorkflow(OrderSagaWorkflow)
w.RegisterWorkflow(ApprovalWorkflow)
w.RegisterActivity(ValidateOrder)
w.RegisterActivity(ChargePayment)
w.RegisterActivity(RefundPayment)
w.RegisterActivity(ReserveInventory)
w.RegisterActivity(ReleaseInventory)
w.RegisterActivity(ShipOrder)
w.RegisterActivity(NotifyCustomer)
// Run блокирует выполнение и обрабатывает задачи из task queue
// InterruptCh() позволяет корректно завершить worker по SIGINT/SIGTERM
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatal("unable to start worker", err)
}
}Можно запустить несколько worker-ов для разных task queue. Например, тяжёлые activity обработки файлов можно вынести на отдельную task queue с worker-ами на мощных машинах, а лёгкие API-вызовы - на другую:
// Worker для API-операций
w1 := worker.New(c, "api-tasks", worker.Options{
MaxConcurrentActivityExecutionSize: 50,
})
w1.RegisterActivity(ValidateOrder)
w1.RegisterActivity(ChargePayment)
// Worker для тяжёлых задач (на другой машине)
w2 := worker.New(c, "heavy-tasks", worker.Options{
MaxConcurrentActivityExecutionSize: 3,
})
w2.RegisterActivity(ProcessLargeFile)
w2.RegisterActivity(GenerateReport)В workflow при вызове activity можно указать конкретную task queue:
heavyCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskQueue: "heavy-tasks",
StartToCloseTimeout: 10 * time.Minute,
})
workflow.ExecuteActivity(heavyCtx, ProcessLargeFile, path).Get(ctx, nil)Клиент Temporal
Запуск workflow и взаимодействие с ним
Клиент - это код, который инициирует бизнес-процессы и взаимодействует с ними. Обычно клиент живёт в HTTP-хендлере API или в CLI-утилите.
import (
"context"
"fmt"
"go.temporal.io/sdk/client"
)
func createTemporalClient() (client.Client, error) {
return client.Dial(client.Options{
HostPort: "localhost:7233",
Namespace: "default",
})
}
// Запуск нового workflow
func startOrder(c client.Client, order Order) (string, error) {
options := client.StartWorkflowOptions{
// Workflow ID - бизнес-идентификатор
// Temporal гарантирует уникальность: если workflow с таким ID уже запущен,
// повторный запуск вернёт ошибку (защита от дублей)
ID: fmt.Sprintf("order-%s", order.ID),
TaskQueue: "order-processing",
}
// ExecuteWorkflow запускает workflow и возвращает handle для отслеживания
we, err := c.ExecuteWorkflow(context.Background(), options,
OrderProcessingWorkflow, order)
if err != nil {
return "", err
}
fmt.Printf("Workflow started: ID=%s RunID=%s\n", we.GetID(), we.GetRunID())
return we.GetID(), nil
}Workflow ID играет роль ключа идемпотентности. Если пользователь дважды нажмёт кнопку “Оформить заказ”, второй вызов ExecuteWorkflow с тем же ID вернёт ошибку, а не запустит дублирующий процесс.
Ожидание результата
// Синхронное ожидание результата workflow
func waitForOrder(c client.Client, workflowID string) (OrderStatus, error) {
// GetWorkflow получает handle по Workflow ID
we := c.GetWorkflow(context.Background(), workflowID, "")
// Get блокирует до завершения workflow и возвращает результат
var status OrderStatus
err := we.Get(context.Background(), &status)
return status, err
}Отмена workflow
// Запрос на отмену workflow
func cancelOrder(c client.Client, workflowID string) error {
return c.CancelWorkflow(context.Background(), workflowID, "")
}При отмене workflow получает ошибку ctx.Err() == context.Canceled. Если workflow написан корректно, он может выполнить cleanup-логику перед завершением.
Расширенные возможности
SideEffect - недетерминистичные операции
Иногда в workflow нужен результат операции, которая не является детерминистичной, но при этом не тянет на полноценную activity. Например, генерация UUID или получение текущего времени. Для этого используется SideEffect:
// SideEffect выполняется один раз, результат записывается в историю
// При replay возвращается сохранённый результат
var requestID string
encodedID := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})
encodedID.Get(&requestID)SideEffect выполняет функцию один раз и записывает результат в историю. При replay функция не вызывается - результат берётся из истории. Это сохраняет детерминизм.
workflow.Go - конкурентность внутри workflow
Внутри workflow нельзя использовать горутины Go (go func()), но можно использовать их аналог от Temporal:
func ParallelWorkflow(ctx workflow.Context, items []Item) error {
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
})
// Запускаем обработку каждого элемента конкурентно
var futures []workflow.Future
for _, item := range items {
item := item // capture
future, settable := workflow.NewFuture(ctx)
workflow.Go(ctx, func(gCtx workflow.Context) {
var result ItemResult
err := workflow.ExecuteActivity(actCtx, ProcessItem, item).Get(gCtx, &result)
if err != nil {
settable.SetError(err)
} else {
settable.Set(result, nil)
}
})
futures = append(futures, future)
}
// Ожидаем все результаты
for _, f := range futures {
var result ItemResult
if err := f.Get(ctx, &result); err != nil {
return err
}
}
return nil
}Continue-As-New
Если workflow выполняется очень долго (например, бесконечный цикл обработки), история событий может стать слишком большой. Continue-As-New позволяет “перезапустить” workflow с чистой историей, сохранив состояние:
func RecurringWorkflow(ctx workflow.Context, state WorkflowState) error {
for i := 0; i < 100; i++ {
// Обработка очередной итерации
err := workflow.ExecuteActivity(ctx, ProcessBatch, state.Offset).Get(ctx, nil)
if err != nil {
return err
}
state.Offset += 100
state.Iterations++
workflow.Sleep(ctx, time.Minute)
}
// После 100 итераций перезапускаем workflow с обновлённым состоянием
// История событий обнуляется, но логическая цепочка сохраняется
return workflow.NewContinueAsNewError(ctx, RecurringWorkflow, state)
}Continue-As-New необходим для workflow, которые работают неограниченно долго. Без него история будет расти бесконечно, что приведёт к проблемам с производительностью. Практическое правило - делайте Continue-As-New каждые несколько тысяч событий.
Update (Temporal 1.21+)
Update - это комбинация сигнала и запроса. В отличие от сигнала, Update синхронный - клиент ожидает результат. В отличие от запроса, Update может изменять состояние workflow.
// Регистрация update handler в workflow
err := workflow.SetUpdateHandler(ctx, "add-item", func(ctx workflow.Context, item OrderItem) (OrderStatus, error) {
// Валидация - можно отклонить update
if item.Quantity <= 0 {
return OrderStatus{}, fmt.Errorf("invalid quantity")
}
// Изменение состояния workflow
order.Items = append(order.Items, item)
status.Phase = "updated"
return status, nil
})// Клиентский вызов update
handle, err := c.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: workflowID,
UpdateName: "add-item",
Args: []interface{}{newItem},
})
var result OrderStatus
err = handle.Get(ctx, &result)Полный пример - Order Processing
Этот пример объединяет все концепции: workflow с сигналами одобрения, activities, saga-компенсации, query для отслеживания статуса. Он показывает, как все части Temporal работают вместе в реальном бизнес-процессе.
Типы данных
type Order struct {
ID string
UserID string
Amount int64
Currency string
Items []OrderItem
}
type OrderItem struct {
ProductID string
Quantity int
}
type OrderStatus struct {
Phase string
Approved bool
PaymentID string
TrackingNo string
}Workflow
func OrderProcessingWorkflow(ctx workflow.Context, order Order) (OrderStatus, error) {
status := OrderStatus{Phase: "validating"}
// Query handler - позволяет клиенту в любой момент узнать статус заказа
workflow.SetQueryHandler(ctx, "status", func() (OrderStatus, error) {
return status, nil
})
actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3},
})
// === Фаза 1: Валидация ===
if err := workflow.ExecuteActivity(actCtx, ValidateOrder, order).Get(ctx, nil); err != nil {
status.Phase = "validation_failed"
return status, err
}
// === Фаза 2: Одобрение (только для крупных заказов) ===
if order.Amount > 10000 {
status.Phase = "awaiting_approval"
approvalCh := workflow.GetSignalChannel(ctx, "manager-approval")
selector := workflow.NewSelector(ctx)
var approved bool
selector.AddReceive(approvalCh, func(ch workflow.ReceiveChannel, more bool) {
ch.Receive(ctx, &approved)
})
selector.AddFuture(workflow.NewTimer(ctx, 48*time.Hour), func(f workflow.Future) {
approved = false
})
selector.Select(ctx)
status.Approved = approved
if !approved {
status.Phase = "rejected"
return status, fmt.Errorf("order not approved")
}
} else {
status.Approved = true
}
// === Фаза 3: Saga - резервирование -> оплата -> отправка ===
var compensations []func(workflow.Context) error
status.Phase = "reserving"
var reservationID string
if err := workflow.ExecuteActivity(actCtx, ReserveInventory, order).Get(ctx, &reservationID); err != nil {
status.Phase = "reservation_failed"
return status, err
}
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(actCtx, ReleaseInventory, reservationID).Get(ctx, nil)
})
status.Phase = "charging"
var paymentID string
if err := workflow.ExecuteActivity(actCtx, ChargePayment, order).Get(ctx, &paymentID); err != nil {
compensate(ctx, compensations)
status.Phase = "payment_failed"
return status, err
}
status.PaymentID = paymentID
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(actCtx, RefundPayment, paymentID).Get(ctx, nil)
})
status.Phase = "shipping"
var trackingNo string
if err := workflow.ExecuteActivity(actCtx, ShipOrder, order).Get(ctx, &trackingNo); err != nil {
compensate(ctx, compensations)
status.Phase = "shipping_failed"
return status, err
}
status.TrackingNo = trackingNo
// === Фаза 4: Уведомление (не критично) ===
_ = workflow.ExecuteActivity(actCtx, NotifyCustomer, order).Get(ctx, nil)
status.Phase = "completed"
return status, nil
}Клиентский код
// Запуск workflow
func startOrder(c client.Client, order Order) (string, error) {
options := client.StartWorkflowOptions{
ID: fmt.Sprintf("order-%s", order.ID),
TaskQueue: "order-processing",
}
we, err := c.ExecuteWorkflow(context.Background(), options,
OrderProcessingWorkflow, order)
if err != nil {
return "", err
}
return we.GetID(), nil
}
// Одобрение заказа менеджером
func approveOrder(c client.Client, orderID string, approved bool) error {
return c.SignalWorkflow(context.Background(),
fmt.Sprintf("order-%s", orderID), "",
"manager-approval", approved)
}
// Запрос текущего статуса
func getOrderStatus(c client.Client, orderID string) (OrderStatus, error) {
resp, err := c.QueryWorkflow(context.Background(),
fmt.Sprintf("order-%s", orderID), "", "status")
if err != nil {
return OrderStatus{}, err
}
var status OrderStatus
err = resp.Get(&status)
return status, err
}Temporal workflow гарантирует, что заказ будет обработан до конца даже при падении worker-ов, перезапусках серверов или сетевых проблемах. Состояние (фаза, paymentID, trackingNo) восстанавливается автоматически через replay истории событий. Saga обеспечивает корректный откат при ошибках, а сигналы позволяют внешним системам взаимодействовать с процессом.
Тестирование workflow
Зачем отдельный фреймворк для тестирования
Workflow нельзя протестировать обычным go test без Temporal Server - ведь workflow.ExecuteActivity, workflow.Sleep и другие функции работают через SDK Temporal. Для тестирования Temporal предоставляет тестовое окружение, которое эмулирует сервер в памяти. Таймеры при этом не ждут реальное время - тестовое окружение “перематывает” время вперёд.
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.temporal.io/sdk/testsuite"
)
func TestOrderWorkflow_Success(t *testing.T) {
// Создаём тестовое окружение
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Мокаем activities - вместо реальных вызовов возвращаем фиксированные результаты
env.OnActivity(ValidateOrder, mock.Anything, mock.Anything).
Return(OrderValidation{Valid: true}, nil)
env.OnActivity(ChargePayment, mock.Anything, mock.Anything).
Return(PaymentResult{ID: "pay-123"}, nil)
env.OnActivity(ShipOrder, mock.Anything, mock.Anything).
Return(ShipmentResult{TrackingNo: "TRACK-456"}, nil)
// Запускаем workflow
order := Order{ID: "1", Amount: 5000}
env.ExecuteWorkflow(OrderProcessingWorkflow, order)
// Проверяем результат
assert.True(t, env.IsWorkflowCompleted())
assert.NoError(t, env.GetWorkflowError())
var result OrderStatus
assert.NoError(t, env.GetWorkflowResult(&result))
assert.Equal(t, "completed", result.Phase)
assert.Equal(t, "pay-123", result.PaymentID)
assert.Equal(t, "TRACK-456", result.TrackingNo)
}Тестирование с сигналами
func TestOrderWorkflow_Approval(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
// Мокаем activities
env.OnActivity(ValidateOrder, mock.Anything, mock.Anything).Return(nil, nil)
env.OnActivity(ReserveInventory, mock.Anything, mock.Anything).Return("res-1", nil)
env.OnActivity(ChargePayment, mock.Anything, mock.Anything).Return("pay-1", nil)
env.OnActivity(ShipOrder, mock.Anything, mock.Anything).Return("TRACK-1", nil)
env.OnActivity(NotifyCustomer, mock.Anything, mock.Anything).Return(nil)
// Регистрируем отложенный сигнал
// Через 1 час после начала workflow отправляем одобрение
env.RegisterDelayedCallback(func() {
env.SignalWorkflow("manager-approval", true)
}, time.Hour)
// Запускаем workflow с крупным заказом (>10000 - требует одобрения)
order := Order{ID: "2", Amount: 50000}
env.ExecuteWorkflow(OrderProcessingWorkflow, order)
assert.True(t, env.IsWorkflowCompleted())
assert.NoError(t, env.GetWorkflowError())
var result OrderStatus
env.GetWorkflowResult(&result)
assert.True(t, result.Approved)
assert.Equal(t, "completed", result.Phase)
}Тестирование таймаутов
func TestOrderWorkflow_ApprovalTimeout(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
env.OnActivity(ValidateOrder, mock.Anything, mock.Anything).Return(nil, nil)
// Не отправляем сигнал - проверяем что workflow отклонит заказ по таймауту
order := Order{ID: "3", Amount: 50000}
env.ExecuteWorkflow(OrderProcessingWorkflow, order)
// Workflow завершился с ошибкой - таймаут одобрения
assert.True(t, env.IsWorkflowCompleted())
assert.Error(t, env.GetWorkflowError())
}Тестовое окружение автоматически перематывает таймеры. workflow.NewTimer(ctx, 48*time.Hour) в тесте не ждёт двое суток, а срабатывает мгновенно, когда тестовое окружение понимает, что других событий нет.
Наблюдаемость и отладка
Temporal Web UI
Temporal поставляется с веб-интерфейсом, где можно увидеть все workflow, их состояние, историю событий и результаты. Это главный инструмент отладки. Для каждого workflow видно:
- Текущее состояние (Running, Completed, Failed, Cancelled, Timed Out)
- Полную историю событий - какие activities были запущены, с какими параметрами, что вернули
- Входные данные и результат workflow
- Pending activities и signals
Temporal CLI (tctl)
# Просмотр запущенных workflow
tctl workflow list
# Детали конкретного workflow
tctl workflow describe -w order-12345
# История событий
tctl workflow show -w order-12345
# Отправка сигнала из CLI
tctl workflow signal -w order-12345 --name manager-approval --input '"true"'
# Query статуса
tctl workflow query -w order-12345 --query_type status
# Отмена workflow
tctl workflow cancel -w order-12345
# Завершение workflow (принудительное)
tctl workflow terminate -w order-12345Частые ошибки и подводные камни
Non-determinism error
Самая частая ошибка при работе с Temporal. Возникает, когда код workflow изменился так, что replay не может восстановить состояние. Типичные причины:
- Добавление или удаление вызова activity без
GetVersion - Изменение порядка вызовов activity
- Использование
time.Now(),math/randили других недетерминистичных операций - Изменение типов параметров activity
Решение - всегда использовать workflow.GetVersion() при изменении логики workflow, у которого есть запущенные экземпляры.
Activity timeout слишком маленький
Если activity не укладывается в StartToCloseTimeout, Temporal считает её упавшей и запускает повторную попытку. При этом первая попытка может всё ещё работать. Это может привести к двойным списаниям или дублированию данных. Решение - ставить таймаут с запасом и использовать идемпотентные операции.
Забытый heartbeat для долгих activity
Если activity работает дольше StartToCloseTimeout, но не отправляет heartbeat, Temporal считает её зависшей. Для долгих операций необходимо использовать activity.RecordHeartbeat и настраивать HeartbeatTimeout.
Слишком большая история событий
Workflow, который работает месяцами с тысячами итераций, накапливает огромную историю. Это замедляет replay и увеличивает нагрузку на Temporal Server. Решение - использовать Continue-As-New для периодического сброса истории.
Блокировка горутин Go внутри workflow
Использование стандартных горутин и каналов Go внутри workflow ломает детерминизм. Только workflow.Go() и workflow.Channel безопасны для использования в workflow.