Перейти к содержанию

Работа с БД: pgx и миграции

Все сервисы работают с Postgres через pgx/v5 и pgxpool. В каждом сервис-репо в pkg/db/ лежит pgx-wrapper — его и используй, не переизобретай. Пока нет выделенной shared-библиотеки, pkg/db/ во всех сервисах должен быть одинаковым; синхронизация — за владельцами.

Жизненный цикл одного транзакционного вызова (InTx + outbox):

sequenceDiagram
    autonumber
    participant S as service.Method
    participant DB as pgxpool
    participant TX as pgx.Tx
    participant R as repo (Postgres)
    participant OB as outbox publisher

    S->>DB: Acquire()
    DB-->>S: conn
    S->>TX: BeginTx(ctx)
    S->>R: CreateTx(ctx, tx, ...)
    R->>TX: INSERT INTO ...
    TX-->>R: ok
    S->>OB: PublishTx(ctx, tx, msg)
    OB->>TX: INSERT INTO outbox ...
    TX-->>OB: ok
    alt успех
        S->>TX: Commit(ctx)
        TX-->>DB: release conn
    else ошибка
        S->>TX: Rollback(ctx)
        TX-->>DB: release conn
    end
    S-->>S: return

Ключевые свойства: одна бизнес-операция + запись в outbox коммитятся атомарно; соединение возвращается в pool только после Commit/Rollback; внутри транзакции никаких внешних I/O.

Содержание

Инициализация пула

Пул создаётся один раз в main.go и передаётся в репозитории через DI:

database, err := pkgdb.New(ctx, pkgdb.Config{
    DSN:     cfg.DB.DSN(),
    PoolMax: cfg.DB.PoolMax,
    PoolMin: cfg.DB.PoolMin,
})
if err != nil {
    return fmt.Errorf("db: %w", err)
}
defer database.Close()

pkgdb.New делает pgxpool.ParseConfigpgxpool.NewWithConfigPing. Если ping упал — сервис не стартует. Это правильно: мёртвая БД = мёртвый сервис.

Размеры пула задаются env-переменными (типичные значения для сервисов среднего трафика):

DB_POOL_MIN=2
DB_POOL_MAX=20

Не хардкодь их в коде. Конфиг загружается в internal/config/.

Передача пула в репозитории

Репозиторий получает пул через конструктор:

package postgres

import "github.com/jackc/pgx/v5/pgxpool"

type UserRepo struct {
    pool *pgxpool.Pool
}

func NewUserRepo(pool *pgxpool.Pool) *UserRepo {
    return &UserRepo{pool: pool}
}

Репозиторий — тонкий: голый SQL + сканирование в struct. Никакой бизнес-логики.

Запросы

Контекст всегда первый

func (r *UserRepo) Get(ctx context.Context, id int64) (*domain.User, error) {
    const q = `SELECT id, email, name, created_at FROM auth.users WHERE id = $1`

    var u domain.User
    err := r.pool.QueryRow(ctx, q, id).
        Scan(&u.ID, &u.Email, &u.Name, &u.CreatedAt)
    if err != nil {
        if pkgdb.IsNoRows(err) {
            return nil, pkgdb.ErrNotFound
        }
        return nil, fmt.Errorf("get user %d: %w", id, err)
    }
    return &u, nil
}
  • pgx автоматически использует prepared statements — тебе не нужно делать это руками.
  • Плейсхолдеры — $1, $2, … Никогда не конкатенируй user input в SQL.
  • pgx.ErrNoRows транслируем в pkgdb.ErrNotFound, чтобы service-слой не зависел от pgx.

Константы запросов

Пиши SQL в const в той же функции или на верхнем уровне файла. Не склеивай через fmt.Sprintf с динамическими кусками SQL — используй плейсхолдеры.

Исключение: если имя таблицы схемы должно быть параметризуемо (например, в RunMigrations), тогда используй fmt.Sprintf только для идентификаторов, которые пришли из кода, не от пользователя.

Батч-запросы

Для множественного insert/update используй pgx.Batch:

