Работа с БД: pgx и миграции¶
Все сервисы работают с Postgres через pgx/v5 и pgxpool. В каждом
сервис-репо в pkg/db/ лежит pgx-wrapper — его и используй, не переизобретай.
Пока нет выделенной shared-библиотеки, pkg/db/ во всех сервисах должен
быть одинаковым; синхронизация — за владельцами.
Жизненный цикл одного транзакционного вызова (InTx + outbox):
sequenceDiagram
autonumber
participant S as service.Method
participant DB as pgxpool
participant TX as pgx.Tx
participant R as repo (Postgres)
participant OB as outbox publisher
S->>DB: Acquire()
DB-->>S: conn
S->>TX: BeginTx(ctx)
S->>R: CreateTx(ctx, tx, ...)
R->>TX: INSERT INTO ...
TX-->>R: ok
S->>OB: PublishTx(ctx, tx, msg)
OB->>TX: INSERT INTO outbox ...
TX-->>OB: ok
alt успех
S->>TX: Commit(ctx)
TX-->>DB: release conn
else ошибка
S->>TX: Rollback(ctx)
TX-->>DB: release conn
end
S-->>S: return
Ключевые свойства: одна бизнес-операция + запись в outbox коммитятся атомарно; соединение возвращается в pool только после Commit/Rollback; внутри транзакции никаких внешних I/O.
Содержание¶
- Инициализация пула
- Передача пула в репозитории
- Запросы
- Транзакции
- Миграции
- Правила именования в SQL
- Индексы
- ErrNotFound и sentinel
- Что не делать
- См. также
Инициализация пула¶
Пул создаётся один раз в main.go и передаётся в репозитории через DI:
database, err := pkgdb.New(ctx, pkgdb.Config{
DSN: cfg.DB.DSN(),
PoolMax: cfg.DB.PoolMax,
PoolMin: cfg.DB.PoolMin,
})
if err != nil {
return fmt.Errorf("db: %w", err)
}
defer database.Close()
pkgdb.New делает pgxpool.ParseConfig → pgxpool.NewWithConfig → Ping.
Если ping упал — сервис не стартует. Это правильно: мёртвая БД = мёртвый
сервис.
Размеры пула задаются env-переменными (типичные значения для сервисов среднего трафика):
Не хардкодь их в коде. Конфиг загружается в internal/config/.
Передача пула в репозитории¶
Репозиторий получает пул через конструктор:
package postgres
import "github.com/jackc/pgx/v5/pgxpool"
type UserRepo struct {
pool *pgxpool.Pool
}
func NewUserRepo(pool *pgxpool.Pool) *UserRepo {
return &UserRepo{pool: pool}
}
Репозиторий — тонкий: голый SQL + сканирование в struct. Никакой бизнес-логики.
Запросы¶
Контекст всегда первый¶
func (r *UserRepo) Get(ctx context.Context, id int64) (*domain.User, error) {
const q = `SELECT id, email, name, created_at FROM auth.users WHERE id = $1`
var u domain.User
err := r.pool.QueryRow(ctx, q, id).
Scan(&u.ID, &u.Email, &u.Name, &u.CreatedAt)
if err != nil {
if pkgdb.IsNoRows(err) {
return nil, pkgdb.ErrNotFound
}
return nil, fmt.Errorf("get user %d: %w", id, err)
}
return &u, nil
}
pgxавтоматически использует prepared statements — тебе не нужно делать это руками.- Плейсхолдеры —
$1,$2, … Никогда не конкатенируй user input в SQL. pgx.ErrNoRowsтранслируем вpkgdb.ErrNotFound, чтобы service-слой не зависел от pgx.
Константы запросов¶
Пиши SQL в const в той же функции или на верхнем уровне файла. Не склеивай
через fmt.Sprintf с динамическими кусками SQL — используй плейсхолдеры.
Исключение: если имя таблицы схемы должно быть параметризуемо (например, в
RunMigrations), тогда используй fmt.Sprintf только для идентификаторов,
которые пришли из кода, не от пользователя.
Батч-запросы¶
Для множественного insert/update используй pgx.Batch:
batch := &pgx.Batch{}
for _, id := range ids {
batch.Queue(`UPDATE auth.outbox SET published_at = NOW() WHERE id = $1`, id)
}
br := r.pool.SendBatch(ctx, batch)
defer br.Close()
for range ids {
if _, err := br.Exec(); err != nil {
return fmt.Errorf("batch exec: %w", err)
}
}
Транзакции¶
Используй db.InTx helper — он обрабатывает rollback при любой ошибке,
включая ошибку commit'а.
Реализация¶
func (d *DB) InTx(ctx context.Context, fn func(tx pgx.Tx) error) error {
tx, err := d.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
if err := fn(tx); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("commit: %w", err)
}
return nil
}
Эта функция живёт в pkg/db/db.go каждого сервис-репо — не переписывай её в
internal/. Если нужна правка, правь в pkg/db/db.go и продублируй в
остальные сервис-репо (задача владельцев); после стабилизации пакет вынесется
в отдельный репозиторий-библиотеку и будет подтягиваться через Go-модули.
Использование¶
func (s *AuthService) Register(ctx context.Context, req RegisterRequest) (*domain.User, error) {
var user *domain.User
err := s.db.InTx(ctx, func(tx pgx.Tx) error {
u, err := s.users.CreateTx(ctx, tx, req.Email, req.PasswordHash)
if err != nil {
return err
}
if err := s.publisher.Publish(ctx, tx, "user.registered", u.ID, UserRegisteredPayload{...}); err != nil {
return err
}
user = u
return nil
})
if err != nil {
return nil, fmt.Errorf("register: %w", err)
}
return user, nil
}
Правила внутри InTx:
- Любой write, связанный с бизнес-операцией, идёт через тот же
tx. - Outbox-запись (событие) идёт в том же tx — это предотвращает dual-write (когда БД обновлена, а событие не опубликовано, или наоборот).
- Не вызывай внешние HTTP/Kafka прямо в
InTx. Для событий — только outbox. Для внешних вызовов — после успешного commit'а. - Держи tx коротким. Никаких
time.Sleep, никакой долгой валидации.
Миграции¶
Инструмент и формат имён¶
Миграции управляются через golang-migrate.
Каждая миграция — пара файлов:
migrations/
├── 001_init.up.sql
├── 001_init.down.sql
├── 002_add_device_id.up.sql
├── 002_add_device_id.down.sql
├── 003_backfill_device_id.up.sql
├── 003_backfill_device_id.down.sql
└── 004_device_id_not_null.up.sql
004_device_id_not_null.down.sql
- Нумерация трёхзначная, сплошная:
001,002, … Не пропускай номера. down.sql— обязателен. Даже если это просто комментарий «rollback вручную, потому что drop column невозможен» — файл должен существовать, чтобы migrate мог двигаться туда-обратно в тестах.- Имя —
snake_case, описывает что делает, не почему:add_device_id, а неfix_login_bug.
Advisory lock¶
Каждая up-миграция начинается с advisory lock. Это предотвращает ситуацию, когда два пода одновременно стартуют и пытаются применить миграции:
-- 002_add_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
ALTER TABLE auth.sessions ADD COLUMN device_id UUID;
COMMIT;
hashtext('<service>_migrations') — одинаковый для всех миграций одного
сервиса, поэтому pod #2 подождёт, пока pod #1 не отпустит lock. Lock снимается
автоматически при COMMIT/ROLLBACK.
Expand-contract¶
Для любых breaking-изменений (добавление обязательной колонки, переименование) применяем expand-contract в три миграции:
Миграция 1 — expand. Добавить новую колонку/таблицу как nullable, чтобы старый код продолжал работать:
-- 002_add_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
ALTER TABLE auth.sessions ADD COLUMN device_id UUID;
COMMIT;
Миграция 2 — backfill. Заполнить значения для существующих строк:
-- 003_backfill_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
UPDATE auth.sessions SET device_id = gen_random_uuid() WHERE device_id IS NULL;
COMMIT;
Миграция 3 — contract. Добавить constraint:
-- 004_device_id_not_null.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
ALTER TABLE auth.sessions ALTER COLUMN device_id SET NOT NULL;
COMMIT;
Никогда не объединяй три шага в одну миграцию — это ломает rolling deploy.
Большие backfill'ы¶
Если backfill касается > 100k строк — не делай его в одной transaction.
Вынеси в отдельный скрипт/миграцию с батч-UPDATE по 1000–10000 строк.
Обязательно прогоняй EXPLAIN ANALYZE перед merge.
Правила именования в SQL¶
Таблицы — множественное число¶
CREATE TABLE auth.users (...);
CREATE TABLE review.reviews (...);
CREATE TABLE media.upload_sessions (...);
Не user, не review. Всегда users, reviews, upload_sessions.
Обязательные колонки времени¶
Каждая таблица, которую мы читаем/пишем из кода, имеет:
Исключение: чисто lookup-таблицы (справочники уровней и т.п.), которые
никогда не обновляются — там updated_at опционален.
Soft-delete¶
Для сущностей, которые нельзя физически удалять (user'ы, reviews, adresses):
Все запросы, которые возвращают «живые» строки, включают WHERE deleted_at IS NULL.
Индексы для этих запросов — частичные:
Внешние ID — BIGINT без FK¶
Если поле ссылается на сущность из другого сервиса — храним как BIGINT
без REFERENCES:
CREATE TABLE review.reviews (
id BIGSERIAL PRIMARY KEY,
place_id BIGINT NOT NULL, -- из places-service, без FK
user_id BIGINT NOT NULL, -- из user-service, без FK
...
);
FK только внутри своего сервиса/схемы. Пересечение баз сервисов через FK — нельзя: это создаёт скрытую связность и ломает независимый деплой.
NOT NULL по умолчанию¶
Любая колонка NOT NULL, пока не доказано обратное. NULL — это отдельное
семантическое состояние («значение неизвестно»), используй его только когда
оно действительно нужно.
Индексы¶
- Индекс под каждый WHERE-предикат, который встречается в проде более чем в одном запросе.
- Индекс под JOIN-колонки.
- Прогоняй
EXPLAIN ANALYZEперед merge любой новой миграции с индексом или схемой:
EXPLAIN ANALYZE
SELECT r.* FROM review.reviews r
WHERE r.place_id = 42 AND r.deleted_at IS NULL
ORDER BY r.created_at DESC LIMIT 20;
Ищи Seq Scan на больших таблицах, Hash Join без индекса, дикие
оценки rows — всё это повод добавить/поправить индекс.
- Композитный индекс — когда у тебя запрос с AND по нескольким колонкам. Порядок колонок — самая селективная первой.
- Частичный индекс (
WHERE deleted_at IS NULL) — когда большинство строк не соответствует условию.
ErrNotFound и sentinel¶
Репозиторий никогда не возвращает pgx.ErrNoRows наружу:
if err != nil {
if pkgdb.IsNoRows(err) {
return nil, pkgdb.ErrNotFound
}
return nil, fmt.Errorf("get user %d: %w", id, err)
}
Service-слой импортирует только pkgdb.ErrNotFound или собственный
sentinel (domain.ErrUserNotFound) — но не pgx.
Что не делать¶
- Не используй
database/sql— в проекте толькоpgx. - Не оборачивай pgx в ORM. Никакого gorm, ent, sqlx.
- Не делай cross-schema
JOINмежду сервисами. Каждый сервис работает только со своей схемой. - Не делай внешних HTTP-вызовов внутри транзакции.
- Не клади бизнес-логику в SQL-функции и триггеры. Всё в Go.
См. также¶
../how-to/add-migration.md— как добавить новую миграцию.../how-to/rollback-migration.md— как откатить миграцию в prod через forward-fix.../troubleshooting/migration-fails.md— что делать, если миграция упала.../troubleshooting/db-slow-query.md— диагностика медленных запросов,EXPLAIN ANALYZE,pg_stat_statements.../patterns/outbox.md—InTx+ outbox, как запись БД и публикация в Kafka идут атомарно.error-handling.md— sentinel'ы, error wrapping, маппинг в HTTP-коды.