Перейти к содержанию

Graceful shutdown

Каждый сервис обязан корректно завершаться по сигналу: дренировать in-flight HTTP-запросы, доедать in-flight Kafka-сообщения, закрывать пулы. Цель — не терять данные и не отвечать ошибкой на запросы, которые уже начали обрабатываться, когда пришёл SIGTERM.

В k8s деплой с rolling update постоянно присылает SIGTERM старым pod'ам. Если shutdown сделан неаккуратно — каждый релиз это потерянные сообщения, 502-ки от gateway и зависшие coordinator-сессии в Kafka.

Содержание

Trigger: SIGINT / SIGTERM

Сигналы ловим через signal.NotifyContext:

ctx, stop := signal.NotifyContext(context.Background(),
    syscall.SIGINT, syscall.SIGTERM)
defer stop()

ctx отменяется, когда приходит сигнал. Все долгоживущие компоненты смотрят на ctx.Done() и останавливаются.

defer stop() — обязательно, чтобы освободить внутренние ресурсы пакета signal.

Структура main.go shutdown

Типовой скелет:

func run() error {
    ctx, stop := signal.NotifyContext(context.Background(),
        syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    // bootstrap: config, deps, HTTP-server, Kafka-router, workers...
    // см. errgroup паттерн ниже

    g, gctx := errgroup.WithContext(ctx)
    g.Go(func() error { return runHTTP(gctx, srv) })
    g.Go(func() error { return router.Run(gctx) })
    g.Go(func() error { return forwarder.Run(gctx) })
    g.Go(func() error { return purgeWorker.Run(gctx) })

    // ждём либо shutdown-сигнал, либо фатальную ошибку
    if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
        slog.Error("run terminated", "err", err)
    }

    // отдельный ctx на shutdown — чтобы не использовать уже отменённый
    shutdownCtx, cancel := context.WithTimeout(context.Background(),
        30*time.Second)
    defer cancel()

    return shutdownAll(shutdownCtx, srv, router, forwarder, pool, rdb)
}

Важно: shutdownCtxсвежий, с собственным таймаутом. Если использовать отменённый ctx, все Shutdown(ctx) сразу отвалятся с context.Canceled и ничего не дренируется.

Порядок shutdown — обратный к startup

Cleanup делается в обратной последовательности:

  1. Остановить приём новых HTTP-запросов. srv.Shutdown(ctx)net/http сам перестаёт accept'ить новые connections и ждёт завершения in-flight handler'ов.
  2. Остановить Kafka consumer. router.Close() закрывает subscriber; Watermill дренирует in-flight сообщения перед возвратом.
  3. Остановить background workers (outbox forwarder, purge-jobs, cron-like worker'ы). Они смотрят на ctx.Done() и выходят.
  4. Закрыть Kafka publisher. Флашит pending producer-буферы.
  5. Закрыть DB pool. pool.Close() ждёт завершения active-queries, потом закрывает соединения.
  6. Закрыть Redis client.

Если перепутать порядок (сначала закрыть pool, потом остановить HTTP), handler во время дренирования начнёт получать use of closed pool и отвечать 500.

Таймаут на весь shutdown

30 секунд — стандарт. Этого достаточно для drain'а HTTP и Kafka в нормальной нагрузке. Если сервис не уложился — os.Exit(1):

ok := make(chan struct{})
go func() {
    shutdownAll(shutdownCtx, ...)
    close(ok)
}()
select {
case <-ok:
    slog.Info("shutdown complete")
case <-shutdownCtx.Done():
    slog.Error("shutdown timeout, forcing exit")
    os.Exit(1)
}

Exit-код 1 сигнализирует k8s «этот pod не завершился нормально» и попадает в метрики.

Liveness vs Readiness при shutdown

Пока сервис shutdown'ится, надо перестать принимать новые запросы, но доотвечать на существующие. Это делается через readiness:

  • /readyz → 503 сразу, как только пришёл сигнал. Gateway/k8s перестают роутить новый трафик.
  • /healthz → продолжает отвечать 200, пока процесс жив. Иначе k8s решит, что pod мёртвый, и убьёт его SIGKILL'ом, не дождавшись дренирования.

Реализация:

type Health struct {
    ready atomic.Bool
    // ...
}

func (h *Health) Ready(w http.ResponseWriter, r *http.Request) {
    if !h.ready.Load() {
        writeError(w, http.StatusServiceUnavailable, "not_ready", "shutting down")
        return
    }
    // ... обычные ping-проверки
}

// в shutdown-обработчике
health.ready.Store(false)
// даём gateway'ю 2-3 секунды заметить readyz=503
time.Sleep(3 * time.Second)
// и только потом начинаем закрывать
srv.Shutdown(shutdownCtx)

Подробнее про health endpoint'ы — http-api.md.

Kafka consumer drain

Watermill router.Close():

  • закрывает subscriber (новые сообщения не поступают),
  • дожидается завершения in-flight handler'ов (в пределах CloseTimeout),
  • коммитит offset'ы успешных сообщений.

Конфигурация таймаута:

router, err := message.NewRouter(message.RouterConfig{
    CloseTimeout: 20 * time.Second,
}, watermillLogger)

CloseTimeout должен быть меньше общего shutdown-таймаута — иначе другим компонентам не останется времени.

См. events.md про Watermill router в деталях.

HTTP shutdown

srv.Shutdown(ctx):

  • перестаёт accept'ить новые connections немедленно,
  • ждёт ctx.Done() или завершения активных handler'ов,
  • возвращает nil при успешном drain или ошибку по таймауту.
func shutdownHTTP(ctx context.Context, srv *http.Server) error {
    if err := srv.Shutdown(ctx); err != nil {
        return fmt.Errorf("http shutdown: %w", err)
    }
    return nil
}

В handler'е обязательно смотри на ctx.Done() для долгих операций (SQL, HTTP-клиенты, Kafka-publish). Иначе handler будет игнорировать shutdown и досидит до SIGKILL.

Background workers

Каждый worker принимает ctx и выходит по <-ctx.Done():

func (w *Forwarder) Run(ctx context.Context) error {
    ticker := time.NewTicker(w.interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return w.drain(context.Background()) // финальный drain перед выходом
        case <-ticker.C:
            if err := w.tick(ctx); err != nil {
                w.log.Error("forwarder tick", "err", err)
            }
        }
    }
}