batch := &pgx.Batch{}
for _, id := range ids {
    batch.Queue(`UPDATE auth.outbox SET published_at = NOW() WHERE id = $1`, id)
}
br := r.pool.SendBatch(ctx, batch)
defer br.Close()
for range ids {
    if _, err := br.Exec(); err != nil {
        return fmt.Errorf("batch exec: %w", err)
    }
}

Транзакции

Используй db.InTx helper — он обрабатывает rollback при любой ошибке, включая ошибку commit'а.

Реализация

func (d *DB) InTx(ctx context.Context, fn func(tx pgx.Tx) error) error {
    tx, err := d.BeginTx(ctx, pgx.TxOptions{})
    if err != nil {
        return fmt.Errorf("begin: %w", err)
    }
    defer func() { _ = tx.Rollback(ctx) }()

    if err := fn(tx); err != nil {
        return err
    }
    if err := tx.Commit(ctx); err != nil {
        return fmt.Errorf("commit: %w", err)
    }
    return nil
}

Эта функция живёт в pkg/db/db.go каждого сервис-репо — не переписывай её в internal/. Если нужна правка, правь в pkg/db/db.go и продублируй в остальные сервис-репо (задача владельцев); после стабилизации пакет вынесется в отдельный репозиторий-библиотеку и будет подтягиваться через Go-модули.

Использование

func (s *AuthService) Register(ctx context.Context, req RegisterRequest) (*domain.User, error) {
    var user *domain.User
    err := s.db.InTx(ctx, func(tx pgx.Tx) error {
        u, err := s.users.CreateTx(ctx, tx, req.Email, req.PasswordHash)
        if err != nil {
            return err
        }
        if err := s.publisher.Publish(ctx, tx, "user.registered", u.ID, UserRegisteredPayload{...}); err != nil {
            return err
        }
        user = u
        return nil
    })
    if err != nil {
        return nil, fmt.Errorf("register: %w", err)
    }
    return user, nil
}

Правила внутри InTx:

  • Любой write, связанный с бизнес-операцией, идёт через тот же tx.
  • Outbox-запись (событие) идёт в том же tx — это предотвращает dual-write (когда БД обновлена, а событие не опубликовано, или наоборот).
  • Не вызывай внешние HTTP/Kafka прямо в InTx. Для событий — только outbox. Для внешних вызовов — после успешного commit'а.
  • Держи tx коротким. Никаких time.Sleep, никакой долгой валидации.

Миграции

Инструмент и формат имён

Миграции управляются через golang-migrate. Каждая миграция — пара файлов:

migrations/
├── 001_init.up.sql
├── 001_init.down.sql
├── 002_add_device_id.up.sql
├── 002_add_device_id.down.sql
├── 003_backfill_device_id.up.sql
├── 003_backfill_device_id.down.sql
└── 004_device_id_not_null.up.sql
    004_device_id_not_null.down.sql
  • Нумерация трёхзначная, сплошная: 001, 002, … Не пропускай номера.
  • down.sql — обязателен. Даже если это просто комментарий «rollback вручную, потому что drop column невозможен» — файл должен существовать, чтобы migrate мог двигаться туда-обратно в тестах.
  • Имя — snake_case, описывает что делает, не почему: add_device_id, а не fix_login_bug.

Advisory lock

Каждая up-миграция начинается с advisory lock. Это предотвращает ситуацию, когда два пода одновременно стартуют и пытаются применить миграции:

-- 002_add_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));

ALTER TABLE auth.sessions ADD COLUMN device_id UUID;

COMMIT;

hashtext('<service>_migrations') — одинаковый для всех миграций одного сервиса, поэтому pod #2 подождёт, пока pod #1 не отпустит lock. Lock снимается автоматически при COMMIT/ROLLBACK.

Expand-contract

Для любых breaking-изменений (добавление обязательной колонки, переименование) применяем expand-contract в три миграции:

Миграция 1 — expand. Добавить новую колонку/таблицу как nullable, чтобы старый код продолжал работать:

-- 002_add_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
ALTER TABLE auth.sessions ADD COLUMN device_id UUID;
COMMIT;

Миграция 2 — backfill. Заполнить значения для существующих строк:

