Skip to Content
ConventionsPostgres и pgx

Работа с БД: pgx и миграции

Все сервисы работают с Postgres через pgx/v5 и pgxpool. В каждом сервис-репо в pkg/db/ лежит pgx-wrapper — его и используй, не переизобретай. Пока нет выделенной shared-библиотеки, pkg/db/ во всех сервисах должен быть одинаковым; синхронизация — за владельцами.

Жизненный цикл одного транзакционного вызова (InTx + outbox):

Ключевые свойства: одна бизнес-операция + запись в outbox коммитятся атомарно; соединение возвращается в pool только после Commit/Rollback; внутри транзакции никаких внешних I/O.

Содержание

Инициализация пула

Пул создаётся один раз в main.go и передаётся в репозитории через DI:

database, err := pkgdb.New(ctx, pkgdb.Config{ DSN: cfg.DB.DSN(), PoolMax: cfg.DB.PoolMax, PoolMin: cfg.DB.PoolMin, }) if err != nil { return fmt.Errorf("db: %w", err) } defer database.Close()

pkgdb.New делает pgxpool.ParseConfigpgxpool.NewWithConfigPing. Если ping упал — сервис не стартует. Это правильно: мёртвая БД = мёртвый сервис.

Размеры пула задаются env-переменными (типичные значения для сервисов среднего трафика):

DB_POOL_MIN=2 DB_POOL_MAX=20

Не хардкодь их в коде. Конфиг загружается в internal/config/.

PgBouncer и pgx

В prod между сервисом и Postgres стоит PgBouncer в режиме transaction pooling. Это экономит backend-процессы Postgres (один физический коннект обслуживает запросы от многих pgx-клиентов по очереди), но накладывает ограничения на то, что можно делать в сессии. Без правильной настройки pgx молча деградирует или ломается на ровном месте.

Правила для сервисов, ходящих через PgBouncer:

  • Выключить implicit prepared statement cache. Именованные prepared statements привязаны к физическому коннекту Postgres; при transaction pooling следующий запрос может уйти на другой коннект, где этого statement нет — получишь prepared statement "stmtcache_..." does not exist (SQLSTATE 26000). Решение — режим exec (без prepare):

    cfg, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, fmt.Errorf("parse dsn: %w", err) } cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec pool, err := pgxpool.NewWithConfig(ctx, cfg)

    QueryExecModeExec отсылает запрос с параметрами в одном сообщении Simple Query — prepared statement не создаётся вовсе. Цена — небольшой overhead на парсинг запроса на сервере (~0.1 ms), он того стоит.

  • Альтернатива для host-pool forwarder’а. Компонентам, которые выигрывают от prepared statements (высокий QPS, короткие запросы к тем же таблицам — например, watermill-sql forwarder, миграционный runner), поднимается отдельный пул напрямую в Postgres, мимо PgBouncer (DB_DIRECT_DSN). Это выделенный пул с маленьким POOL_MAX (2–4), исключительно для таких задач. Остальной трафик по-прежнему идёт через PgBouncer.

  • Никаких session-level state в SQL. Transaction pooling не гарантирует, что следующий запрос попадёт на тот же коннект PostgreSQL. Поэтому запрещено:

    • SET без LOCAL — используй SET LOCAL ... внутри транзакции;
    • LISTEN / NOTIFY — сессия будет порвана на любой COMMIT;
    • session-level advisory locks (pg_advisory_lock) — только transaction-level (pg_advisory_xact_lock / pg_try_advisory_xact_lock), они снимаются автоматически на COMMIT / ROLLBACK, см. §Advisory lock;
    • SET TRANSACTION до BEGIN — PgBouncer не переносит его.
  • statement_timeout — через DSN, не через SET. Задаётся через options=-c%20statement_timeout%3D30s в DSN или в PgBouncer-level конфиге, а не в коде через SET statement_timeout.