Правила:

  • Любой долгий цикл обязан иметь case <-ctx.Done().
  • Перед выходом — финальный flush/drain (например, отправить накопленные batch'и).
  • Не делай бесконечных retry без учёта ctx — зависнешь на shutdown.

Errgroup паттерн

Все долгоживущие компоненты запускаются через golang.org/x/sync/errgroup:

g, gctx := errgroup.WithContext(ctx)

g.Go(func() error {
    if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
        return fmt.Errorf("http serve: %w", err)
    }
    return nil
})

g.Go(func() error {
    return router.Run(gctx)
})

g.Go(func() error {
    return forwarder.Run(gctx)
})

if err := g.Wait(); err != nil {
    slog.Error("run failed", "err", err)
}

Что это даёт:

  • Падение любого из компонентов отменяет общий gctx — остальные тоже начинают shutdown. Это правильно: сервис без kafka- consumer'а не должен продолжать принимать HTTP-запросы и делать вид, что всё нормально.
  • g.Wait() вернёт первую нетривиальную ошибку.
  • Добавить нового worker'а — одна строчка g.Go(...).

Что не делать в shutdown

  • os.Exit(0) в defer. Это обрывает остальные defer и cleanup не завершается.
  • Долгие ожидания внешних систем без таймаута. kafka.Publish() без context'а может висеть навсегда, если broker недоступен — сервис не остановится никогда.
  • Удаление временных файлов, которые нужны для post-mortem (core dump, перенаправленные логи). Они переживают pod.
  • Отправку «прощальных» событий. Если сервис падает, Kafka может быть уже недоступен; событие уйдёт в outbox и его отправит forwarder следующей инстанции — этого достаточно. Дополнительных «прощаний» не нужно.
  • log.Fatal в shutdown-хэндлере. Он вызывает os.Exit(1) минуя остальные defer'ы. Логируй ошибку и возвращай её наверх.

Testing shutdown

Integration-тест shutdown:

  1. Старт сервиса in-process.
  2. Посылка in-flight HTTP-запроса на endpoint с искусственной задержкой (например, 2 секунды).
  3. Параллельно — отправка SIGTERM процессу (через p.Signal(os.Interrupt) для in-process или через docker kill -s TERM).
  4. Проверки:
  5. /readyz → 503 в пределах секунды.
  6. in-flight запрос завершается успешно (200, не 502/503).
  7. /healthz → 200 до самого конца.
  8. процесс завершается за < 30 секунд с exit-кодом 0.

Шаблон в репозитории сервиса user, файл cmd/server/shutdown_test.go (если есть; иначе скопируй из другого сервиса и адаптируй).

k8s terminationGracePeriodSeconds

В Deployment манифесте выставляй поле так, чтобы оно было больше shutdown-таймаута сервиса:

spec:
  terminationGracePeriodSeconds: 45 # при shutdown 30s + буфер 15s

Если k8s-grace-period меньше, чем сервисный таймаут, pod получит SIGKILL до того, как дренирование закончится. Буфер в 10–15 секунд покрывает retry-loop'ы gateway'я, который перестал получать readyz=200.

Соответствующий pre-stop hook (опционально):

lifecycle:
  preStop:
    exec:
      command: ["/bin/sh", "-c", "sleep 3"]

preStop выполняется до SIGTERM: k8s-endpoint controller успевает распространить «pod не ready» по iptables раньше, чем сервис начнёт шатдаун. Это сглаживает окно в 2-3 секунды, когда gateway всё ещё шлёт трафик.

Чеклист на ревью shutdown-кода

  • signal.NotifyContext в main.go.
  • Все worker'ы и router запущены через errgroup.WithContext.
  • Shutdown-ctx создаётся свежим с явным таймаутом 30s.
  • /readyz переключается в 503 до начала закрытия HTTP.
  • /healthz остаётся 200 до самого конца.
  • Порядок cleanup: HTTP → Kafka consumer → workers → Kafka publisher → DB pool → Redis.
  • os.Exit(1) срабатывает при превышении shutdown-таймаута.
  • В каждом worker'е есть case <-ctx.Done().
  • Integration-тест на shutdown присутствует.
  • В k8s-манифесте terminationGracePeriodSeconds > shutdown timeout сервиса.

См. также