Работа с БД: pgx и миграции
Все сервисы работают с Postgres через pgx/v5 и pgxpool. В каждом
сервис-репо в pkg/db/ лежит pgx-wrapper — его и используй, не переизобретай.
Пока нет выделенной shared-библиотеки, pkg/db/ во всех сервисах должен
быть одинаковым; синхронизация — за владельцами.
Жизненный цикл одного транзакционного вызова (InTx + outbox):
Ключевые свойства: одна бизнес-операция + запись в outbox коммитятся атомарно; соединение возвращается в pool только после Commit/Rollback; внутри транзакции никаких внешних I/O.
Содержание
- Инициализация пула
- PgBouncer и pgx
- Передача пула в репозитории
- Запросы
- Транзакции
- Миграции
- Правила именования в SQL
- Индексы
- ErrNotFound и sentinel
- Что не делать
- См. также
Инициализация пула
Пул создаётся один раз в main.go и передаётся в репозитории через DI:
database, err := pkgdb.New(ctx, pkgdb.Config{
DSN: cfg.DB.DSN(),
PoolMax: cfg.DB.PoolMax,
PoolMin: cfg.DB.PoolMin,
})
if err != nil {
return fmt.Errorf("db: %w", err)
}
defer database.Close()pkgdb.New делает pgxpool.ParseConfig → pgxpool.NewWithConfig → Ping.
Если ping упал — сервис не стартует. Это правильно: мёртвая БД = мёртвый
сервис.
Размеры пула задаются env-переменными (типичные значения для сервисов среднего трафика):
DB_POOL_MIN=2
DB_POOL_MAX=20Не хардкодь их в коде. Конфиг загружается в internal/config/.
PgBouncer и pgx
В prod между сервисом и Postgres стоит PgBouncer в режиме transaction pooling. Это экономит backend-процессы Postgres (один физический коннект обслуживает запросы от многих pgx-клиентов по очереди), но накладывает ограничения на то, что можно делать в сессии. Без правильной настройки pgx молча деградирует или ломается на ровном месте.
Правила для сервисов, ходящих через PgBouncer:
-
Выключить implicit prepared statement cache. Именованные prepared statements привязаны к физическому коннекту Postgres; при transaction pooling следующий запрос может уйти на другой коннект, где этого statement нет — получишь
prepared statement "stmtcache_..." does not exist(SQLSTATE26000). Решение — режимexec(без prepare):cfg, err := pgxpool.ParseConfig(dsn) if err != nil { return nil, fmt.Errorf("parse dsn: %w", err) } cfg.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeExec pool, err := pgxpool.NewWithConfig(ctx, cfg)QueryExecModeExecотсылает запрос с параметрами в одном сообщении Simple Query — prepared statement не создаётся вовсе. Цена — небольшой overhead на парсинг запроса на сервере (~0.1 ms), он того стоит. -
Альтернатива для host-pool forwarder’а. Компонентам, которые выигрывают от prepared statements (высокий QPS, короткие запросы к тем же таблицам — например,
watermill-sqlforwarder, миграционный runner), поднимается отдельный пул напрямую в Postgres, мимо PgBouncer (DB_DIRECT_DSN). Это выделенный пул с маленькимPOOL_MAX(2–4), исключительно для таких задач. Остальной трафик по-прежнему идёт через PgBouncer. -
Никаких session-level state в SQL. Transaction pooling не гарантирует, что следующий запрос попадёт на тот же коннект PostgreSQL. Поэтому запрещено:
SETбезLOCAL— используйSET LOCAL ...внутри транзакции;LISTEN/NOTIFY— сессия будет порвана на любойCOMMIT;- session-level advisory locks (
pg_advisory_lock) — только transaction-level (pg_advisory_xact_lock/pg_try_advisory_xact_lock), они снимаются автоматически наCOMMIT/ROLLBACK, см. §Advisory lock; SET TRANSACTIONдоBEGIN— PgBouncer не переносит его.
-
statement_timeout— через DSN, не черезSET. Задаётся черезoptions=-c%20statement_timeout%3D30sв DSN или в PgBouncer-level конфиге, а не в коде черезSET statement_timeout.
Если трафик не идёт через PgBouncer (локальная разработка, сервис
подключается напрямую) — DefaultQueryExecMode можно не менять,
prepared statements будут работать штатно. Но код сервиса должен
работать одинаково в обоих окружениях — не пиши код, который
завязан на кэш prepared statements и падает без него.
Передача пула в репозитории
Репозиторий получает пул через конструктор:
package postgres
import "github.com/jackc/pgx/v5/pgxpool"
type UserRepo struct {
pool *pgxpool.Pool
}
func NewUserRepo(pool *pgxpool.Pool) *UserRepo {
return &UserRepo{pool: pool}
}Репозиторий — тонкий: голый SQL + сканирование в struct. Никакой бизнес-логики.
Запросы
Контекст всегда первый
func (r *UserRepo) Get(ctx context.Context, id int64) (*domain.User, error) {
const q = `SELECT id, email, name, created_at FROM users WHERE id = $1`
var u domain.User
err := r.pool.QueryRow(ctx, q, id).
Scan(&u.ID, &u.Email, &u.Name, &u.CreatedAt)
if err != nil {
if pkgdb.IsNoRows(err) {
return nil, pkgdb.ErrNotFound
}
return nil, fmt.Errorf("get user %d: %w", id, err)
}
return &u, nil
}- В prod pgx работает в режиме
QueryExecModeExec(без prepared statements) из-за PgBouncer transaction pooling — см. §PgBouncer и pgx. Не опирайся на statement cache в логике кода; плейсхолдеры и параметризация от этого не страдают. - Плейсхолдеры —
$1,$2, … Никогда не конкатенируй user input в SQL. pgx.ErrNoRowsтранслируем вpkgdb.ErrNotFound, чтобы service-слой не зависел от pgx.
Константы запросов
Пиши SQL в const в той же функции или на верхнем уровне файла. Не склеивай
через fmt.Sprintf с динамическими кусками SQL — используй плейсхолдеры.
Исключение: если имя таблицы схемы должно быть параметризуемо (например, в
RunMigrations), тогда используй fmt.Sprintf только для идентификаторов,
которые пришли из кода, не от пользователя.
Батч-запросы
Для множественного insert/update используй pgx.Batch:
batch := &pgx.Batch{}
for _, id := range ids {
batch.Queue(`UPDATE outbox SET published_at = NOW() WHERE id = $1`, id)
}
br := r.pool.SendBatch(ctx, batch)
defer br.Close()
for range ids {
if _, err := br.Exec(); err != nil {
return fmt.Errorf("batch exec: %w", err)
}
}Error handling в batch
br.Exec() / br.Query() возвращают ошибку на каждый элемент
батча по порядку. Игнорировать эти ошибки нельзя — неуспешный INSERT
молча пропадёт, и про него никто не узнает.
Два режима обработки:
- All-or-nothing. Открываем транзакцию, выполняем
tx.SendBatch, при первой же ошибке возвращаем — defer делает rollback. Вся пачка либо применилась, либо не применилась. - Best-effort с accumulate. Собираем ошибки через
errors.Join(errs...), продолжаем остальные элементы. Использовать только вне транзакции — внутри tx первая же ошибка отравляет всю транзакцию, все последующие команды вернутcurrent transaction is aborted.
batch := &pgx.Batch{}
for _, x := range items {
batch.Queue("INSERT INTO reviews (place_id, user_id) VALUES ($1, $2)",
x.PlaceID, x.UserID)
}
br := tx.SendBatch(ctx, batch)
defer br.Close()
var errs []error
for i := range items {
if _, err := br.Exec(); err != nil {
errs = append(errs, fmt.Errorf("item %d (id=%s): %w", i, items[i].ID, err))
// all-or-nothing: здесь return errs[0], снаружи — rollback.
}
}
if err := br.Close(); err != nil {
errs = append(errs, fmt.Errorf("batch close: %w", err))
}
return errors.Join(errs...)Критично: br.Close() обязателен в любом сценарии (включая early
return), иначе утечёт connection из пула и prepared statements
останутся висеть на сервере. Используй defer br.Close() сразу после
SendBatch.
Если items > 1000 — разбивай на чанки по 500–1000 элементов
вручную. pgx.Batch сам чанкование не делает, а слишком большой
батч держит connection занятым минутами и раздувает WAL.
Транзакции
Используй db.InTx helper — он обрабатывает rollback при любой ошибке,
включая ошибку commit’а.
Реализация
func (d *DB) InTx(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := d.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
if err := fn(tx); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("commit: %w", err)
}
return nil
}Эта функция живёт в pkg/db/db.go каждого сервис-репо — не переписывай её в
internal/. Если нужна правка, правь в pkg/db/db.go и продублируй в
остальные сервис-репо (задача владельцев); после стабилизации пакет вынесется
в отдельный репозиторий-библиотеку и будет подтягиваться через Go-модули.
Использование
func (s *AuthService) Register(ctx context.Context, req RegisterRequest) (*domain.User, error) {
var user *domain.User
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
u, err := s.users.CreateTx(ctx, tx, req.Email, req.PasswordHash)
if err != nil {
return err
}
if err := s.publisher.Publish(ctx, tx, "user.registered", u.ID, UserRegisteredPayload{...}); err != nil {
return err
}
user = u
return nil
})
if err != nil {
return nil, fmt.Errorf("register: %w", err)
}
return user, nil
}Правила внутри InTx:
- Любой write, связанный с бизнес-операцией, идёт через тот же
tx. - Outbox-запись (событие) идёт в том же tx — это предотвращает dual-write (когда БД обновлена, а событие не опубликовано, или наоборот).
- Не вызывай внешние HTTP/Kafka прямо в
InTx. Для событий — только outbox. Для внешних вызовов — после успешного commit’а. - Держи tx коротким. Никаких
time.Sleep, никакой долгой валидации.
Миграции
Инструмент и формат имён
Миграции управляются через Atlas — versioned,
forward-only. Каждая миграция — один файл .sql, down-файлов нет:
migrations/
├── 001_init.sql
├── 002_add_device_id.sql
├── 003_backfill_device_id.sql
├── 004_device_id_not_null.sql
└── atlas.sum # integrity-хеши всех миграций (коммитится)- Создавай через
atlas migrate new <name> --env ci; после правки файла —atlas migrate hash --env ci(пере-хешируетatlas.sum). - Нумерация сплошная, генерируется Atlas. Не пропускай и не переставляй.
- Forward-only: down-файлов нет. Откат в prod — новая миграция
(fix-forward), см.
../how-to/rollback-migration. - Имя —
snake_case, описывает что делает, не почему:add_device_id, а неfix_login_bug. atlas.sumкоммитится вместе с миграцией. Не редактируй уже применённую миграцию —atlas migrate validateпоймает расхождение хеша.
Как применяется (детально — в ../how-to/add-migration):
миграции пакуются в образ и применяются PreSync-Job’ом Argo CD
(atlas migrate apply) до раскатки Deployment, под выделенной ролью
<svc>_migrator (только она имеет DDL; app-роль — DML-only). Сам сервис
миграции на старте не прогоняет.
Конкурентность: Atlas берёт lock сам
Раньше каждая up-миграция начиналась с pg_advisory_xact_lock(...), чтобы
два пода не применяли миграции одновременно. С Atlas это не нужно:
- миграции применяет один PreSync-Job, а не каждый pod на старте;
atlas migrate applyсам берёт session-уровневый advisory lock на время прогона — параллельный apply корректно ждёт.
Поэтому файлы миграций — это чистый forward-DDL без BEGIN/COMMIT и без
advisory lock: Atlas оборачивает каждую миграцию в транзакцию сам.
Подключение Job’а идёт через session-pool алиас PgBouncer <db>_migrate
(на transaction-pool session advisory lock работает некорректно) — см.
../how-to/add-migration и
§PgBouncer и pgx.
Advisory lock в прикладном коде
В самих миграциях advisory lock больше не пишем (см. выше — Atlas берёт его сам). Но в прикладном коде advisory lock’и нужны: взаимное исключение периодических задач, leader-only воркеры и т.п. Здесь свои правила.
pg_advisory_xact_lock($1) блокирует до получения — без явного
таймаута. Для фоновой задачи это иногда ок, но для пользовательского
запроса опасно: под spike нагрузкой ждущий запрос висит минутами,
handler съедает goroutine и connection.
Правило: всегда используй pg_try_advisory_xact_lock или комбинацию
SET LOCAL lock_timeout + pg_advisory_xact_lock.
pg_try_advisory_xact_lock($1)возвращаетfalseсразу, если lock занят. Подходит для idempotent периодических задач: «не получилось сейчас — пропустим, следующий tick подхватит».lock_timeoutболее универсален — Postgres броситcanceling statement due to lock timeoutчерез N секунд, код получит SQLSTATE55P03.
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, "SET LOCAL lock_timeout = '5s'"); err != nil {
return fmt.Errorf("set lock_timeout: %w", err)
}
if _, err := tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockID); err != nil {
// 55P03 = lock_not_available; lock_timeout срабатывает тоже как 55P03.
return fmt.Errorf("acquire advisory lock %d: %w", lockID, err)
}
// ... полезная работа под lock'ом
return nil
})Deadlock detection: если два процесса берут advisory lock + row lock в
разном порядке, Postgres детектирует deadlock (SQLSTATE 40P01) и
убивает одну из транзакций. 40P01 — retryable (см.
../patterns/retry-and-circuit-breaker).
Метрика:
var advisoryLockWait = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "advisory_lock_wait_seconds",
Buckets: []float64{0.01, 0.05, 0.1, 0.5, 1, 2, 5},
},
[]string{"purpose"},
)Засекается от момента отправки SELECT pg_advisory_xact_lock до
возврата. Alert на p99 > 1s per purpose — lock contention.
Expand-contract
Для любых breaking-изменений (добавление обязательной колонки, переименование)
применяем expand-contract в три отдельные миграции. Файлы — чистый
forward-DDL, без BEGIN/COMMIT (Atlas оборачивает сам):
Миграция 1 — expand. Добавить новую колонку/таблицу как nullable, чтобы старый код продолжал работать:
-- 002_add_device_id.sql
ALTER TABLE sessions ADD COLUMN device_id UUID;Миграция 2 — backfill. Заполнить значения для существующих строк (для больших таблиц — отдельным Job’ом, см. ниже):
-- 003_backfill_device_id.sql
UPDATE sessions SET device_id = gen_random_uuid() WHERE device_id IS NULL;Миграция 3 — contract. Добавить constraint — только после того, как
весь работающий код уже пишет device_id:
-- 004_device_id_not_null.sql
ALTER TABLE sessions ALTER COLUMN device_id SET NOT NULL;Никогда не объединяй три шага в одну миграцию — это ломает rolling deploy;
atlas migrate lint поймает breaking-изменение и завалит PR.
Для CREATE INDEX CONCURRENTLY (нельзя в транзакции) добавь в начало файла
директиву -- atlas:txmode none, иначе Atlas обернёт миграцию в транзакцию
и Postgres откажет.
Большие backfill’ы
Если backfill касается > 100k строк — не делай его в одной transaction.
Вынеси в отдельный скрипт/миграцию с батч-UPDATE по 1000–10000 строк.
Обязательно прогоняй EXPLAIN ANALYZE перед merge.
Правила именования в SQL
Таблицы — множественное число
CREATE TABLE users (...);
CREATE TABLE reviews (...);
CREATE TABLE upload_sessions (...);Не user, не review. Всегда users, reviews, upload_sessions.
Обязательные колонки времени
Каждая таблица, которую мы читаем/пишем из кода, имеет:
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()Исключение: чисто lookup-таблицы (справочники уровней и т.п.), которые
никогда не обновляются — там updated_at опционален.
Soft-delete
Для сущностей, которые нельзя физически удалять (user’ы, reviews, adresses):
deleted_at TIMESTAMPTZ,Все запросы, которые возвращают «живые» строки, включают WHERE deleted_at IS NULL.
Индексы для этих запросов — частичные:
CREATE INDEX idx_users_active ON users(id) WHERE deleted_at IS NULL;Внешние ID — BIGINT без FK
Если поле ссылается на сущность из другого сервиса — храним как BIGINT
без REFERENCES:
CREATE TABLE reviews (
id BIGSERIAL PRIMARY KEY,
place_id BIGINT NOT NULL, -- из places-service, без FK
user_id BIGINT NOT NULL, -- из user-service, без FK
...
);FK только внутри своей БД сервиса. Пересечение баз сервисов через FK — невозможно (у каждого сервиса своя БД) и запрещено: это создало бы скрытую связность и сломало бы независимый деплой.
NOT NULL по умолчанию
Любая колонка NOT NULL, пока не доказано обратное. NULL — это отдельное
семантическое состояние («значение неизвестно»), используй его только когда
оно действительно нужно.
Индексы
-
Индекс под каждый WHERE-предикат, который встречается в проде более чем в одном запросе.
-
Индекс под JOIN-колонки.
-
Прогоняй
EXPLAIN ANALYZEперед merge любой новой миграции с индексом или схемой:EXPLAIN ANALYZE SELECT r.* FROM reviews r WHERE r.place_id = 42 AND r.deleted_at IS NULL ORDER BY r.created_at DESC LIMIT 20;Ищи
Seq Scanна больших таблицах,Hash Joinбез индекса, дикие оценки rows — всё это повод добавить/поправить индекс. -
Композитный индекс — когда у тебя запрос с AND по нескольким колонкам. Порядок колонок — самая селективная первой.
-
Частичный индекс (
WHERE deleted_at IS NULL) — когда большинство строк не соответствует условию.
⚠ Частичный индекс используется только если query содержит ту же
WHERE-клаузу.Пример:
CREATE INDEX idx_reviews_active ON reviews (place_id) WHERE deleted_at IS NULL.
SELECT * FROM reviews WHERE place_id = $1 AND deleted_at IS NULL— индекс используется ✓.SELECT * FROM reviews WHERE place_id = $1— индекс НЕ используется ✗. Planner не знает, что фильтрdeleted_at IS NULLподразумевается неявно, и выбирает seq-scan или другой индекс.Правило: все запросы по soft-deletable таблицам обязаны явно включать
AND deleted_at IS NULL(илиIS NOT NULLдля запроса удалённых). Заверни это в repo-метод, чтобы нельзя было забыть в следующем запросе.
ErrNotFound и sentinel
Репозиторий никогда не возвращает pgx.ErrNoRows наружу:
if err != nil {
if pkgdb.IsNoRows(err) {
return nil, pkgdb.ErrNotFound
}
return nil, fmt.Errorf("get user %d: %w", id, err)
}Service-слой импортирует только pkgdb.ErrNotFound или собственный
sentinel (domain.ErrUserNotFound) — но не pgx.
Что не делать
- Не используй
database/sql— в проекте толькоpgx. - Не оборачивай pgx в ORM. Никакого gorm, ent, sqlx.
- Не делай cross-database
JOINмежду сервисами. Каждый сервис работает со своей БД; данные чужих сервисов — только через HTTP/Kafka. - Не делай внешних HTTP-вызовов внутри транзакции.
- Не клади бизнес-логику в SQL-функции и триггеры. Всё в Go.
См. также
../how-to/add-migration— как добавить новую миграцию.../how-to/rollback-migration— как откатить миграцию в prod через forward-fix.../troubleshooting/migration-fails— что делать, если миграция упала.../troubleshooting/db-slow-query— диагностика медленных запросов,EXPLAIN ANALYZE,pg_stat_statements.../patterns/outbox—InTx+ outbox, как запись БД и публикация в Kafka идут атомарно.error-handling— sentinel’ы, error wrapping, маппинг в HTTP-коды.