Если трафик не идёт через PgBouncer (локальная разработка, сервис подключается напрямую) — DefaultQueryExecMode можно не менять, prepared statements будут работать штатно. Но код сервиса должен работать одинаково в обоих окружениях — не пиши код, который завязан на кэш prepared statements и падает без него.

Передача пула в репозитории

Репозиторий получает пул через конструктор:

package postgres import "github.com/jackc/pgx/v5/pgxpool" type UserRepo struct { pool *pgxpool.Pool } func NewUserRepo(pool *pgxpool.Pool) *UserRepo { return &UserRepo{pool: pool} }

Репозиторий — тонкий: голый SQL + сканирование в struct. Никакой бизнес-логики.

Запросы

Контекст всегда первый

func (r *UserRepo) Get(ctx context.Context, id int64) (*domain.User, error) { const q = `SELECT id, email, name, created_at FROM users WHERE id = $1` var u domain.User err := r.pool.QueryRow(ctx, q, id). Scan(&u.ID, &u.Email, &u.Name, &u.CreatedAt) if err != nil { if pkgdb.IsNoRows(err) { return nil, pkgdb.ErrNotFound } return nil, fmt.Errorf("get user %d: %w", id, err) } return &u, nil }
  • В prod pgx работает в режиме QueryExecModeExec (без prepared statements) из-за PgBouncer transaction pooling — см. §PgBouncer и pgx. Не опирайся на statement cache в логике кода; плейсхолдеры и параметризация от этого не страдают.
  • Плейсхолдеры — $1, $2, … Никогда не конкатенируй user input в SQL.
  • pgx.ErrNoRows транслируем в pkgdb.ErrNotFound, чтобы service-слой не зависел от pgx.

Константы запросов

Пиши SQL в const в той же функции или на верхнем уровне файла. Не склеивай через fmt.Sprintf с динамическими кусками SQL — используй плейсхолдеры.

Исключение: если имя таблицы схемы должно быть параметризуемо (например, в RunMigrations), тогда используй fmt.Sprintf только для идентификаторов, которые пришли из кода, не от пользователя.

Батч-запросы

Для множественного insert/update используй pgx.Batch:

batch := &pgx.Batch{} for _, id := range ids { batch.Queue(`UPDATE outbox SET published_at = NOW() WHERE id = $1`, id) } br := r.pool.SendBatch(ctx, batch) defer br.Close() for range ids { if _, err := br.Exec(); err != nil { return fmt.Errorf("batch exec: %w", err) } }

Error handling в batch

br.Exec() / br.Query() возвращают ошибку на каждый элемент батча по порядку. Игнорировать эти ошибки нельзя — неуспешный INSERT молча пропадёт, и про него никто не узнает.

Два режима обработки:

  • All-or-nothing. Открываем транзакцию, выполняем tx.SendBatch, при первой же ошибке возвращаем — defer делает rollback. Вся пачка либо применилась, либо не применилась.
  • Best-effort с accumulate. Собираем ошибки через errors.Join(errs...), продолжаем остальные элементы. Использовать только вне транзакции — внутри tx первая же ошибка отравляет всю транзакцию, все последующие команды вернут current transaction is aborted.