-- 003_backfill_device_id.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
UPDATE auth.sessions SET device_id = gen_random_uuid() WHERE device_id IS NULL;
COMMIT;

Миграция 3 — contract. Добавить constraint:

-- 004_device_id_not_null.up.sql
BEGIN;
SELECT pg_advisory_xact_lock(hashtext('auth_migrations'));
ALTER TABLE auth.sessions ALTER COLUMN device_id SET NOT NULL;
COMMIT;

Никогда не объединяй три шага в одну миграцию — это ломает rolling deploy.

Большие backfill'ы

Если backfill касается > 100k строк — не делай его в одной transaction. Вынеси в отдельный скрипт/миграцию с батч-UPDATE по 1000–10000 строк. Обязательно прогоняй EXPLAIN ANALYZE перед merge.

Правила именования в SQL

Таблицы — множественное число

CREATE TABLE auth.users (...);
CREATE TABLE review.reviews (...);
CREATE TABLE media.upload_sessions (...);

Не user, не review. Всегда users, reviews, upload_sessions.

Обязательные колонки времени

Каждая таблица, которую мы читаем/пишем из кода, имеет:

created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()

Исключение: чисто lookup-таблицы (справочники уровней и т.п.), которые никогда не обновляются — там updated_at опционален.

Soft-delete

Для сущностей, которые нельзя физически удалять (user'ы, reviews, adresses):

deleted_at TIMESTAMPTZ,

Все запросы, которые возвращают «живые» строки, включают WHERE deleted_at IS NULL. Индексы для этих запросов — частичные:

CREATE INDEX idx_users_active ON auth.users(id) WHERE deleted_at IS NULL;

Внешние ID — BIGINT без FK

Если поле ссылается на сущность из другого сервиса — храним как BIGINT без REFERENCES:

CREATE TABLE review.reviews (
    id       BIGSERIAL PRIMARY KEY,
    place_id BIGINT    NOT NULL,  -- из places-service, без FK
    user_id  BIGINT    NOT NULL,  -- из user-service, без FK
    ...
);

FK только внутри своего сервиса/схемы. Пересечение баз сервисов через FK — нельзя: это создаёт скрытую связность и ломает независимый деплой.

NOT NULL по умолчанию

Любая колонка NOT NULL, пока не доказано обратное. NULL — это отдельное семантическое состояние («значение неизвестно»), используй его только когда оно действительно нужно.

Индексы

  • Индекс под каждый WHERE-предикат, который встречается в проде более чем в одном запросе.
  • Индекс под JOIN-колонки.
  • Прогоняй EXPLAIN ANALYZE перед merge любой новой миграции с индексом или схемой:
EXPLAIN ANALYZE
SELECT r.* FROM review.reviews r
WHERE r.place_id = 42 AND r.deleted_at IS NULL
ORDER BY r.created_at DESC LIMIT 20;

Ищи Seq Scan на больших таблицах, Hash Join без индекса, дикие оценки rows — всё это повод добавить/поправить индекс.

  • Композитный индекс — когда у тебя запрос с AND по нескольким колонкам. Порядок колонок — самая селективная первой.
  • Частичный индекс (WHERE deleted_at IS NULL) — когда большинство строк не соответствует условию.

ErrNotFound и sentinel

Репозиторий никогда не возвращает pgx.ErrNoRows наружу:

if err != nil {
    if pkgdb.IsNoRows(err) {
        return nil, pkgdb.ErrNotFound
    }
    return nil, fmt.Errorf("get user %d: %w", id, err)
}

Service-слой импортирует только pkgdb.ErrNotFound или собственный sentinel (domain.ErrUserNotFound) — но не pgx.

Что не делать

  • Не используй database/sql — в проекте только pgx.
  • Не оборачивай pgx в ORM. Никакого gorm, ent, sqlx.
  • Не делай cross-schema JOIN между сервисами. Каждый сервис работает только со своей схемой.
  • Не делай внешних HTTP-вызовов внутри транзакции.
  • Не клади бизнес-логику в SQL-функции и триггеры. Всё в Go.

См. также