Graceful shutdown¶
Каждый сервис обязан корректно завершаться по сигналу: дренировать in-flight HTTP-запросы, доедать in-flight Kafka-сообщения, закрывать пулы. Цель — не терять данные и не отвечать ошибкой на запросы, которые уже начали обрабатываться, когда пришёл SIGTERM.
В k8s деплой с rolling update постоянно присылает SIGTERM старым pod'ам. Если shutdown сделан неаккуратно — каждый релиз это потерянные сообщения, 502-ки от gateway и зависшие coordinator-сессии в Kafka.
Содержание¶
- Trigger: SIGINT / SIGTERM
- Структура main.go shutdown
- Порядок shutdown — обратный к startup
- Таймаут на весь shutdown
- Liveness vs Readiness при shutdown
- Kafka consumer drain
- HTTP shutdown
- Background workers
- Errgroup паттерн
- Что не делать в shutdown
- Testing shutdown
- k8s
terminationGracePeriodSeconds - Чеклист на ревью shutdown-кода
- См. также
Trigger: SIGINT / SIGTERM¶
Сигналы ловим через signal.NotifyContext:
ctx, stop := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM)
defer stop()
ctx отменяется, когда приходит сигнал. Все долгоживущие компоненты
смотрят на ctx.Done() и останавливаются.
defer stop() — обязательно, чтобы освободить внутренние ресурсы
пакета signal.
Структура main.go shutdown¶
Типовой скелет:
func run() error {
ctx, stop := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM)
defer stop()
// bootstrap: config, deps, HTTP-server, Kafka-router, workers...
// см. errgroup паттерн ниже
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error { return runHTTP(gctx, srv) })
g.Go(func() error { return router.Run(gctx) })
g.Go(func() error { return forwarder.Run(gctx) })
g.Go(func() error { return purgeWorker.Run(gctx) })
// ждём либо shutdown-сигнал, либо фатальную ошибку
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
slog.Error("run terminated", "err", err)
}
// отдельный ctx на shutdown — чтобы не использовать уже отменённый
shutdownCtx, cancel := context.WithTimeout(context.Background(),
30*time.Second)
defer cancel()
return shutdownAll(shutdownCtx, srv, router, forwarder, pool, rdb)
}
Важно: shutdownCtx — свежий, с собственным таймаутом. Если
использовать отменённый ctx, все Shutdown(ctx) сразу отвалятся с
context.Canceled и ничего не дренируется.
Порядок shutdown — обратный к startup¶
Cleanup делается в обратной последовательности:
- Остановить приём новых HTTP-запросов.
srv.Shutdown(ctx)—net/httpсам перестаёт accept'ить новые connections и ждёт завершения in-flight handler'ов. - Остановить Kafka consumer.
router.Close()закрывает subscriber; Watermill дренирует in-flight сообщения перед возвратом. - Остановить background workers (outbox forwarder, purge-jobs,
cron-like worker'ы). Они смотрят на
ctx.Done()и выходят. - Закрыть Kafka publisher. Флашит pending producer-буферы.
- Закрыть DB pool.
pool.Close()ждёт завершения active-queries, потом закрывает соединения. - Закрыть Redis client.
Если перепутать порядок (сначала закрыть pool, потом остановить HTTP),
handler во время дренирования начнёт получать use of closed pool и
отвечать 500.
Таймаут на весь shutdown¶
30 секунд — стандарт. Этого достаточно для drain'а HTTP и Kafka в
нормальной нагрузке. Если сервис не уложился — os.Exit(1):
ok := make(chan struct{})
go func() {
shutdownAll(shutdownCtx, ...)
close(ok)
}()
select {
case <-ok:
slog.Info("shutdown complete")
case <-shutdownCtx.Done():
slog.Error("shutdown timeout, forcing exit")
os.Exit(1)
}
Exit-код 1 сигнализирует k8s «этот pod не завершился нормально» и попадает в метрики.
Liveness vs Readiness при shutdown¶
Пока сервис shutdown'ится, надо перестать принимать новые запросы, но доотвечать на существующие. Это делается через readiness:
/readyz→ 503 сразу, как только пришёл сигнал. Gateway/k8s перестают роутить новый трафик./healthz→ продолжает отвечать 200, пока процесс жив. Иначе k8s решит, что pod мёртвый, и убьёт его SIGKILL'ом, не дождавшись дренирования.
Реализация:
type Health struct {
ready atomic.Bool
// ...
}
func (h *Health) Ready(w http.ResponseWriter, r *http.Request) {
if !h.ready.Load() {
writeError(w, http.StatusServiceUnavailable, "not_ready", "shutting down")
return
}
// ... обычные ping-проверки
}
// в shutdown-обработчике
health.ready.Store(false)
// даём gateway'ю 2-3 секунды заметить readyz=503
time.Sleep(3 * time.Second)
// и только потом начинаем закрывать
srv.Shutdown(shutdownCtx)
Подробнее про health endpoint'ы — http-api.md.
Kafka consumer drain¶
Watermill router.Close():
- закрывает subscriber (новые сообщения не поступают),
- дожидается завершения in-flight handler'ов (в пределах
CloseTimeout), - коммитит offset'ы успешных сообщений.
Конфигурация таймаута:
router, err := message.NewRouter(message.RouterConfig{
CloseTimeout: 20 * time.Second,
}, watermillLogger)
CloseTimeout должен быть меньше общего shutdown-таймаута — иначе
другим компонентам не останется времени.
См. events.md про Watermill router в деталях.
HTTP shutdown¶
srv.Shutdown(ctx):
- перестаёт accept'ить новые connections немедленно,
- ждёт
ctx.Done()или завершения активных handler'ов, - возвращает
nilпри успешном drain или ошибку по таймауту.
func shutdownHTTP(ctx context.Context, srv *http.Server) error {
if err := srv.Shutdown(ctx); err != nil {
return fmt.Errorf("http shutdown: %w", err)
}
return nil
}
В handler'е обязательно смотри на ctx.Done() для долгих операций
(SQL, HTTP-клиенты, Kafka-publish). Иначе handler будет игнорировать
shutdown и досидит до SIGKILL.
Background workers¶
Каждый worker принимает ctx и выходит по <-ctx.Done():
func (w *Forwarder) Run(ctx context.Context) error {
ticker := time.NewTicker(w.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return w.drain(context.Background()) // финальный drain перед выходом
case <-ticker.C:
if err := w.tick(ctx); err != nil {
w.log.Error("forwarder tick", "err", err)
}
}
}
}
Правила:
- Любой долгий цикл обязан иметь
case <-ctx.Done(). - Перед выходом — финальный flush/drain (например, отправить накопленные batch'и).
- Не делай бесконечных retry без учёта
ctx— зависнешь на shutdown.
Errgroup паттерн¶
Все долгоживущие компоненты запускаются через
golang.org/x/sync/errgroup:
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("http serve: %w", err)
}
return nil
})
g.Go(func() error {
return router.Run(gctx)
})
g.Go(func() error {
return forwarder.Run(gctx)
})
if err := g.Wait(); err != nil {
slog.Error("run failed", "err", err)
}
Что это даёт:
- Падение любого из компонентов отменяет общий
gctx— остальные тоже начинают shutdown. Это правильно: сервис без kafka- consumer'а не должен продолжать принимать HTTP-запросы и делать вид, что всё нормально. g.Wait()вернёт первую нетривиальную ошибку.- Добавить нового worker'а — одна строчка
g.Go(...).
Что не делать в shutdown¶
os.Exit(0)вdefer. Это обрывает остальныеdeferи cleanup не завершается.- Долгие ожидания внешних систем без таймаута.
kafka.Publish()без context'а может висеть навсегда, если broker недоступен — сервис не остановится никогда. - Удаление временных файлов, которые нужны для post-mortem (core dump, перенаправленные логи). Они переживают pod.
- Отправку «прощальных» событий. Если сервис падает, Kafka может быть уже недоступен; событие уйдёт в outbox и его отправит forwarder следующей инстанции — этого достаточно. Дополнительных «прощаний» не нужно.
log.Fatalв shutdown-хэндлере. Он вызываетos.Exit(1)минуя остальные defer'ы. Логируй ошибку и возвращай её наверх.
Testing shutdown¶
Integration-тест shutdown:
- Старт сервиса in-process.
- Посылка in-flight HTTP-запроса на endpoint с искусственной задержкой (например, 2 секунды).
- Параллельно — отправка SIGTERM процессу (через
p.Signal(os.Interrupt)для in-process или черезdocker kill -s TERM). - Проверки:
/readyz→ 503 в пределах секунды.- in-flight запрос завершается успешно (200, не 502/503).
/healthz→ 200 до самого конца.- процесс завершается за < 30 секунд с exit-кодом 0.
Шаблон в репозитории сервиса user, файл
cmd/server/shutdown_test.go (если есть; иначе скопируй из другого
сервиса и адаптируй).
k8s terminationGracePeriodSeconds¶
В Deployment манифесте выставляй поле так, чтобы оно было больше
shutdown-таймаута сервиса:
Если k8s-grace-period меньше, чем сервисный таймаут, pod получит SIGKILL до того, как дренирование закончится. Буфер в 10–15 секунд покрывает retry-loop'ы gateway'я, который перестал получать readyz=200.
Соответствующий pre-stop hook (опционально):
preStop выполняется до SIGTERM: k8s-endpoint controller успевает
распространить «pod не ready» по iptables раньше, чем сервис начнёт
шатдаун. Это сглаживает окно в 2-3 секунды, когда gateway всё ещё шлёт
трафик.
Чеклист на ревью shutdown-кода¶
-
signal.NotifyContextвmain.go. - Все worker'ы и router запущены через
errgroup.WithContext. - Shutdown-ctx создаётся свежим с явным таймаутом 30s.
-
/readyzпереключается в 503 до начала закрытия HTTP. -
/healthzостаётся 200 до самого конца. - Порядок cleanup: HTTP → Kafka consumer → workers → Kafka publisher → DB pool → Redis.
-
os.Exit(1)срабатывает при превышении shutdown-таймаута. - В каждом worker'е есть
case <-ctx.Done(). - Integration-тест на shutdown присутствует.
- В k8s-манифесте
terminationGracePeriodSeconds> shutdown timeout сервиса.
См. также¶
events.md— Watermill router drain, Kafka consumer lifecycle.../patterns/outbox.md— корректный drain forwarder'а при shutdown.configuration.md—terminationGracePeriodSecondsи shutdown-таймауты.