batch := &pgx.Batch{} for _, x := range items { batch.Queue("INSERT INTO reviews (place_id, user_id) VALUES ($1, $2)", x.PlaceID, x.UserID) } br := tx.SendBatch(ctx, batch) defer br.Close() var errs []error for i := range items { if _, err := br.Exec(); err != nil { errs = append(errs, fmt.Errorf("item %d (id=%s): %w", i, items[i].ID, err)) // all-or-nothing: здесь return errs[0], снаружи — rollback. } } if err := br.Close(); err != nil { errs = append(errs, fmt.Errorf("batch close: %w", err)) } return errors.Join(errs...)

Критично: br.Close() обязателен в любом сценарии (включая early return), иначе утечёт connection из пула и prepared statements останутся висеть на сервере. Используй defer br.Close() сразу после SendBatch.

Если items > 1000 — разбивай на чанки по 500–1000 элементов вручную. pgx.Batch сам чанкование не делает, а слишком большой батч держит connection занятым минутами и раздувает WAL.

Транзакции

Используй db.InTx helper — он обрабатывает rollback при любой ошибке, включая ошибку commit’а.

Реализация

func (d *DB) InTx(ctx context.Context, fn func(tx pgx.Tx) error) error { tx, err := d.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return fmt.Errorf("begin: %w", err) } defer func() { _ = tx.Rollback(ctx) }() if err := fn(tx); err != nil { return err } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit: %w", err) } return nil }

Эта функция живёт в pkg/db/db.go каждого сервис-репо — не переписывай её в internal/. Если нужна правка, правь в pkg/db/db.go и продублируй в остальные сервис-репо (задача владельцев); после стабилизации пакет вынесется в отдельный репозиторий-библиотеку и будет подтягиваться через Go-модули.

Использование

func (s *AuthService) Register(ctx context.Context, req RegisterRequest) (*domain.User, error) { var user *domain.User err := s.db.InTx(ctx, func(tx pgx.Tx) error { u, err := s.users.CreateTx(ctx, tx, req.Email, req.PasswordHash) if err != nil { return err } if err := s.publisher.Publish(ctx, tx, "user.registered", u.ID, UserRegisteredPayload{...}); err != nil { return err } user = u return nil }) if err != nil { return nil, fmt.Errorf("register: %w", err) } return user, nil }

Правила внутри InTx:

  • Любой write, связанный с бизнес-операцией, идёт через тот же tx.
  • Outbox-запись (событие) идёт в том же tx — это предотвращает dual-write (когда БД обновлена, а событие не опубликовано, или наоборот).
  • Не вызывай внешние HTTP/Kafka прямо в InTx. Для событий — только outbox. Для внешних вызовов — после успешного commit’а.
  • Держи tx коротким. Никаких time.Sleep, никакой долгой валидации.

Миграции

Инструмент и формат имён

Миграции управляются через Atlas versioned, forward-only. Каждая миграция — один файл .sql, down-файлов нет:

migrations/ ├── 001_init.sql ├── 002_add_device_id.sql ├── 003_backfill_device_id.sql ├── 004_device_id_not_null.sql └── atlas.sum # integrity-хеши всех миграций (коммитится)
  • Создавай через atlas migrate new <name> --env ci; после правки файла — atlas migrate hash --env ci (пере-хеширует atlas.sum).
  • Нумерация сплошная, генерируется Atlas. Не пропускай и не переставляй.
  • Forward-only: down-файлов нет. Откат в prod — новая миграция (fix-forward), см. ../how-to/rollback-migration.
  • Имя — snake_case, описывает что делает, не почему: add_device_id, а не fix_login_bug.
  • atlas.sum коммитится вместе с миграцией. Не редактируй уже применённую миграцию — atlas migrate validate поймает расхождение хеша.

Как применяется (детально — в ../how-to/add-migration): миграции пакуются в образ и применяются PreSync-Job’ом Argo CD (atlas migrate apply) до раскатки Deployment, под выделенной ролью <svc>_migrator (только она имеет DDL; app-роль — DML-only). Сам сервис миграции на старте не прогоняет.

Конкурентность: Atlas берёт lock сам

Раньше каждая up-миграция начиналась с pg_advisory_xact_lock(...), чтобы два пода не применяли миграции одновременно. С Atlas это не нужно:

  • миграции применяет один PreSync-Job, а не каждый pod на старте;
  • atlas migrate apply сам берёт session-уровневый advisory lock на время прогона — параллельный apply корректно ждёт.

Поэтому файлы миграций — это чистый forward-DDL без BEGIN/COMMIT и без advisory lock: Atlas оборачивает каждую миграцию в транзакцию сам. Подключение Job’а идёт через session-pool алиас PgBouncer <db>_migrate (на transaction-pool session advisory lock работает некорректно) — см. ../how-to/add-migration и §PgBouncer и pgx.

Advisory lock в прикладном коде

В самих миграциях advisory lock больше не пишем (см. выше — Atlas берёт его сам). Но в прикладном коде advisory lock’и нужны: взаимное исключение периодических задач, leader-only воркеры и т.п. Здесь свои правила.

pg_advisory_xact_lock($1) блокирует до получения — без явного таймаута. Для фоновой задачи это иногда ок, но для пользовательского запроса опасно: под spike нагрузкой ждущий запрос висит минутами, handler съедает goroutine и connection.

Правило: всегда используй pg_try_advisory_xact_lock или комбинацию SET LOCAL lock_timeout + pg_advisory_xact_lock.

  • pg_try_advisory_xact_lock($1) возвращает false сразу, если lock занят. Подходит для idempotent периодических задач: «не получилось сейчас — пропустим, следующий tick подхватит».
  • lock_timeout более универсален — Postgres бросит canceling statement due to lock timeout через N секунд, код получит SQLSTATE 55P03.
err := s.db.InTx(ctx, func(tx pgx.Tx) error { if _, err := tx.Exec(ctx, "SET LOCAL lock_timeout = '5s'"); err != nil { return fmt.Errorf("set lock_timeout: %w", err) } if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockID); err != nil { // 55P03 = lock_not_available; lock_timeout срабатывает тоже как 55P03. return fmt.Errorf("acquire advisory lock %d: %w", lockID, err) } // ... полезная работа под lock'ом return nil })

Deadlock detection: если два процесса берут advisory lock + row lock в разном порядке, Postgres детектирует deadlock (SQLSTATE 40P01) и убивает одну из транзакций. 40P01 — retryable (см. ../patterns/retry-and-circuit-breaker).

Метрика:

var advisoryLockWait = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: "advisory_lock_wait_seconds", Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5}, }, []string{"purpose"}, )

Засекается от момента отправки SELECT pg_advisory_xact_lock до возврата. Alert на p99 > 1s per purpose — lock contention.

Expand-contract

Для любых breaking-изменений (добавление обязательной колонки, переименование) применяем expand-contract в три отдельные миграции. Файлы — чистый forward-DDL, без BEGIN/COMMIT (Atlas оборачивает сам):

Миграция 1 — expand. Добавить новую колонку/таблицу как nullable, чтобы старый код продолжал работать:

-- 002_add_device_id.sql ALTER TABLE sessions ADD COLUMN device_id UUID;

Миграция 2 — backfill. Заполнить значения для существующих строк (для больших таблиц — отдельным Job’ом, см. ниже):

-- 003_backfill_device_id.sql UPDATE sessions SET device_id = gen_random_uuid() WHERE device_id IS NULL;

Миграция 3 — contract. Добавить constraint — только после того, как весь работающий код уже пишет device_id:

-- 004_device_id_not_null.sql ALTER TABLE sessions ALTER COLUMN device_id SET NOT NULL;

Никогда не объединяй три шага в одну миграцию — это ломает rolling deploy; atlas migrate lint поймает breaking-изменение и завалит PR.

Для CREATE INDEX CONCURRENTLY (нельзя в транзакции) добавь в начало файла директиву -- atlas:txmode none, иначе Atlas обернёт миграцию в транзакцию и Postgres откажет.

Большие backfill’ы

Если backfill касается > 100k строк — не делай его в одной transaction. Вынеси в отдельный скрипт/миграцию с батч-UPDATE по 1000–10000 строк. Обязательно прогоняй EXPLAIN ANALYZE перед merge.

Правила именования в SQL

Таблицы — множественное число

CREATE TABLE users (...); CREATE TABLE reviews (...); CREATE TABLE upload_sessions (...);

Не user, не review. Всегда users, reviews, upload_sessions.

Обязательные колонки времени

Каждая таблица, которую мы читаем/пишем из кода, имеет:

created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()

Исключение: чисто lookup-таблицы (справочники уровней и т.п.), которые никогда не обновляются — там updated_at опционален.

Soft-delete

Для сущностей, которые нельзя физически удалять (user’ы, reviews, adresses):

deleted_at TIMESTAMPTZ,

Все запросы, которые возвращают «живые» строки, включают WHERE deleted_at IS NULL. Индексы для этих запросов — частичные:

CREATE INDEX idx_users_active ON users(id) WHERE deleted_at IS NULL;

Внешние ID — BIGINT без FK

Если поле ссылается на сущность из другого сервиса — храним как BIGINT без REFERENCES:

CREATE TABLE reviews ( id BIGSERIAL PRIMARY KEY, place_id BIGINT NOT NULL, -- из places-service, без FK user_id BIGINT NOT NULL, -- из user-service, без FK ... );

FK только внутри своей БД сервиса. Пересечение баз сервисов через FK — невозможно (у каждого сервиса своя БД) и запрещено: это создало бы скрытую связность и сломало бы независимый деплой.

NOT NULL по умолчанию

Любая колонка NOT NULL, пока не доказано обратное. NULL — это отдельное семантическое состояние («значение неизвестно»), используй его только когда оно действительно нужно.

Индексы

  • Индекс под каждый WHERE-предикат, который встречается в проде более чем в одном запросе.

  • Индекс под JOIN-колонки.

  • Прогоняй EXPLAIN ANALYZE перед merge любой новой миграции с индексом или схемой:

    EXPLAIN ANALYZE SELECT r.* FROM reviews r WHERE r.place_id = 42 AND r.deleted_at IS NULL ORDER BY r.created_at DESC LIMIT 20;

    Ищи Seq Scan на больших таблицах, Hash Join без индекса, дикие оценки rows — всё это повод добавить/поправить индекс.

  • Композитный индекс — когда у тебя запрос с AND по нескольким колонкам. Порядок колонок — самая селективная первой.

  • Частичный индекс (WHERE deleted_at IS NULL) — когда большинство строк не соответствует условию.

⚠ Частичный индекс используется только если query содержит ту же WHERE-клаузу.

Пример: CREATE INDEX idx_reviews_active ON reviews (place_id) WHERE deleted_at IS NULL.

  • SELECT * FROM reviews WHERE place_id = $1 AND deleted_at IS NULL — индекс используется ✓.
  • SELECT * FROM reviews WHERE place_id = $1 — индекс НЕ используется ✗. Planner не знает, что фильтр deleted_at IS NULL подразумевается неявно, и выбирает seq-scan или другой индекс.

Правило: все запросы по soft-deletable таблицам обязаны явно включать AND deleted_at IS NULL (или IS NOT NULL для запроса удалённых). Заверни это в repo-метод, чтобы нельзя было забыть в следующем запросе.

ErrNotFound и sentinel

Репозиторий никогда не возвращает pgx.ErrNoRows наружу:

if err != nil { if pkgdb.IsNoRows(err) { return nil, pkgdb.ErrNotFound } return nil, fmt.Errorf("get user %d: %w", id, err) }

Service-слой импортирует только pkgdb.ErrNotFound или собственный sentinel (domain.ErrUserNotFound) — но не pgx.

Что не делать

  • Не используй database/sql — в проекте только pgx.
  • Не оборачивай pgx в ORM. Никакого gorm, ent, sqlx.
  • Не делай cross-database JOIN между сервисами. Каждый сервис работает со своей БД; данные чужих сервисов — только через HTTP/Kafka.
  • Не делай внешних HTTP-вызовов внутри транзакции.
  • Не клади бизнес-логику в SQL-функции и триггеры. Всё в Go.

См. также

Last updated on