From 8b631580a41141aa6d02273a68cf6d0f8f6ff4df Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 01:22:05 +0000 Subject: [PATCH 1/4] feat: enhance online migration with auto-creation and cleanup of publications and slots - Added support for auto-creating publications during online migration if they do not exist. - Implemented cleanup of auto-created publications and replication slots after successful cutover. - Introduced CLI flags to control the auto-creation of publications and retention of replication slots. - Updated documentation to reflect changes in migration phases and resource management. - Added integration tests to verify the lifecycle of publications and slots during online migrations. --- .github/workflows/ci.yml | 42 ++ Agent.md | 609 +++++------------- README.md | 88 ++- examples/online_migration/src/main.rs | 2 + pg_dbmigrator/src/bin/pg_dbmigrator/args.rs | 56 ++ pg_dbmigrator/src/config.rs | 28 + pg_dbmigrator/src/dump.rs | 53 +- pg_dbmigrator/src/native_apply.rs | 88 +++ pg_dbmigrator/src/orchestrator.rs | 107 ++- pg_dbmigrator/src/preflight.rs | 102 +++ pg_dbmigrator/tests/postgres_integration.rs | 106 +++ tests/integration/run_all.sh | 2 + .../run_online_auto_pub_lifecycle.sh | 87 +++ tests/integration/run_online_keep_slot.sh | 67 ++ tests/integration/seed.sql | 3 +- 15 files changed, 957 insertions(+), 483 deletions(-) create mode 100755 tests/integration/run_online_auto_pub_lifecycle.sh create mode 100755 tests/integration/run_online_keep_slot.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c82446c..eedca4d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -282,6 +282,46 @@ jobs: if: always() run: docker compose -f docker-compose.test.yml down -v + integration-online-auto-pub-lifecycle: + name: Integration test (online, auto-create publication + post-cutover cleanup) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: 
Swatinem/rust-cache@v2 + - uses: ./.github/actions/setup-integration + + - name: Run online auto-pub lifecycle e2e + run: bash tests/integration/run_online_auto_pub_lifecycle.sh + + - name: Show docker logs on failure + if: failure() + run: docker compose -f docker-compose.test.yml logs + + - name: Tear down + if: always() + run: docker compose -f docker-compose.test.yml down -v + + integration-online-keep-slot: + name: Integration test (online, keep-slot flag retains slot + pre-existing publication) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + - uses: ./.github/actions/setup-integration + + - name: Run online keep-slot e2e + run: bash tests/integration/run_online_keep_slot.sh + + - name: Show docker logs on failure + if: failure() + run: docker compose -f docker-compose.test.yml logs + + - name: Tear down + if: always() + run: docker compose -f docker-compose.test.yml down -v + publish: name: Publish to crates.io if: startsWith(github.ref, 'refs/tags/v') @@ -299,6 +339,8 @@ jobs: - integration-online-cancel-resume - integration-online-multi-resume-sustained - integration-online-sequence-sync + - integration-online-auto-pub-lifecycle + - integration-online-keep-slot runs-on: ubuntu-latest permissions: contents: write diff --git a/Agent.md b/Agent.md index 54b0b04..46ee76c 100644 --- a/Agent.md +++ b/Agent.md @@ -1,505 +1,232 @@ -# Agent.md — pg_dbmigrator Development Guide +# Agent.md — pg_dbmigrator -> This document is intended for both AI coding agents and human contributors. -> It captures the architecture, conventions, and non-negotiable rules for the -> `pg_dbmigrator` Rust PostgreSQL migration tool. -> **Read this file in full before modifying any source.** +> Read this before modifying source. Covers architecture, invariants, and rules. ---- +## 1. 
Architecture + +**Modes**: `Offline` (pg_dump → pg_restore) | `Online` (slot+snapshot → dump → restore → CREATE SUBSCRIPTION → WAL apply → cutover) + +**Pipeline**: `Validate → SourceVacuum → PrepareSnapshot* → Dump → Restore → Analyze → StreamApply* → Lag* → CaughtUp* → Cutover* → SourceCleanup* → Complete` +(`*` = online-only; SourceVacuum/Analyze skippable via `--skip-source-vacuum`/`--skip-analyze`) + +### Offline mode -## 1. Project Overview +1. Pre-flight: verify pg_dump/pg_restore on PATH, validate config +2. `VACUUM ANALYZE` on source (skip with `--skip-source-vacuum`) +3. `pg_dump` (directory format, parallel `--jobs`, `--compress=lz4:1`) +4. `pg_restore` — split-section by default (pre-data → data → post-data for index-free COPY) +5. `ANALYZE` on target (skip with `--skip-analyze`) +### Online mode -| Mode | Behaviour | -| --------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `Offline` | `pg_dump` → `pg_restore`. One-shot copy. Sequences are carried inside the dump, no extra step. | -| `Online` | Create a logical replication slot with `EXPORT_SNAPSHOT` → snapshot-consistent dump → restore → `CREATE SUBSCRIPTION` on target so PostgreSQL's apply worker streams WAL → on cutover, sync sequence values. | +1. **Validate**: pre-flight source (`wal_level=logical`, `max_replication_slots > 0`, `max_wal_senders > 0`); ensure publication exists (auto-create if missing) +2. **PrepareSnapshot**: `pg_walstream` creates replication slot with `EXPORT_SNAPSHOT` — snapshot kept alive by holding the stream connection open +3. **Dump**: `pg_dump --snapshot=` for a consistent baseline +4. **Restore**: `pg_restore` into target (same as offline) +5. **Analyze**: `ANALYZE` on target +6. **StreamApply**: orchestrator drops the pg_walstream stream connection, then issues `CREATE SUBSCRIPTION ... 
WITH (create_slot=false, slot_name='', enabled=true, copy_data=false)` on the target. PG's built-in apply worker streams WAL from `confirmed_flush_lsn`. +7. **Lag polling**: polls `pg_current_wal_flush_lsn()` on source every `poll_interval`, emits `Lag` heartbeat (lag_bytes, source_lsn, applied_lsn) +8. **CaughtUp**: when lag ≤ `lag_threshold_bytes`, one-shot advisory event fires +9. **Cutover**: operator presses Ctrl+C → sequence sync → source cleanup → done -### Apply path +### Cutover logic -After `pg_dump` / `pg_restore` the orchestrator drops the -[`pg_walstream`] stream connection (whose only job was to keep the exported -snapshot alive across the dump) and issues -`CREATE SUBSCRIPTION ... WITH (create_slot=false, slot_name='', -enabled=true, copy_data=false)` on the target. The pre-existing slot is -re-used — `create_slot=false` is the critical bit — and the built-in apply -worker streams WAL from the source's `confirmed_flush_lsn`. +1. First Ctrl+C → `CutoverHandle::request()` → apply loop exits on next poll +2. `ALTER SUBSCRIPTION ... DISABLE` + `DROP SUBSCRIPTION` (unless `--keep-subscription`) +3. `sync_sequences()` — setval() on all target sequences (PG logical replication doesn't replay nextval()) +4. Source cleanup: drop auto-created publication + drop slot (unless `--keep-slot`) +5. Return `MigrationOutcome::cutover_triggered() == true` +6. Second Ctrl+C → `CancellationToken` cancel (abort escape hatch) -`pg_walstream` is now used **only** as a slot-creation helper inside -[snapshot.rs](pg_dbmigrator/src/snapshot.rs). There is no longer an -in-process apply path; the previous `OnlineApplyEngine` enum and -`--apply-engine` CLI flag have been removed. +Cutover is **always operator-driven** — `lag_threshold_bytes` is purely advisory, never triggers cutover. 
-[`pg_walstream`]: https://github.com/isdaniel/pg-walstream +### Online resume flow + +`--resume` after interrupt: load token → skip dump/restore → `disable_target_subscription` → `wait_for_slot_inactive` → re-enable subscription in place (preserves replication origin) → apply resumes from slot's LSN. Supports multiple consecutive cancel+resume cycles without data loss. + +### Publication/subscription lifecycle + +- **Before dump**: if `auto_create_publication` (default true) and publication missing → `CREATE PUBLICATION FOR ALL TABLES` (or scoped to tables/schemas) +- **Apply phase**: subscription created by `run_native_apply`, torn down after cutover (unless `--keep-subscription`) +- **After cutover**: if pub was auto-created → `DROP PUBLICATION IF EXISTS`; if `drop_slot_on_cutover` (default true) → drop slot. All cleanup is best-effort (warnings only). +- CLI: `--no-auto-create-publication`, `--keep-slot`, `--keep-subscription` ### Workspace layout ``` -Cargo.toml # workspace root, central versions/deps -.cargo/config.toml # `cargo pg_dbmigrator` alias -crates/ - pg_dbmigrator/ # library crate (package: pg_dbmigrator) - pg_dbmigrator-cli/ # CLI crate (package: pg_dbmigrator-cli, bin: pg_dbmigrator) -examples/offline_migration/ # integration example for the library -examples/online_migration/ +Cargo.toml # workspace root (central versions/deps) +pg_dbmigrator/src/lib.rs # library entry +pg_dbmigrator/src/bin/pg_dbmigrator/{main,args}.rs # CLI +pg_dbmigrator/tests/postgres_integration.rs # Rust integration tests (needs PG) +examples/{offline,online}_migration/ # example binaries +tests/integration/ # shell e2e tests (Docker PG 17) +docker-compose.test.yml # source :55432, target :55433 ``` -> **Single source of truth.** Versions, edition, authors, and external -> dependency versions live in the workspace [Cargo.toml](Cargo.toml) under -> `[workspace.package]` / `[workspace.dependencies]`. Sub-crates reference -> them via `xxx.workspace = true`. 
**Do not** hard-code versions in sub-crate -> manifests. - -### Library module map - -| File | Responsibility | -| -------------------------------------------------------------------------- | --------------------------------------------------------------------------- | -| [pg_dbmigrator/src/lib.rs](pg_dbmigrator/src/lib.rs) | Crate entry point, re-exports, `#![deny]`/`#![warn]` lints | -| [pg_dbmigrator/src/config.rs](pg_dbmigrator/src/config.rs) | `MigrationConfig` / `EndpointConfig` / `OnlineOptions` and validation. Performance defaults: `split_sections`, `dump_compress`, `no_sync`, `no_comments`, `no_security_labels` | -| [pg_dbmigrator/src/error.rs](pg_dbmigrator/src/error.rs) | `MigrationError` (`thiserror`) + `Result` alias | -| [pg_dbmigrator/src/dump.rs](pg_dbmigrator/src/dump.rs) | `pg_dump` wrapper, `CommandRunner` trait, pure argv builder | -| [pg_dbmigrator/src/restore.rs](pg_dbmigrator/src/restore.rs) | `pg_restore` / `psql` wrapper | -| [pg_dbmigrator/src/analyze.rs](pg_dbmigrator/src/analyze.rs) | Pre-dump `VACUUM ANALYZE` on source + post-restore `ANALYZE` on target | -| [pg_dbmigrator/src/snapshot.rs](pg_dbmigrator/src/snapshot.rs) | Replication slot creation + exported snapshot retrieval | -| [pg_dbmigrator/src/native_apply.rs](pg_dbmigrator/src/native_apply.rs) | `CREATE SUBSCRIPTION` apply path + `pg_replication_slots` lag polling, `ApplyStats`, `parse_pg_lsn`, `wait_for_slot_inactive`, `disable_target_subscription` | -| [pg_dbmigrator/src/orchestrator.rs](pg_dbmigrator/src/orchestrator.rs) | `Migrator`, wires all stages together | -| [pg_dbmigrator/src/progress.rs](pg_dbmigrator/src/progress.rs) | `ProgressReporter` trait + Tracing/Collecting implementations | -| [pg_dbmigrator/src/preflight.rs](pg_dbmigrator/src/preflight.rs) | Pre-migration checks (target empty, source `wal_level=logical`, slot/sender capacity) | -| [pg_dbmigrator/src/sequences.rs](pg_dbmigrator/src/sequences.rs) | Source→target sequence value sync at cutover (closes the PG 
logical-replication sequence gap) | - -### Pipeline stages (`MigrationStage`) - -`Validate → SourceVacuum → PrepareSnapshot* → Dump → Restore → Analyze → StreamApply* → Lag* → CaughtUp* → Cutover* → Complete` -(stages marked `*` are Online-only; `SourceVacuum` and `Analyze` are -skippable via `--skip-source-vacuum` / `--skip-analyze`). Any new stage must -be added in **both** [progress.rs](pg_dbmigrator/src/progress.rs) (the enum) -and [orchestrator.rs](pg_dbmigrator/src/orchestrator.rs) / -[native_apply.rs](pg_dbmigrator/src/native_apply.rs) (the reporting site). - -### Online cutover model - -The customer drives cutover with **SIGINT (Ctrl+C)**, mirroring the Azure -DMS "Cut over" button: - -1. After `Restore`, `native_apply::run_native_apply` issues - `CREATE SUBSCRIPTION` on the target and polls - `pg_replication_slots.confirmed_flush_lsn` against - `pg_current_wal_flush_lsn()` on the source every - `CutoverConfig::poll_interval`. -2. Each poll emits a `Lag` heartbeat (`lag_bytes`, `source_lsn`, - `received_lsn`, `applied_lsn` in `detail`) — the operator's - continuous bytes-behind read-out. -3. When the lag drops at or below `CutoverConfig::lag_threshold_bytes`, a - one-shot `CaughtUp` event is emitted. -4. The CLI (`pg_dbmigrator/src/bin/pg_dbmigrator/main.rs`) installs a SIGINT handler that - calls `CutoverHandle::request()` on the first Ctrl+C. The apply loop - sees the request on its next poll, runs `ALTER SUBSCRIPTION ... DISABLE` - and (unless `--keep-subscription` is set) `DROP SUBSCRIPTION`, emits - `Cutover`, and `Migrator::run` returns with - `MigrationOutcome::cutover_triggered() == true`. -5. A second SIGINT cancels via the `CancellationToken` (escape hatch). See - `classify_sigint` in `pg_dbmigrator/src/bin/pg_dbmigrator/main.rs`. - -Cutover is **always operator-driven**: the apply loop never exits on -`CaughtUp` alone. 
The `lag_threshold_bytes` knob is purely advisory — -it controls when the one-shot `CaughtUp` event fires, never whether the -loop terminates. - -### Online resume flow (cancel + resume) - -When the migrator is interrupted (SIGTERM / crash) during the apply phase -and restarted with `--resume`: - -1. The orchestrator loads the resume token (dump+restore already marked - complete) and **skips** dump+restore entirely. -2. Instead of dropping the existing subscription (which would lose the - replication origin and cause duplicate-key violations on replay), it - calls `disable_target_subscription` — issuing `ALTER SUBSCRIPTION ... DISABLE` - on the target so the old apply worker releases the slot. -3. `wait_for_slot_inactive` polls `pg_replication_slots.active` on the - source until the slot is free (30 s timeout with 500 ms polling). -4. `run_native_apply` detects the subscription already exists and - re-enables it in place (`ALTER SUBSCRIPTION ... ENABLE`), preserving - the replication origin. The apply worker resumes from the slot's - `confirmed_flush_lsn` — no data loss, no conflicts. - -This flow supports **multiple consecutive cancel+resume cycles** without -data loss. Key invariant: the slot on the source is never dropped during -a cancel — only the subscription's apply worker is stopped. - -### Sequence sync at cutover (online only) - -PostgreSQL logical replication does **not** replicate sequence values — -the target's sequences stay frozen at whatever `pg_dump`/`pg_restore` -baked in. Without intervention, the first INSERT after cutover can -collide with rows the apply worker streamed from the source. - -[`sequences.rs`](pg_dbmigrator/src/sequences.rs) closes that gap: - -1. After the operator presses Ctrl+C and `run_native_apply` returns with - `cutover_triggered = true`, the orchestrator calls - `sync_sequences(source, target, schemas)`. -2. 
`collect_source_sequences` reads `pg_class` joined with - `pg_sequence_last_value(...)`; `apply_sequences_to_target` runs - `setval('"schema"."name"'::regclass, $1::bigint, true)` on the target - for each one. -3. Per-sequence failures are logged with `warn!` and counted, but never - abort the migration — a managed-PG role-permission issue should not - roll back an otherwise-successful cutover. The aggregate count is - emitted as a `Cutover` progress event. -4. The behaviour can be turned off via - `OnlineOptions.sync_sequences_on_cutover = false` (CLI flag - `--no-sequence-sync`). - -The SQL layer escapes both halves of the qualified name: `quote_ident` -for any embedded `"`, then `quote_literal` for any embedded `'` so the -resulting `'...'::regclass` string literal parses cleanly. - -### Filtering schemas / tables - -`MigrationConfig.exclude_schemas` and `exclude_tables` propagate to -`pg_dump` as `--exclude-schema=` / `--exclude-table=` arguments. Use -them to skip very large or transient tables (or schemas owned by -background services) that should not be part of the cutover. Online -mode still captures their changes via the slot if they are in the -publication, so combine with a narrower `pg_dbmigrator_pub` definition -for full coverage. 
- -### Performance defaults - -The migrator ships with aggressive-but-safe performance defaults that -reduce total migration time by 30–60 % compared to vanilla `pg_dump`/ -`pg_restore` without any user tuning: - -| Default | Effect | Override | -| ------------------------ | --------------------------------------------------- | -------------------------- | -| `split_sections = true` | Index-free COPY + parallel index rebuild | `--no-split-sections` | -| `dump_compress = lz4:1` | 3–5× dump size reduction, negligible CPU overhead | `--dump-compress none` | -| `no_sync = true` | Skip fsync on transient dump archive | `--keep-sync` | -| `no_comments = true` | Skip `COMMENT ON` — rarely needed post-migration | `--keep-comments` | -| `no_security_labels = true` | Skip SE-Linux labels | `--keep-security-labels` | -| `no_publications = true` | Skip publication DDL from source | `--keep-publications` | -| `no_subscriptions = true`| Skip subscription DDL from source | `--keep-subscriptions` | -| `jobs = min(cpu_count, 8)` | Parallel dump/restore up to 8 workers | `--jobs N` | -| `skip_source_vacuum = false` | Pre-dump VACUUM ANALYZE on source (clean pages, fresh stats) | `--skip-source-vacuum` | -| `skip_analyze = false` | Post-restore ANALYZE on target (fresh query-planner stats) | `--skip-analyze` | - -Optional opt-in flags: -- `--no-table-access-method` (PG 15+) — omit `USING ` from CREATE TABLE -- `--dump-compress zstd:3` — better ratio than lz4, slightly more CPU - -### Integration test coverage - -All integration tests live in `tests/integration/` as **shell scripts** (not -Rust integration tests). This is deliberate — the tests exercise the full -binary end-to-end against Docker-composed PostgreSQL 17 instances (source on -`:55432`, target on `:55433`). The Rust library contains only pure unit tests -that never require a running database. 
- -| Script | Scenario | -| ----------------------------------- | ---------------------------------------------------------------------------------------------- | -| `run_offline.sh` | Simple offline dump → restore | -| `run_offline_split_sections.sh` | Offline with `--split-sections` (pre-data/data/post-data) | -| `run_offline_resume.sh` | Offline cancel + resume (resume token skips completed stages) | -| `run_offline_sigint_cancel.sh` | SIGINT mid-pg_dump → fast cancellation | -| `run_online.sh` | Full online: seed → dump → restore → apply → two CaughtUp transitions → cutover | -| `run_online_updates.sh` | Online with UPDATE/DELETE/INSERT during dump+restore window, cutover, data equality check | -| `run_online_sustained.sh` | 60s sustained mutations during apply, pre-cutover equality gate, post-cutover stability | -| `run_online_lag_cadence.sh` | Adaptive fast/slow poll cadence near catch-up | -| `run_online_cancel_resume.sh` | Cancel mid-apply (SIGTERM) + resume (skip dump/restore, re-attach slot, data equality) | -| `run_online_multi_resume_sustained.sh` | 2× cancel+resume + 30s sustained mutations + pre-cutover equality + cutover | -| `run_online_sequence_sync.sh` | Sequence sync at cutover (no post-cutover PK collision) | +### Module map + +| Module | Role | +|--------|------| +| `config.rs` | `MigrationConfig`/`OnlineOptions`, validation, performance defaults | +| `error.rs` | `MigrationError` (thiserror) + `Result` | +| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr progress parsing | +| `restore.rs` | pg_restore/psql wrapper | +| `analyze.rs` | Pre-dump VACUUM ANALYZE + post-restore ANALYZE | +| `snapshot.rs` | Replication slot creation + exported snapshot | +| `native_apply.rs` | CREATE SUBSCRIPTION, lag polling, `ApplyStats`, `drop_source_{publication,slot}` | +| `orchestrator.rs` | `Migrator` — wires all stages | +| `progress.rs` | `ProgressReporter` trait + Tracing/Collecting impls | +| `preflight.rs` | Source validation, 
`ensure_publication_exists` (auto-create) | +| `sequences.rs` | Source→target setval at cutover | +| `resume.rs` | Resume token persistence | +| `tls.rs` | TLS-aware connection helper | + +### Critical invariants + +1. **Slot before dump**: `prepare_replication_slot` MUST run before pg_dump. `START_REPLICATION` MUST be deferred until AFTER dump completes (else snapshot is invalidated). +2. **Cutover is operator-driven**: apply loop NEVER exits on CaughtUp alone. +3. **Resume preserves replication origin**: subscription is disabled/re-enabled (not dropped/recreated) to avoid duplicate-key violations. +4. **Sequence sync at cutover**: migrator runs setval() on all target sequences after cutover. +5. **Publication lifecycle**: auto-created publications are tracked (`pub_auto_created`) and dropped after cutover; pre-existing ones are never dropped. --- ## 2. Design Principles (mandatory reading) 1. **Clean library / CLI separation** - - The library only returns `pg_dbmigrator::Result` (`MigrationError`). - - Only the CLI is allowed to use `anyhow`; it converts library errors into - terminal output and exit codes. - - **No** `unwrap()` / `expect()` / `panic!()` in production code paths. - Panics are reserved for true invariant violations (i.e. bugs). + - Library returns `pg_dbmigrator::Result` (`MigrationError`) only. + - Only CLI/examples may use `anyhow`. + - No `unwrap()`/`expect()`/`panic!()` in production paths. 2. **Side effects vs. pure logic** - - Anything that spawns a process or opens a socket must sit behind a - trait (e.g. `CommandRunner`, `ProgressReporter`). - - Pure functions (`build_pg_dump_args`, `build_pg_restore_args`, - `make_create_subscription_sql`, `parse_pg_lsn`) **must not** perform I/O so - that unit tests can validate them without PostgreSQL. + - Anything spawning a process or opening a socket sits behind a trait (`CommandRunner`, `ProgressReporter`). 
+ - Pure functions (`build_pg_dump_args`, `build_pg_restore_args`, `make_create_subscription_sql`, `parse_pg_lsn`) must not perform I/O — unit-testable without PostgreSQL. 3. **Configuration as data** - - All `*Config` structs derive `Serialize + Deserialize + Clone + Debug`. - - Cross-field invariants live in `validate(&self) -> Result<()>`; - `Migrator::run` calls `config.validate()` first. - - When adding optional fields, provide a `Default` so existing call sites - keep working with `..Default::default()`. + - All `*Config` structs: `Serialize + Deserialize + Clone + Debug`. + - Cross-field invariants in `validate(&self) -> Result<()>`. + - New optional fields must have a `Default`. 4. **Cancellation everywhere** - - Every long-running async function takes a `CancellationToken` and - checks `cancel.is_cancelled()` before each loop iteration. - - Cancellation is part of the success path; if you must return an error - for it, use `MigrationError::Cancelled` so the upper layer can detect it. + - Every long-running async function takes `CancellationToken`, checks `cancel.is_cancelled()` before each iteration. + - Use `MigrationError::Cancelled` for cancellation errors. 5. **Connection-string secrecy** - - `EndpointConfig` keeps the original libpq URI verbatim. **Always** call - `endpoint.redacted()` before logging it (the password is masked). - - When you add new fields that may carry secrets, extend `redacted()` and - add a unit test for it. - -6. **Contract with `pg_walstream`** - - Slot creation (`prepare_replication_slot`) must happen **before** - `pg_dump`. `START_REPLICATION` must only be called **after** the dump - completes — otherwise the exported snapshot is invalidated. - The orchestrator already follows this order; do not reshuffle it. - - We pin `pg_walstream = 0.6.2` via a path dep to the sibling workspace - `../pg-walstream`. Before bumping the version, verify compatibility of - the `ChangeEvent` and `EventType` enums. 
+ - Always call `endpoint.redacted()` before logging. + - Passwords via `PGPASSWORD` env var (never in argv where `ps` exposes them). + +6. **Contract with pg_walstream** + - Slot creation before dump. START_REPLICATION only after dump. + - Pinned at `0.6.2` via path dep to `../pg-walstream`. Verify `ChangeEvent`/`EventType` compatibility before bumping. 7. **No unsolicited optimisation** - - Do not silently turn sync APIs into async, swap `Vec` for - `SmallVec`, or pull in new crates. **Make only the changes the task - requires (or that correctness requires).** + - Make only the changes the task requires. No silent async conversions, container swaps, or new crate additions. --- ## 3. Code Style -> Core Rust style guidance lives in [`rust-skills`] / [`rust-async`] / -> [`rust-docs`]. The rules below are **specific to this project**. - -[`rust-skills`]: https://example.invalid -[`rust-async`]: https://example.invalid -[`rust-docs`]: https://example.invalid - -### 3.1 Crate-level lints - -[lib.rs](pg_dbmigrator/src/lib.rs#L42-L43) sets: - -```rust -#![deny(rust_2018_idioms)] -#![warn(missing_debug_implementations)] -``` - -When adding a new `pub` type: -- Default to `#[derive(Debug, Clone)]` unless there is a concrete reason not - to (trait object, contains `tokio_postgres::Client`, etc.). -- If the type contains a non-`Debug` member (e.g. `LogicalReplicationStream`), - attach `#[allow(missing_debug_implementations)]` and explain why in a doc - comment (see - [snapshot.rs](pg_dbmigrator/src/snapshot.rs#L26-L29)). - -### 3.2 Error handling - -- Library errors always go through - [`MigrationError`](pg_dbmigrator/src/error.rs#L17): - - String-bearing variants must be built via the helpers - `MigrationError::config(...)`, `::external(...)`, `::apply(...)`. Do not - construct the enum directly. - - When introducing a new error category, add a variant **and** a helper - **and** a unit test. -- CLI/example code uses `anyhow::Result` plus `.context("...")`. 
-- **Never** `unwrap()` in production paths. Tests may use it for brevity. - -### 3.3 Logging - -- Use `tracing` macros (`info!` / `debug!` / `warn!` / `error!`). - **Forbidden**: `println!` / `eprintln!` (the only exception is `--help`, - which clap prints itself). -- Prefer structured fields: - `info!(slot = %opts.slot_name, "preparing slot")` over baked-in formatting. -- Connection strings must be passed through `redacted()` before they reach - any log line. - -### 3.4 Async - -- The whole library targets `tokio` as the runtime; tests use - `#[tokio::test]` (with `(flavor = "multi_thread")` when needed). -- Public traits with async methods use `#[async_trait]` (already in - workspace deps). -- Long-running loops, `select!`, and waits must be cancellable via the - `CancellationToken`. **Never** call `std::thread::sleep`. - -### 3.5 Naming and module organisation - -- One responsibility per module. Each file starts with a `//!` doc comment - describing **why the module exists** and **what it deliberately does not - do**. See [native_apply.rs](pg_dbmigrator/src/native_apply.rs#L1-L19) for the pattern. -- File ordering: `use` → public types → public functions → private helpers - → `#[cfg(test)] mod tests`. -- Do not introduce `mod.rs`-style folders; keep the flat `lib.rs` + sibling - files layout. - -### 3.6 Doc comments - -- Every `pub` item has a `///` comment that covers purpose, required call - ordering, and edge cases. -- Configuration fields are documented **per field** (see - [config.rs](pg_dbmigrator/src/config.rs#L11-L33)). -- Crate- and module-level docs include an `# Examples` block, marked - `no_run` so doc-tests do not actually connect to PostgreSQL (see - [lib.rs](pg_dbmigrator/src/lib.rs#L19-L37)). +- **Logging**: `tracing` structured fields (`info!(slot = %name, "msg")`). No `println!`/`eprintln!`. +- **Types**: `#[derive(Debug, Clone)]` on all `pub` types unless impossible (trait objects, etc.). 
+- **Errors**: use `MigrationError::config()`/`::external()`/`::apply()` helpers. Never construct enum directly. +- **Async**: tokio runtime, `#[async_trait]`, `CancellationToken` in all loops. Never `std::thread::sleep`. +- **SQL safety**: `quote_ident`/`quote_literal` from pg_walstream. No string concatenation for values. +- **Module layout**: `use` → public types → public functions → private helpers → `#[cfg(test)] mod tests`. +- **Doc comments**: every `pub` item gets `///` covering purpose, call ordering, edge cases. --- -## 4. Testing Discipline - -- **Every `.rs` file must have a `#[cfg(test)] mod tests`**, even if it - only holds two or three cases. -- **All new and modified code must have unit tests.** The project enforces - a minimum **85 % code coverage** via Codecov. Before submitting, verify - that your changes do not drop the overall coverage below this threshold. -- Test naming convention: `_`, e.g. - `validate_rejects_zero_jobs`, - `build_args_includes_jobs_only_for_directory_format`. -- Do not depend on a real PostgreSQL instance: - - Use `RecordingRunner` for dump/restore tests (see - [orchestrator.rs](pg_dbmigrator/src/orchestrator.rs#L246-L276)). - - Use `CollectingReporter` for progress tests - ([progress.rs](pg_dbmigrator/src/progress.rs#L83)). - - Use `StaticLagProvider` for lag-loop cadence tests. -- Modifying any pure function (`build_pg_dump_args`, - `build_pg_restore_args`, `make_create_subscription_sql`, `parse_pg_lsn`) - **requires** updating or adding the corresponding tests in the same PR. -- Integration scenarios live in `tests/integration/` as shell scripts; - and smoke tests and may connect to a real database. -- **After all code changes, you MUST verify that both unit tests AND - integration tests pass before considering the task complete:** - ```bash - # Unit tests (must pass without a running PostgreSQL) - cargo test --workspace - - # Integration tests (requires Docker) - make integration - ``` - -### Test commands +## 4. 
Testing + +- Every `.rs` file has `#[cfg(test)] mod tests` +- Min **85% code coverage** (Codecov enforced) +- Test naming: `_` (e.g. `validate_rejects_zero_jobs`) +- No real PG in unit tests — use `RecordingRunner`, `CollectingReporter`, `StaticLagProvider` +- Integration tests: shell scripts in `tests/integration/`, registered in `run_all.sh` and `.github/workflows/ci.yml` + +### Integration tests + +| Script | Scenario | +|--------|----------| +| `run_offline.sh` | Simple dump → restore | +| `run_offline_split_sections.sh` | pre-data/data/post-data phases | +| `run_offline_resume.sh` | Cancel + resume token | +| `run_offline_sigint_cancel.sh` | SIGINT mid-dump → fast cancel | +| `run_offline_analyze.sh` | VACUUM ANALYZE + ANALYZE | +| `run_online.sh` | Full online: two CaughtUp + cutover | +| `run_online_updates.sh` | DML during dump+restore | +| `run_online_sustained.sh` | 60s mutations + equality gate | +| `run_online_lag_cadence.sh` | Adaptive poll cadence | +| `run_online_cancel_resume.sh` | Cancel mid-apply + resume | +| `run_online_multi_resume_sustained.sh` | 2× cancel+resume + mutations | +| `run_online_sequence_sync.sh` | Sequence sync (no PK collision) | +| `run_online_auto_pub_lifecycle.sh` | Auto-create pub + post-cutover cleanup | +| `run_online_keep_slot.sh` | keep-slot flag + pre-existing pub retained | + +### Verify before commit ```bash -# Whole workspace -cargo test --workspace - -# Library only -cargo test -p pg_dbmigrator - -# With logs -RUST_LOG=debug cargo test -p pg_dbmigrator -- --nocapture +cargo fmt --all +cargo clippy --workspace --all-targets -- -D warnings +cargo test --workspace # unit tests, no PG required +make integration # e2e, requires Docker ``` -> `cargo test --workspace` must pass **without** a running PostgreSQL. -> If a new test genuinely needs a live database, move it under examples/ -> or mark it `#[ignore]`. - -### CI pipeline - -`.github/workflows/ci.yml` runs on every push/PR: - -1. 
**build-and-unit-test** — fmt, clippy, build, `cargo test --workspace` -2. **integration-offline** (+ split-sections, resume, SIGINT cancel variants) -3. **integration-online** (+ updates, sustained, lag-cadence, cancel-resume, multi-resume, sequence-sync) -4. **publish** — gated on tag `v*.*.*`, publishes `pg_dbmigrator` then `pg_dbmigrator-cli` to crates.io - -All integration jobs use `.github/actions/setup-integration` which starts -the Docker Compose stack (`docker-compose.test.yml`: source PG 17 on `:55432`, -target PG 17 on `:55433`) and installs PostgreSQL client tools. - --- ## 5. Standard Workflow for New Features -When you receive a new requirement, work in this order: - -1. **Locate the affected module** using the module map in §1. -2. **Start with config** — extend `MigrationConfig` / `OnlineOptions`, - provide `Default`, add `validate()` rules, add unit tests. -3. **Then update pure functions** — e.g. teach `build_pg_dump_args` about - the new field; add tests for each new branch. -4. **Wire the orchestrator last** — call the new logic from - `Migrator::run_offline` / `run_online` and emit progress events via - `self.report(stage, message)`. -5. **Mirror it in the CLI** — add `#[arg(...)]` in - [args.rs](pg_dbmigrator/src/bin/pg_dbmigrator/args.rs) and map it in `into_config`. Use - kebab-case for flag names. -6. **Add unit tests for all new/modified code** — ensure coverage stays - above **85 %**. Every new public function, config field, and CLI flag - must have at least one dedicated test. -7. **Add or update integration tests** — if the feature changes end-to-end - behaviour, add a script in `tests/integration/` following existing - patterns, register it in `run_all.sh`, and add a CI job in - `.github/workflows/ci.yml`. -8. **Update README + examples** if user-visible behaviour changes. -9. 
**Run lint and ALL tests — nothing ships until green**: - ```bash - cargo fmt --all - cargo clippy --workspace --all-targets -- -D warnings - cargo test --workspace - make integration # requires Docker - ``` +1. **Config** — extend `MigrationConfig`/`OnlineOptions` + `Default` + `validate()` + unit tests +2. **Pure functions** — update argv/SQL builders + tests for each branch +3. **Orchestrator** — wire logic in `Migrator::run_offline`/`run_online`, emit progress events via `self.report(stage, message)` +4. **CLI** — add `#[arg(...)]` in args.rs, map in `into_config` (kebab-case flags) + tests +5. **Unit tests** — ≥85% coverage for all new/modified code +6. **Integration tests** — add script in `tests/integration/`, register in `run_all.sh` + CI job in `.github/workflows/ci.yml` +7. **Documentation** — update README + examples if user-visible behaviour changes +8. **Validate** — run full lint+test suite (see §4) --- ## 6. Hard Rules (do-not list) -- ❌ Do not pull `anyhow` into the library (CLI/examples only). -- ❌ Do not `unwrap()` / `expect()` / `panic!()` in production paths. -- ❌ Do not `println!` to stdout/stderr; use `tracing`. -- ❌ Do not log passwords or full connection strings. -- ❌ Do not bypass `CommandRunner` and call `tokio::process::Command` - directly. -- ❌ Do not invoke `START_REPLICATION` before `pg_dump` finishes (it would - invalidate the snapshot). -- ❌ Do not hard-code external crate versions in sub-crate manifests. -- ❌ Do not “tidy up” unrelated code while doing your task. -- ❌ Do not add a new dependency without checking license, maintenance - status, and whether the workspace already provides an equivalent. - ---- - -## 7. Security Considerations - -- Connection strings may contain passwords: any serialisation, log line, or - error message must go through `redacted()`. -- `pg_dump` / `pg_restore` are external processes. We pass the password via - the `PGPASSWORD` environment variable (see - [dump.rs](pg_dbmigrator/src/dump.rs#L165-L172)). 
**Do not** put it in - argv where `ps` could expose it. -- SQL injection surface: `native_apply` and `sequences` use `quote_ident` - / `quote_literal` from `pg_walstream` for identifiers and string - literals. **Do not** switch to string concatenation for values. -- Cancellation means the user wants to stop. After cancel, do not send - feedback, write files, or take other actions that could leave partial - state behind. +- ❌ No `anyhow` in library (CLI/examples only) +- ❌ No `unwrap()`/`expect()`/`panic!()` in production paths +- ❌ No `println!`/`eprintln!` — use `tracing` +- ❌ No logging passwords or full connection strings (use `redacted()`) +- ❌ No bypassing `CommandRunner` for direct `tokio::process::Command` +- ❌ No `START_REPLICATION` before pg_dump finishes +- ❌ No hard-coded crate versions in crate manifest (use workspace deps) +- ❌ No unrelated cleanup in task PRs +- ❌ No new deps without license/maintenance/equivalence check --- -## 8. Change-management Checklist (before commit / PR) +## 7. Change-management Checklist (before commit / PR) - [ ] `cargo fmt --all` clean - [ ] `cargo clippy --workspace --all-targets -- -D warnings` clean - [ ] `cargo test --workspace` green (all unit tests pass) -- [ ] `make integration` green (all integration tests pass; requires Docker) -- [ ] Unit tests added for all new and modified code (target ≥ 85 % coverage) +- [ ] `make integration` green (requires Docker) +- [ ] Unit tests added for all new/modified code (≥ 85% coverage) - [ ] Every modified or new `pub` API has `///` documentation - [ ] Pure functions you changed have matching tests -- [ ] If config/CLI behaviour changed, README, examples, and this file are - updated -- [ ] No un-redacted connection strings or passwords appear in logs +- [ ] If config/CLI behaviour changed → update README, examples, and this file +- [ ] No un-redacted connection strings or passwords in logs --- -## 9. Tips for AI Agents - -- This is a **small, self-contained** Rust workspace. 
Before editing, - use semantic search and grep to confirm the blast radius, then read full - files for context. -- Read the entire file before editing it (most files are 200–400 lines and - fit in a single read). -- After editing, validate with the editor's diagnostics and run the - matching `cargo test` subset. -- If asked to add a new stage / new mode, update **all** of: - the `MigrationStage` enum, the `Migrator` entry point, the CLI args, - the README, and this file. -- If asked to change `pg_walstream` behaviour, note that the crate lives in - a sibling workspace (`../pg-walstream`) and is a separate project. - **Do not** vendor or fork its source into this repo; propose the change - upstream instead. +## 8. Tips for AI Agents + +- **Small workspace** — most files are 200–400 lines. Read the entire file before editing. Use grep to confirm blast radius. +- **Validate after editing** — run `cargo test -p pg_dbmigrator` after every change. +- **New stage / mode** → must update ALL of: `MigrationStage` enum, `Migrator` entry point, CLI args, README, and this file. +- **pg_walstream** lives in sibling workspace (`../pg-walstream`). Pinned at `0.6.2` via path dep. Do NOT vendor/fork — propose changes upstream. +- **Library/CLI boundary** — library returns `pg_dbmigrator::Result` only. `anyhow` is CLI/examples only. +- **Versions** — all dependency versions live in workspace `Cargo.toml` under `[workspace.dependencies]`. Crate manifests use `xxx.workspace = true`. + +--- + +## 9. CI Pipeline + +Jobs: `build-and-unit-test` → per-script integration jobs (parallel) → `publish` (on tag `v*.*.*`) +All integration jobs use `.github/actions/setup-integration` (Docker Compose + PG 17 client tools). diff --git a/README.md b/README.md index 6c41df2..b2487e2 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ keeps PostgreSQL's built-in logical replication apply worker pulling from the source so the operator can cut over with near-zero downtime. 
The online path issues `CREATE SUBSCRIPTION` on the target attached to a -slot we created with `EXPORT_SNAPSHOT` before `pg_dump` ran. +slot we created with `EXPORT_SNAPSHOT` before `pg_dump` ran. ## Modes @@ -24,9 +24,14 @@ slot we created with `EXPORT_SNAPSHOT` before `pg_dump` ran. ### Online migration phases ``` -Validate → SourceVacuum → PrepareSnapshot → Dump → Restore → Analyze → StreamApply → (Lag heartbeat …) → CaughtUp → Cutover → Complete +Validate → SourceVacuum → PrepareSnapshot → Dump → Restore → Analyze → StreamApply → (Lag heartbeat …) → CaughtUp → Cutover → SourceCleanup → Complete ``` +* `Validate` pre-flights the source (`wal_level = 'logical'`, + `max_replication_slots > 0`, `max_wal_senders > 0`) and ensures the + required publication exists — auto-creating it if missing (see + [Publication lifecycle](#publication--replication-resource-lifecycle) + below). * `SourceVacuum` runs `VACUUM ANALYZE` on the source to reclaim dead tuples and refresh planner statistics before the dump. Skip with `--skip-source-vacuum`. * `PrepareSnapshot` creates the replication slot first; `START_REPLICATION` @@ -41,13 +46,15 @@ Validate → SourceVacuum → PrepareSnapshot → Dump → Restore → Analyze signal the customer watches to decide when to cut over. * When the lag drops at or below `--lag-threshold-bytes` a one-shot `CaughtUp` event is emitted (“ready for cutover”). +* `SourceCleanup` (after cutover) drops auto-created publications and + replication slots on the source — see the next section. ## Install / build ```bash # Install the CLI from source. Produces a binary called `pg_dbmigrator`. 
cargo install pg_dbmigrator -pg_dbmigrator --help +pg_dbmigrator --help pg_dbmigrator --mode offline --source '…' --target '…' --jobs 4 ``` @@ -75,9 +82,19 @@ On the source, before starting: ```sql ALTER SYSTEM SET wal_level = 'logical'; -- requires restart -CREATE PUBLICATION pg_dbmigrator_pub FOR ALL TABLES; ``` +The publication is auto-created by the migrator if it does not already +exist (default `FOR ALL TABLES`). If you prefer to create it manually — +e.g. to publish only specific tables — run: + +```sql +CREATE PUBLICATION pg_dbmigrator_pub FOR TABLE my_schema.t1, my_schema.t2; +``` + +pass `--no-auto-create-publication` so the migrator uses the existing +one and does not attempt to create or drop it. + ```bash pg_dbmigrator \ --mode online \ @@ -91,11 +108,6 @@ pg_dbmigrator \ --cutover-poll-secs 5 ``` -The library creates a subscription called `--subscription-name` (default -`pg_dbmigrator_sub`) on the target attached to the existing slot. On cutover -the subscription is disabled and, unless `--keep-subscription` is set, -dropped. - Before the dump runs, the migrator pre-flights the source: `wal_level = 'logical'`, `max_replication_slots > 0`, and `max_wal_senders > 0`. A misconfigured source fails fast with a clear @@ -119,6 +131,33 @@ pg_dbmigrator --mode offline \ --exclude-table public.large_log ``` +## Publication / replication resource lifecycle + +The migrator fully manages the lifecycle of the replication resources it +creates, so the operator does not need to run manual cleanup SQL after a +successful cutover. 
+ +| Resource | Created by | Cleaned up at cutover | Override | +|---|---|---|---| +| Publication on source | Auto-created if missing (default) | Dropped only if it was auto-created | `--no-auto-create-publication` | +| Replication slot on source | Always created by the migrator | Dropped by default | `--keep-slot` | +| Subscription on target | Always created by the migrator | Dropped by default | `--keep-subscription` | + +**Auto-create publication**: By default, the migrator checks whether the +named publication (`--publication`, default `pg_dbmigrator_pub`) exists on +the source. If it does not, the migrator creates it as `FOR ALL TABLES` +(or scoped to `--table` / `--schema` if specified). Auto-created +publications are tracked and dropped on the source after a successful +cutover. Pre-existing publications are never dropped. + +**Slot cleanup**: After cutover, the replication slot on the source is no +longer needed. By default the migrator drops it. Pass `--keep-slot` if +you need to inspect the slot post-migration or if another consumer +shares it. + +All cleanup steps are best-effort — failures are logged as warnings but +do not abort the migration. + ## Cutover (online mode) Cutover is driven by `SIGINT` (Ctrl+C). The CLI prints a periodic `Lag` heartbeat after the dump completes, so the operator has a continuous bytes-behind read-out: @@ -135,19 +174,38 @@ When the customer is satisfied with the lag, they press **Ctrl+C** once: * The streaming apply loop notices the request on its next poll, flushes the last LSN feedback to the source, emits a `Cutover` event, and returns. -* `Migrator::run` returns with `MigrationOutcome::cutover_triggered() - == true`. The process exits cleanly. Application traffic can now be - switched to the target. +* The migrator syncs sequences, cleans up replication resources (publication, + slot, subscription), and returns with + `MigrationOutcome::cutover_triggered() == true`. The process exits + cleanly. 
Application traffic can now be switched to the target. * A second Ctrl+C is treated as an abort (escape hatch — only use it if the graceful path is stuck). -Cutover is always operator-driven; `--lag-threshold-bytes` is purely advisory and only controls when the one-shot `CaughtUp` "ready for cutover" event fires. +Cutover is always operator-driven; `--lag-threshold-bytes` is purely advisory and only controls when the one-shot `CaughtUp` “ready for cutover” event fires. For online migrations, hold on to `migrator.cutover_handle()` and call `request()` from your own signal handler / RPC endpoint when the operator is ready to cut over. See [`examples/online_migration`](examples/online_migration) for a complete program that wires Ctrl+C to the cutover handle. +## Performance defaults + +The CLI ships with sensible defaults tuned for migration speed. Override +only when you have a specific reason. + +| Default | Flag to override | Effect | +|---|---|---| +| Split-section restore | `--no-split-sections` | Bulk COPY without index maintenance, then rebuild indexes in parallel. 30-60% faster on index-heavy schemas. | +| `lz4:1` dump compression | `--dump-compress ` | Negligible CPU, 3-5x smaller archive. Use `zstd:3` for better ratio, `none` to disable. | +| `--no-sync` on dump | `--keep-sync` | Skip fsync on transient dump files. | +| `--no-comments` | _(not exposed)_ | Omit COMMENT ON statements from dump. | +| `--no-security-labels` | _(not exposed)_ | Omit SE-Linux security labels from dump. | +| `--no-publications` | `--keep-publications` | Don't dump publication definitions to the target. | +| `--no-subscriptions` | `--keep-subscriptions` | Don't dump subscription definitions to the target. | +| Auto-detect `--jobs` | `--jobs N` | Clamps to `[1, 8]` based on host CPU count. | +| Pre-dump `VACUUM ANALYZE` | `--skip-source-vacuum` | Clean heap pages + fresh stats before dump. 
| +| Post-restore `ANALYZE` | `--skip-analyze` | Fresh planner stats on target immediately after restore. | + ## Benchmark -See [BENCHMARK.md](BENCHMARK.md) for migration performance results across 10 GB – 200 GB datasets (PG 16 → PG 18, 8 parallel jobs, zstd compression). +See [BENCHMARK.md](BENCHMARK.md) for migration performance results across 10 GB -- 200 GB datasets (PG 16 -> PG 18, 8 parallel jobs, zstd compression). ## Known limitations @@ -156,7 +214,7 @@ See [BENCHMARK.md](BENCHMARK.md) for migration performance results across 10 GB * DDL changes are not migrated automatically — refresh the publication and restart the migration if the schema changes during the run. * Extensions whose internal state cannot be re-created on the target - (Azure-reserved extensions, pg_cron metadata, …) may cause + (Azure-reserved extensions, pg_cron metadata, ...) may cause `pg_restore` to exit with code 1. Pass `--allow-restore-errors` to treat that as a non-fatal warning when user data was restored successfully. 
diff --git a/examples/online_migration/src/main.rs b/examples/online_migration/src/main.rs index e7b1d65..faebead 100644 --- a/examples/online_migration/src/main.rs +++ b/examples/online_migration/src/main.rs @@ -90,6 +90,8 @@ async fn main() -> Result<()> { drop_subscription_on_cutover: !env_flag("PG_DBMIGRATOR_KEEP_SUBSCRIPTION"), force_clean: env_flag("PG_DBMIGRATOR_FORCE_CLEAN"), sync_sequences_on_cutover: !env_flag("PG_DBMIGRATOR_NO_SEQUENCE_SYNC"), + auto_create_publication: !env_flag("PG_DBMIGRATOR_NO_AUTO_CREATE_PUB"), + drop_slot_on_cutover: !env_flag("PG_DBMIGRATOR_KEEP_SLOT"), apply: ReplicationApplyConfig { feedback_interval: Duration::from_secs(env_secs("PG_DBMIGRATOR_FEEDBACK_SECS", 5)), connection_timeout: Duration::from_secs(15), diff --git a/pg_dbmigrator/src/bin/pg_dbmigrator/args.rs b/pg_dbmigrator/src/bin/pg_dbmigrator/args.rs index 1126af3..ce7c7c7 100644 --- a/pg_dbmigrator/src/bin/pg_dbmigrator/args.rs +++ b/pg_dbmigrator/src/bin/pg_dbmigrator/args.rs @@ -100,6 +100,14 @@ pub struct Cli { #[arg(long)] pub no_sequence_sync: bool, + /// Disable auto-creation of the publication on the source. By default the migrator creates the publication automatically if it does not exist. Pass this flag to require the operator to create it manually before running the migration. Online mode only. + #[arg(long)] + pub no_auto_create_publication: bool, + + /// Keep the replication slot on the source after cutover (default: drop it). Online mode only. + #[arg(long)] + pub keep_slot: bool, + /// pgoutput protocol version. 
#[arg(long, default_value_t = 2)] pub protocol_version: u32, @@ -285,6 +293,8 @@ impl Cli { drop_subscription_on_cutover: !self.keep_subscription, force_clean: self.force_clean, sync_sequences_on_cutover: !self.no_sequence_sync, + auto_create_publication: !self.no_auto_create_publication, + drop_slot_on_cutover: !self.keep_slot, apply, cutover: CutoverConfig { poll_interval: std::time::Duration::from_secs(self.cutover_poll_secs), @@ -730,4 +740,50 @@ mod tests { assert!(!cfg.skip_analyze); assert!(!cfg.skip_source_vacuum); } + + #[test] + fn into_config_no_auto_create_publication() { + let cli = parse_args(&[ + "pg_dbmigrator", + "--mode", + "online", + "--source", + "postgresql://u@src/db", + "--target", + "postgresql://u@dst/db", + "--no-auto-create-publication", + ]); + let cfg = cli.into_config().unwrap(); + assert!(!cfg.online.auto_create_publication); + } + + #[test] + fn into_config_keep_slot() { + let cli = parse_args(&[ + "pg_dbmigrator", + "--mode", + "online", + "--source", + "postgresql://u@src/db", + "--target", + "postgresql://u@dst/db", + "--keep-slot", + ]); + let cfg = cli.into_config().unwrap(); + assert!(!cfg.online.drop_slot_on_cutover); + } + + #[test] + fn into_config_defaults_auto_create_publication_and_drop_slot() { + let cli = parse_args(&[ + "pg_dbmigrator", + "--source", + "postgresql://u@src/db", + "--target", + "postgresql://u@dst/db", + ]); + let cfg = cli.into_config().unwrap(); + assert!(cfg.online.auto_create_publication); + assert!(cfg.online.drop_slot_on_cutover); + } } diff --git a/pg_dbmigrator/src/config.rs b/pg_dbmigrator/src/config.rs index 7bbb3e5..a6f35aa 100644 --- a/pg_dbmigrator/src/config.rs +++ b/pg_dbmigrator/src/config.rs @@ -392,6 +392,12 @@ pub struct OnlineOptions { /// out-of-band sequence sync (e.g. application-level UUIDs). #[serde(default = "default_true")] pub sync_sequences_on_cutover: bool, + /// If `true` (default), automatically create the publication on the source if it does not already exist. 
The publication will be created as `FOR ALL TABLES` (or scoped by `MigrationConfig::tables` / `MigrationConfig::schemas` if set). Publications auto-created by the migrator are tracked and dropped after cutover. + #[serde(default = "default_true")] + pub auto_create_publication: bool, + /// If `true` (default), drop the replication slot on the source after a successful cutover. The slot is no longer needed once the target has caught up and the subscription is torn down. + #[serde(default = "default_true")] + pub drop_slot_on_cutover: bool, /// Configuration for the WAL apply worker. pub apply: ReplicationApplyConfig, /// Cutover knobs — when to declare the target "caught up" and how the @@ -414,6 +420,8 @@ impl Default for OnlineOptions { drop_subscription_on_cutover: true, force_clean: false, sync_sequences_on_cutover: true, + auto_create_publication: true, + drop_slot_on_cutover: true, apply: ReplicationApplyConfig::default(), cutover: CutoverConfig::default(), } @@ -930,6 +938,26 @@ mod tests { assert!(opts.subscription_source_conn.is_none()); } + #[test] + fn online_options_default_auto_create_publication_and_drop_slot() { + let opts = OnlineOptions::default(); + assert!(opts.auto_create_publication); + assert!(opts.drop_slot_on_cutover); + } + + #[test] + fn serde_backwards_compat_missing_auto_create_and_drop_slot_defaults_true() { + let opts = OnlineOptions::default(); + let mut json: serde_json::Value = serde_json::to_value(&opts).unwrap(); + json.as_object_mut() + .unwrap() + .remove("auto_create_publication"); + json.as_object_mut().unwrap().remove("drop_slot_on_cutover"); + let opts2: OnlineOptions = serde_json::from_value(json).unwrap(); + assert!(opts2.auto_create_publication); + assert!(opts2.drop_slot_on_cutover); + } + #[test] fn validate_accepts_valid_online_config() { let cfg = MigrationConfig { diff --git a/pg_dbmigrator/src/dump.rs b/pg_dbmigrator/src/dump.rs index e50e856..69bb426 100644 --- a/pg_dbmigrator/src/dump.rs +++ 
b/pg_dbmigrator/src/dump.rs @@ -228,11 +228,10 @@ impl CommandRunner for TokioCommandRunner { ) -> Result<()> { debug!(program, ?args, "spawning external command"); - // For `pg_restore` we capture stderr (while still teeing it - // live to our own stderr) so we can build a categorized error - // summary on exit-1. Other commands keep the simpler inherit - // path — they have less verbose / more deterministic output. - let capture_stderr = program == "pg_restore"; + // Capture stderr for `pg_restore` (error categorization) and + // `pg_dump` (table-level progress). Both emit useful structured + // lines on stderr when `--verbose` is active. + let capture_stderr = program == "pg_restore" || program == "pg_dump"; let mut cmd = tokio::process::Command::new(program); cmd.args(args); @@ -260,6 +259,7 @@ impl CommandRunner for TokioCommandRunner { // (a) tees every line straight to our own stderr so the // operator still sees live progress, and (b) accumulates a // bounded structured summary of error/warning lines. + let is_restore = program == "pg_restore"; let stderr_task = if capture_stderr { child.stderr.take().map(|pipe| { tokio::spawn(async move { @@ -274,7 +274,10 @@ impl CommandRunner for TokioCommandRunner { let _ = sink.write_all(line.as_bytes()).await; let _ = sink.write_all(b"\n").await; let _ = sink.flush().await; - ingest_pg_restore_stderr_line(&line, &mut summary); + if is_restore { + ingest_pg_restore_stderr_line(&line, &mut summary); + } + emit_table_progress(&line); } Ok(None) => break, Err(_) => break, @@ -369,6 +372,24 @@ fn kill_child_group(pid: Option, sigkill: bool) { #[cfg(not(unix))] fn kill_child_group(_pid: Option, _sigkill: bool) {} +/// Detect table-level progress lines from pg_dump/pg_restore verbose stderr +/// and emit structured tracing events. 
Patterns: +/// pg_dump: dumping contents of table "schema"."table" +/// pg_restore: processing data for table "schema"."table" +fn emit_table_progress(line: &str) { + if let Some(table) = line + .strip_prefix("pg_dump: dumping contents of table \"") + .and_then(|s| s.strip_suffix('"')) + { + info!(table, "pg_dump: dumping table"); + } else if let Some(table) = line + .strip_prefix("pg_restore: processing data for table \"") + .and_then(|s| s.strip_suffix('"')) + { + info!(table, "pg_restore: restoring table"); + } +} + /// Run `pg_dump` according to `req` using the supplied [`CommandRunner`]. pub async fn run_pg_dump( runner: &R, @@ -856,4 +877,24 @@ mod tests { let dbg = format!("{:?}", r); assert!(dbg.contains("TokioCommandRunner")); } + + #[test] + fn emit_table_progress_detects_pg_dump_line() { + // Should not panic; emits tracing event (not assertable here but + // exercises the parsing path). + emit_table_progress(r#"pg_dump: dumping contents of table "public"."users""#); + } + + #[test] + fn emit_table_progress_detects_pg_restore_line() { + emit_table_progress(r#"pg_restore: processing data for table "public"."orders""#); + } + + #[test] + fn emit_table_progress_ignores_unrelated_lines() { + // Should not panic or emit anything for non-matching lines. + emit_table_progress("pg_dump: reading extensions"); + emit_table_progress("pg_restore: connecting to database"); + emit_table_progress(""); + } } diff --git a/pg_dbmigrator/src/native_apply.rs b/pg_dbmigrator/src/native_apply.rs index ce95d44..59c8acf 100644 --- a/pg_dbmigrator/src/native_apply.rs +++ b/pg_dbmigrator/src/native_apply.rs @@ -291,6 +291,68 @@ pub async fn cleanup_target_subscription(target_conn: &str, online: &OnlineOptio Ok(()) } +/// Drop the publication on the source. Best-effort: errors are logged but +/// do not propagate. +/// +/// Called after a successful cutover when the publication was auto-created +/// by the migrator. Uses `DROP PUBLICATION IF EXISTS` for idempotency. 
+pub async fn drop_source_publication(source_conn: &str, publication: &str) -> Result<()> { + let client = connect_with_sslmode(source_conn).await?; + let pub_ident = quote_ident(publication)?; + let sql = format!("DROP PUBLICATION IF EXISTS {pub_ident}"); + info!(publication, "dropping auto-created publication on source"); + if let Err(e) = client.batch_execute(&sql).await { + warn!(error = %e, publication, "failed to drop publication on source (continuing)"); + } else { + info!(publication, "publication dropped on source"); + } + Ok(()) +} + +/// Build the SQL to drop a publication (pure function for testing). +pub fn build_drop_publication_sql(publication: &str) -> Result { + let pub_ident = quote_ident(publication)?; + Ok(format!("DROP PUBLICATION IF EXISTS {pub_ident}")) +} + +/// Drop the replication slot on the source. Best-effort: errors are logged +/// but do not propagate. +/// +/// Called after a successful cutover when `drop_slot_on_cutover` is `true`. +/// Only drops the slot if it exists and is not currently active. +pub async fn drop_source_slot(source_conn: &str, slot_name: &str) -> Result<()> { + let client = connect_with_sslmode(source_conn).await?; + let slot_lit = quote_literal(slot_name)?; + let sql = format!( + "DO $$\n\ + BEGIN\n\ + IF EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = {slot_lit} AND NOT active) THEN\n\ + PERFORM pg_drop_replication_slot({slot_lit});\n\ + END IF;\n\ + END $$;", + ); + info!(slot_name, "dropping replication slot on source"); + if let Err(e) = client.batch_execute(&sql).await { + warn!(error = %e, slot_name, "failed to drop replication slot on source (continuing)"); + } else { + info!(slot_name, "replication slot dropped on source"); + } + Ok(()) +} + +/// Build the SQL to drop a replication slot (pure function for testing). 
+pub fn build_drop_slot_sql(slot_name: &str) -> Result { + let slot_lit = quote_literal(slot_name)?; + Ok(format!( + "DO $$\n\ + BEGIN\n\ + IF EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = {slot_lit} AND NOT active) THEN\n\ + PERFORM pg_drop_replication_slot({slot_lit});\n\ + END IF;\n\ + END $$;", + )) +} + /// Run the native apply phase: create a subscription on the target, poll the /// source for replication lag, and exit gracefully when the operator /// triggers cutover. @@ -995,4 +1057,30 @@ mod tests { let sql = make_create_subscription_sql(&opts, conn).unwrap(); assert!(sql.contains(conn)); } + + #[test] + fn build_drop_publication_sql_produces_if_exists() { + let sql = build_drop_publication_sql("my_pub").unwrap(); + assert_eq!(sql, "DROP PUBLICATION IF EXISTS \"my_pub\""); + } + + #[test] + fn build_drop_publication_sql_quotes_special_chars() { + let sql = build_drop_publication_sql("pub\"name").unwrap(); + assert!(sql.contains("\"pub\"\"name\"")); + } + + #[test] + fn build_drop_slot_sql_checks_inactive() { + let sql = build_drop_slot_sql("my_slot").unwrap(); + assert!(sql.contains("NOT active")); + assert!(sql.contains("pg_drop_replication_slot")); + assert!(sql.contains("'my_slot'")); + } + + #[test] + fn build_drop_slot_sql_quotes_special_chars() { + let sql = build_drop_slot_sql("slot'name").unwrap(); + assert!(sql.contains("'slot''name'")); + } } diff --git a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs index 699fbfc..875d8d1 100644 --- a/pg_dbmigrator/src/orchestrator.rs +++ b/pg_dbmigrator/src/orchestrator.rs @@ -16,12 +16,13 @@ use crate::cutover::CutoverHandle; use crate::dump::{run_pg_dump, CommandRunner, DumpFormat, DumpRequest, TokioCommandRunner}; use crate::error::{MigrationError, Result}; use crate::native_apply::{ - disable_target_subscription, force_clean_stale_state, run_native_apply, wait_for_slot_inactive, - ApplyStats, PgSubscriptionLagProvider, + disable_target_subscription, 
drop_source_publication, drop_source_slot, + force_clean_stale_state, run_native_apply, wait_for_slot_inactive, ApplyStats, + PgSubscriptionLagProvider, }; use crate::preflight::{ - ensure_pglogical_not_interfering, ensure_target_database_exists, verify_pg_tools_installed, - verify_publication_exists, verify_source_logical_replication_ready, + ensure_pglogical_not_interfering, ensure_publication_exists, ensure_target_database_exists, + verify_pg_tools_installed, verify_publication_exists, verify_source_logical_replication_ready, }; use crate::progress::{MigrationStage, ProgressEvent, ProgressReporter, TracingReporter}; use crate::restore::{run_pg_restore, run_pg_restore_in_sections, RestoreRequest}; @@ -212,23 +213,42 @@ impl Migrator { // stream to keep. We only call `prepare_replication_slot` (and // hold a stream) when we still need to run pg_dump. let mut prepared_stream = None; + let mut pub_auto_created = false; let snapshot_name = if !token.has(CompletedStage::Dump) { - // Fail fast if the publication is missing. Without this check - // the apply worker would only error out 10+ minutes later - // (after dump+restore) from inside `CREATE SUBSCRIPTION`. - self.report( - MigrationStage::Validate, - format!( - "verifying publication `{}` exists on source", - self.config.online.publication - ), - ) - .await; - verify_publication_exists( - &self.config.source.connection_string, - &self.config.online.publication, - ) - .await?; + // Ensure the publication exists on the source. If auto-create + // is enabled and it's missing, create it. Otherwise fail fast + // so the apply worker doesn't error 10+ minutes later. 
+ if self.config.online.auto_create_publication { + self.report( + MigrationStage::Validate, + format!( + "ensuring publication `{}` exists on source (auto-create enabled)", + self.config.online.publication + ), + ) + .await; + pub_auto_created = ensure_publication_exists( + &self.config.source.connection_string, + &self.config.online.publication, + &self.config.tables, + &self.config.schemas, + ) + .await?; + } else { + self.report( + MigrationStage::Validate, + format!( + "verifying publication `{}` exists on source", + self.config.online.publication + ), + ) + .await; + verify_publication_exists( + &self.config.source.connection_string, + &self.config.online.publication, + ) + .await?; + } // 1. Prepare slot + snapshot (must happen *before* pg_dump runs). self.report(MigrationStage::PrepareSnapshot, "creating replication slot") @@ -393,6 +413,53 @@ impl Migrator { } } + // 6. Post-cutover cleanup: drop auto-created publication and slot. + if stats.cutover_triggered { + if pub_auto_created { + self.report( + MigrationStage::Cutover, + format!( + "dropping auto-created publication `{}` on source", + self.config.online.publication + ), + ) + .await; + if let Err(e) = drop_source_publication( + &self.config.source.connection_string, + &self.config.online.publication, + ) + .await + { + tracing::warn!( + error = %e, + "failed to drop auto-created publication (non-fatal)" + ); + } + } + + if self.config.online.drop_slot_on_cutover { + self.report( + MigrationStage::Cutover, + format!( + "dropping replication slot `{}` on source", + self.config.online.slot_name + ), + ) + .await; + if let Err(e) = drop_source_slot( + &self.config.source.connection_string, + &self.config.online.slot_name, + ) + .await + { + tracing::warn!( + error = %e, + "failed to drop replication slot (non-fatal)" + ); + } + } + } + self.report(MigrationStage::Complete, "online migration finished") .await; Ok(MigrationOutcome { diff --git a/pg_dbmigrator/src/preflight.rs 
b/pg_dbmigrator/src/preflight.rs
index 6c49f8c..7efeb12 100644
--- a/pg_dbmigrator/src/preflight.rs
+++ b/pg_dbmigrator/src/preflight.rs
@@ -145,6 +145,67 @@ pub async fn verify_source_logical_replication_ready(source_conn: &str) -> Resul
     Ok(())
 }
 
+/// Build the `CREATE PUBLICATION` SQL statement from the given parameters.
+///
+/// When both `tables` and `schemas` are empty, creates `FOR ALL TABLES`.
+/// When `tables` is non-empty, creates `FOR TABLE <t1>, <t2>, …`.
+/// When only `schemas` is non-empty, creates `FOR TABLES IN SCHEMA <s1>, <s2>, …`.
+pub fn build_create_publication_sql(
+    publication: &str,
+    tables: &[String],
+    schemas: &[String],
+) -> Result<String> {
+    let pub_ident = pg_walstream::quote_ident(publication)?;
+    let scope = if !tables.is_empty() {
+        let quoted: std::result::Result<Vec<String>, _> = tables
+            .iter()
+            .map(|t| pg_walstream::quote_ident(t))
+            .collect();
+        format!("FOR TABLE {}", quoted?.join(", "))
+    } else if !schemas.is_empty() {
+        let quoted: std::result::Result<Vec<String>, _> = schemas
+            .iter()
+            .map(|s| pg_walstream::quote_ident(s))
+            .collect();
+        format!("FOR TABLES IN SCHEMA {}", quoted?.join(", "))
+    } else {
+        "FOR ALL TABLES".to_string()
+    };
+    Ok(format!("CREATE PUBLICATION {pub_ident} {scope}"))
+}
+
+/// Ensure that a logical-replication publication with the given name exists
+/// on the source. If absent and `auto_create` is enabled, create it
+/// automatically.
+///
+/// Returns `Ok(true)` if the publication was auto-created, `Ok(false)` if
+/// it already existed.
+pub async fn ensure_publication_exists(
+    source_conn: &str,
+    publication: &str,
+    tables: &[String],
+    schemas: &[String],
+) -> Result<bool> {
+    let client = connect_with_sslmode(source_conn).await?;
+    let row = client
+        .query_one(
+            "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = $1)",
+            &[&publication],
+        )
+        .await?;
+    let exists: bool = row.get(0);
+    if exists {
+        info!(publication, "publication already exists on source");
+        return Ok(false);
+    }
+
+    let sql = build_create_publication_sql(publication, tables, schemas)?;
+    info!(publication, sql = %sql, "auto-creating publication on source");
+    client.batch_execute(&sql).await?;
+    info!(publication, "publication created successfully");
+    Ok(true)
+}
+
 /// Rewrite a connection string so the path component (database name) points
 /// to the `postgres` maintenance database. Used to run admin commands like
 /// `CREATE DATABASE` which cannot target the database they are creating.
@@ -425,4 +486,45 @@ mod tests {
     fn required_tools_length() {
         assert_eq!(REQUIRED_TOOLS.len(), 2);
     }
+
+    #[test]
+    fn build_publication_sql_all_tables() {
+        let sql = build_create_publication_sql("my_pub", &[], &[]).unwrap();
+        assert_eq!(sql, "CREATE PUBLICATION \"my_pub\" FOR ALL TABLES");
+    }
+
+    #[test]
+    fn build_publication_sql_specific_tables() {
+        let tables = vec!["public.users".to_string(), "public.orders".to_string()];
+        let sql = build_create_publication_sql("my_pub", &tables, &[]).unwrap();
+        assert_eq!(
+            sql,
+            "CREATE PUBLICATION \"my_pub\" FOR TABLE \"public.users\", \"public.orders\""
+        );
+    }
+
+    #[test]
+    fn build_publication_sql_specific_schemas() {
+        let schemas = vec!["public".to_string(), "app".to_string()];
+        let sql = build_create_publication_sql("my_pub", &[], &schemas).unwrap();
+        assert_eq!(
+            sql,
+            "CREATE PUBLICATION \"my_pub\" FOR TABLES IN SCHEMA \"public\", \"app\""
+        );
+    }
+
+    #[test]
+    fn build_publication_sql_tables_take_precedence_over_schemas() {
+        let tables =
vec!["public.users".to_string()]; + let schemas = vec!["app".to_string()]; + let sql = build_create_publication_sql("my_pub", &tables, &schemas).unwrap(); + assert!(sql.contains("FOR TABLE")); + assert!(!sql.contains("FOR TABLES IN SCHEMA")); + } + + #[test] + fn build_publication_sql_quotes_special_chars() { + let sql = build_create_publication_sql("pub\"name", &[], &[]).unwrap(); + assert!(sql.contains("\"pub\"\"name\"")); + } } diff --git a/pg_dbmigrator/tests/postgres_integration.rs b/pg_dbmigrator/tests/postgres_integration.rs index b2c31d9..4cc8aa3 100644 --- a/pg_dbmigrator/tests/postgres_integration.rs +++ b/pg_dbmigrator/tests/postgres_integration.rs @@ -675,3 +675,109 @@ async fn maybe_analyze_target_runs_when_not_skipped() { let result = pg_dbmigrator::analyze::maybe_analyze_target(&config).await; assert!(result.is_ok()); } + +// ─── preflight::ensure_publication_exists ──────────────────────────────────── + +#[tokio::test] +async fn ensure_publication_exists_creates_when_missing() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Clean up any leftover + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_auto_pub") + .await + .ok(); + + let created = + pg_dbmigrator::preflight::ensure_publication_exists(&url, "integ_auto_pub", &[], &[]) + .await + .unwrap(); + assert!(created, "publication should have been auto-created"); + + // Verify it exists now + let row = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_auto_pub')", + &[], + ) + .await + .unwrap(); + let exists: bool = row.get(0); + assert!(exists); + + // Clean up + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_auto_pub") + .await + .ok(); +} + +#[tokio::test] +async fn ensure_publication_exists_noop_when_present() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Pre-create the publication + client + 
.batch_execute("CREATE PUBLICATION integ_existing_pub FOR ALL TABLES") + .await + .unwrap_or(()); + + let created = + pg_dbmigrator::preflight::ensure_publication_exists(&url, "integ_existing_pub", &[], &[]) + .await + .unwrap(); + assert!( + !created, + "publication already existed, should not re-create" + ); + + // Clean up + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_existing_pub") + .await + .ok(); +} + +// ─── native_apply::drop_source_publication ────────────────────────────────── + +#[tokio::test] +async fn drop_source_publication_is_idempotent() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Create a publication, then drop it twice — both should succeed + client + .batch_execute("CREATE PUBLICATION integ_drop_pub FOR ALL TABLES") + .await + .unwrap_or(()); + + let result = pg_dbmigrator::native_apply::drop_source_publication(&url, "integ_drop_pub").await; + assert!(result.is_ok()); + + // Second drop should also succeed (IF EXISTS) + let result = pg_dbmigrator::native_apply::drop_source_publication(&url, "integ_drop_pub").await; + assert!(result.is_ok()); +} + +// ─── native_apply::drop_source_slot ───────────────────────────────────────── + +#[tokio::test] +async fn drop_source_slot_is_idempotent() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Create a slot, then drop it + client + .batch_execute("SELECT pg_create_logical_replication_slot('integ_drop_slot', 'pgoutput')") + .await + .unwrap_or(()); + + let result = pg_dbmigrator::native_apply::drop_source_slot(&url, "integ_drop_slot").await; + assert!(result.is_ok()); + + // Second drop should also succeed (slot absent → noop) + let result = pg_dbmigrator::native_apply::drop_source_slot(&url, "integ_drop_slot").await; + assert!(result.is_ok()); +} diff --git a/tests/integration/run_all.sh b/tests/integration/run_all.sh index 1b837df..216b56e 100755 --- 
a/tests/integration/run_all.sh +++ b/tests/integration/run_all.sh @@ -125,6 +125,8 @@ ONLINE_TESTS=( tests/integration/run_online_cancel_resume.sh tests/integration/run_online_multi_resume_sustained.sh tests/integration/run_online_sequence_sync.sh + tests/integration/run_online_auto_pub_lifecycle.sh + tests/integration/run_online_keep_slot.sh ) case "$FILTER" in diff --git a/tests/integration/run_online_auto_pub_lifecycle.sh b/tests/integration/run_online_auto_pub_lifecycle.sh new file mode 100755 index 0000000..7baa426 --- /dev/null +++ b/tests/integration/run_online_auto_pub_lifecycle.sh @@ -0,0 +1,87 @@ +#!/usr/bin/env bash +# End-to-end test: publication/subscription lifecycle auto-management. +# +# Exercises the new auto-create + post-cutover cleanup paths that the +# standard online test does NOT cover (seed.sql pre-creates the +# publication there). +# +# Flow: +# 1. Seed source but DROP the publication so it is absent at start. +# 2. Run online migrator with auto-create enabled (default) — the +# migrator must auto-create the publication. +# 3. Wait for initial catch-up, SIGINT to trigger cutover. +# 4. Assert data equality. +# 5. Assert the publication was dropped on source after cutover. +# 6. Assert the replication slot was dropped on source after cutover. +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$ROOT" +source "$ROOT/tests/integration/lib.sh" + +LOG_FILE="$(mktemp -t pg_dbmigrator_auto_pub.XXXXXX.log)" +echo "==> log file: $LOG_FILE" + +trap 'stop_migrator' EXIT + +wait_for_pg "$SOURCE_URL" "source" +wait_for_pg "$TARGET_URL" "target" + +# ── Setup: seed source but remove the publication ─────────────────────── +setup_online_test + +echo "==> dropping publication so auto-create path is exercised" +psql_exec "$SOURCE_URL" "DROP PUBLICATION IF EXISTS pg_dbmigrator_pub;" + +# Verify publication is indeed absent before we start. 
+pub_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_publication WHERE pubname = 'pg_dbmigrator_pub'" \ + | tr -d '[:space:]') +if [[ "$pub_count" != "0" ]]; then + echo "FAIL: publication should not exist at this point (count=$pub_count)" >&2 + exit 1 +fi +echo "==> confirmed: publication absent on source" + +# ── Run online migration (auto-create enabled by default) ─────────────── +build_example online_migration_example +launch_online_migrator "$LOG_FILE" + +# ── Wait for initial catch-up ─────────────────────────────────────────── +echo "==> waiting for 'ready for cutover'" +CUTOVER_LINE=$(wait_for_log_match "$LOG_FILE" "ready for cutover" 0 120) +echo "==> got 'ready for cutover' on line $CUTOVER_LINE" + +# Verify auto-create happened — the log should contain the marker. +if ! grep -q "publication created successfully" "$LOG_FILE"; then + echo "FAIL: migrator did not auto-create the publication" >&2 + tail -n 80 "$LOG_FILE" >&2 + exit 1 +fi +echo "==> confirmed: publication was auto-created by migrator" + +# ── Cutover ───────────────────────────────────────────────────────────── +sigint_and_wait "$LOG_FILE" 120 +assert_data_equal 500 + +# ── Post-cutover: verify publication was cleaned up on source ─────────── +pub_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_publication WHERE pubname = 'pg_dbmigrator_pub'" \ + | tr -d '[:space:]') +if [[ "$pub_count" != "0" ]]; then + echo "FAIL: auto-created publication should have been dropped after cutover (count=$pub_count)" >&2 + exit 1 +fi +echo "==> confirmed: publication dropped on source after cutover" + +# ── Post-cutover: verify replication slot was cleaned up on source ────── +slot_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_replication_slots WHERE slot_name LIKE 'pg_dbmigrator%'" \ + | tr -d '[:space:]') +if [[ "$slot_count" != "0" ]]; then + echo "FAIL: replication slot should have been dropped after cutover (count=$slot_count)" >&2 
+ exit 1 +fi +echo "==> confirmed: replication slot dropped on source after cutover" + +echo "PASS: online auto-pub lifecycle — auto-create, data equal (500 rows), publication + slot cleaned up" diff --git a/tests/integration/run_online_keep_slot.sh b/tests/integration/run_online_keep_slot.sh new file mode 100755 index 0000000..7459484 --- /dev/null +++ b/tests/integration/run_online_keep_slot.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +# End-to-end test: keep-slot flag — verify the replication slot survives +# cutover when PG_DBMIGRATOR_KEEP_SLOT=1 is set. +# +# Also verifies that when a pre-existing publication was NOT auto-created, +# it is NOT dropped after cutover (only auto-created ones are cleaned up). +set -euo pipefail + +ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd)" +cd "$ROOT" +source "$ROOT/tests/integration/lib.sh" + +LOG_FILE="$(mktemp -t pg_dbmigrator_keep_slot.XXXXXX.log)" +echo "==> log file: $LOG_FILE" + +trap 'stop_migrator' EXIT + +wait_for_pg "$SOURCE_URL" "source" +wait_for_pg "$TARGET_URL" "target" + +# ── Setup: seed source (publication created by seed.sql) ──────────────── +setup_online_test + +# Verify publication exists (pre-created by seed.sql — not auto-created). 
+pub_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_publication WHERE pubname = 'pg_dbmigrator_pub'" \ + | tr -d '[:space:]') +if [[ "$pub_count" != "1" ]]; then + echo "FAIL: publication should exist from seed.sql (count=$pub_count)" >&2 + exit 1 +fi + +# ── Run online migration with keep-slot enabled ───────────────────────── +build_example online_migration_example +launch_online_migrator "$LOG_FILE" \ + PG_DBMIGRATOR_KEEP_SLOT=1 + +# ── Wait for initial catch-up ─────────────────────────────────────────── +echo "==> waiting for 'ready for cutover'" +CUTOVER_LINE=$(wait_for_log_match "$LOG_FILE" "ready for cutover" 0 120) +echo "==> got 'ready for cutover' on line $CUTOVER_LINE" + +# ── Cutover ───────────────────────────────────────────────────────────── +sigint_and_wait "$LOG_FILE" 120 +assert_data_equal 500 + +# ── Post-cutover: slot should STILL exist (keep-slot=true) ────────────── +slot_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_replication_slots WHERE slot_name LIKE 'pg_dbmigrator%'" \ + | tr -d '[:space:]') +if [[ "$slot_count" == "0" ]]; then + echo "FAIL: replication slot should have been kept (KEEP_SLOT=1) but it was dropped" >&2 + exit 1 +fi +echo "==> confirmed: replication slot retained after cutover (count=$slot_count)" + +# ── Post-cutover: pre-existing publication should NOT be dropped ──────── +pub_count=$("${PSQL_BASE[@]}" "$SOURCE_URL" \ + -c "SELECT count(*) FROM pg_publication WHERE pubname = 'pg_dbmigrator_pub'" \ + | tr -d '[:space:]') +if [[ "$pub_count" != "1" ]]; then + echo "FAIL: pre-existing publication should NOT be dropped (was not auto-created)" >&2 + exit 1 +fi +echo "==> confirmed: pre-existing publication retained after cutover" + +echo "PASS: online keep-slot — slot retained, pre-existing publication retained, data equal (500 rows)" diff --git a/tests/integration/seed.sql b/tests/integration/seed.sql index 8d7931b..f746497 100644 --- a/tests/integration/seed.sql +++ 
b/tests/integration/seed.sql @@ -28,6 +28,7 @@ INSERT INTO app.events (note) SELECT 'event-' || g FROM generate_series(1, 100) g; --- Publication required for online mode. Library does not auto-create it. +-- Publication for online mode. The library can auto-create it, but most +-- integration tests pre-create it here so they exercise the pre-existing path. DROP PUBLICATION IF EXISTS pg_dbmigrator_pub; CREATE PUBLICATION pg_dbmigrator_pub FOR ALL TABLES; From 824282b01c4def07e96e1badbd3e7db41da49836 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 01:42:27 +0000 Subject: [PATCH 2/4] feat: implement auto-creation and cleanup of publications and replication slots --- pg_dbmigrator/src/orchestrator.rs | 25 +++++++++++++---- pg_dbmigrator/src/preflight.rs | 46 +++++++++++++++++++++++++++---- pg_dbmigrator/src/progress.rs | 6 ++++ pg_dbmigrator/src/resume.rs | 33 ++++++++++++++++++++++ 4 files changed, 100 insertions(+), 10 deletions(-) diff --git a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs index 875d8d1..431a9e6 100644 --- a/pg_dbmigrator/src/orchestrator.rs +++ b/pg_dbmigrator/src/orchestrator.rs @@ -213,7 +213,6 @@ impl Migrator { // stream to keep. We only call `prepare_replication_slot` (and // hold a stream) when we still need to run pg_dump. let mut prepared_stream = None; - let mut pub_auto_created = false; let snapshot_name = if !token.has(CompletedStage::Dump) { // Ensure the publication exists on the source. If auto-create // is enabled and it's missing, create it. 
Otherwise fail fast @@ -227,13 +226,17 @@ impl Migrator { ), ) .await; - pub_auto_created = ensure_publication_exists( + let created = ensure_publication_exists( &self.config.source.connection_string, &self.config.online.publication, &self.config.tables, &self.config.schemas, ) .await?; + if created { + token.pub_auto_created = true; + self.save_resume(&token, &dump_path).await; + } } else { self.report( MigrationStage::Validate, @@ -415,9 +418,9 @@ impl Migrator { // 6. Post-cutover cleanup: drop auto-created publication and slot. if stats.cutover_triggered { - if pub_auto_created { + if token.pub_auto_created { self.report( - MigrationStage::Cutover, + MigrationStage::SourceCleanup, format!( "dropping auto-created publication `{}` on source", self.config.online.publication @@ -439,13 +442,25 @@ impl Migrator { if self.config.online.drop_slot_on_cutover { self.report( - MigrationStage::Cutover, + MigrationStage::SourceCleanup, format!( "dropping replication slot `{}` on source", self.config.online.slot_name ), ) .await; + if let Err(e) = wait_for_slot_inactive( + &self.config.source.connection_string, + &self.config.online.slot_name, + self.reporter.as_ref(), + ) + .await + { + tracing::warn!( + error = %e, + "failed waiting for slot to become inactive (non-fatal)" + ); + } if let Err(e) = drop_source_slot( &self.config.source.connection_string, &self.config.online.slot_name, diff --git a/pg_dbmigrator/src/preflight.rs b/pg_dbmigrator/src/preflight.rs index 7efeb12..9091d9e 100644 --- a/pg_dbmigrator/src/preflight.rs +++ b/pg_dbmigrator/src/preflight.rs @@ -145,6 +145,19 @@ pub async fn verify_source_logical_replication_ready(source_conn: &str) -> Resul Ok(()) } +/// Quote a potentially schema-qualified name (`schema.table`) by splitting +/// on `.` and quoting each part individually. Unqualified names (no dot) +/// are quoted as a single identifier. 
+/// +/// PostgreSQL requires `"schema"."table"` — quoting the whole thing as +/// `"schema.table"` creates a single identifier that includes a literal dot. +pub fn quote_qualified_name(name: &str) -> Result { + let parts: Vec<&str> = name.splitn(2, '.').collect(); + let quoted: std::result::Result, _> = + parts.iter().map(|p| pg_walstream::quote_ident(p)).collect(); + Ok(quoted?.join(".")) +} + /// Build the `CREATE PUBLICATION` SQL statement from the given parameters. /// /// When both `tables` and `schemas` are empty, creates `FOR ALL TABLES`. @@ -157,10 +170,8 @@ pub fn build_create_publication_sql( ) -> Result { let pub_ident = pg_walstream::quote_ident(publication)?; let scope = if !tables.is_empty() { - let quoted: std::result::Result, _> = tables - .iter() - .map(|t| pg_walstream::quote_ident(t)) - .collect(); + let quoted: std::result::Result, _> = + tables.iter().map(|t| quote_qualified_name(t)).collect(); format!("FOR TABLE {}", quoted?.join(", ")) } else if !schemas.is_empty() { let quoted: std::result::Result, _> = schemas @@ -499,7 +510,7 @@ mod tests { let sql = build_create_publication_sql("my_pub", &tables, &[]).unwrap(); assert_eq!( sql, - "CREATE PUBLICATION \"my_pub\" FOR TABLE \"public.users\", \"public.orders\"" + "CREATE PUBLICATION \"my_pub\" FOR TABLE \"public\".\"users\", \"public\".\"orders\"" ); } @@ -527,4 +538,29 @@ mod tests { let sql = build_create_publication_sql("pub\"name", &[], &[]).unwrap(); assert!(sql.contains("\"pub\"\"name\"")); } + + #[test] + fn quote_qualified_name_unqualified() { + let result = quote_qualified_name("users").unwrap(); + assert_eq!(result, "\"users\""); + } + + #[test] + fn quote_qualified_name_schema_qualified() { + let result = quote_qualified_name("public.users").unwrap(); + assert_eq!(result, "\"public\".\"users\""); + } + + #[test] + fn quote_qualified_name_special_chars() { + let result = quote_qualified_name("my schema.my table").unwrap(); + assert_eq!(result, "\"my schema\".\"my table\""); + } + + 
#[test] + fn quote_qualified_name_dot_in_table_part() { + // Only splits on first dot: "schema.table.extra" -> "schema" + "table.extra" + let result = quote_qualified_name("public.my.table").unwrap(); + assert_eq!(result, "\"public\".\"my.table\""); + } } diff --git a/pg_dbmigrator/src/progress.rs b/pg_dbmigrator/src/progress.rs index dd3581b..fd1c66e 100644 --- a/pg_dbmigrator/src/progress.rs +++ b/pg_dbmigrator/src/progress.rs @@ -37,6 +37,9 @@ pub enum MigrationStage { /// Cutover requested — the apply loop is winding down so the operator can /// switch traffic to the target. Cutover, + /// Post-cutover cleanup of auto-created publications and replication slots + /// on the source. + SourceCleanup, /// All work completed. Complete, } @@ -249,12 +252,15 @@ mod tests { let stages = [ MigrationStage::Validate, MigrationStage::PrepareSnapshot, + MigrationStage::SourceVacuum, MigrationStage::Dump, MigrationStage::Restore, + MigrationStage::Analyze, MigrationStage::StreamApply, MigrationStage::Lag, MigrationStage::CaughtUp, MigrationStage::Cutover, + MigrationStage::SourceCleanup, MigrationStage::Complete, ]; for stage in stages { diff --git a/pg_dbmigrator/src/resume.rs b/pg_dbmigrator/src/resume.rs index fec1e72..cec4e0d 100644 --- a/pg_dbmigrator/src/resume.rs +++ b/pg_dbmigrator/src/resume.rs @@ -90,6 +90,10 @@ pub struct ResumeToken { /// Most recent `confirmed_flush_lsn` observed by the apply lag /// poller. Useful for the operator's sanity check after a resume. pub last_applied_lsn: Option, + /// Whether the publication was auto-created by this migration run. + /// Persisted so cleanup can drop it even after a resume. + #[serde(default)] + pub pub_auto_created: bool, /// RFC-3339 timestamp of the last save. 
pub updated_at: String, } @@ -124,6 +128,7 @@ impl ResumeToken { }, snapshot_name: None, last_applied_lsn: None, + pub_auto_created: false, updated_at: now_rfc3339(), } } @@ -459,4 +464,32 @@ mod tests { assert!(CompletedStage::Dump < CompletedStage::Restore); assert!(CompletedStage::Restore < CompletedStage::Analyze); } + + #[test] + fn pub_auto_created_defaults_false_when_missing() { + let json = r#"{ + "schema_version": 1, + "config_hash": "abc", + "mode": "online", + "completed": [], + "dump_path": "/tmp/dump", + "slot_name": "slot", + "subscription_name": null, + "publication": null, + "snapshot_name": null, + "last_applied_lsn": null, + "updated_at": "2025-01-01T00:00:00Z" + }"#; + let token: ResumeToken = serde_json::from_str(json).unwrap(); + assert!(!token.pub_auto_created); + } + + #[test] + fn pub_auto_created_roundtrip() { + let mut t = ResumeToken::new(&cfg(), PathBuf::from("/tmp/dump")); + t.pub_auto_created = true; + let json = serde_json::to_string(&t).unwrap(); + let t2: ResumeToken = serde_json::from_str(&json).unwrap(); + assert!(t2.pub_auto_created); + } } From 6d17676fc54c33a18ad470c9bfc90ff516f712ea Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 01:56:10 +0000 Subject: [PATCH 3/4] fix: apply PR review suggestions and increase patch coverage - Fix schema-qualified quoting: validate empty parts, reject trailing/leading dots - Combine tables+schemas in CREATE PUBLICATION (PG 15+ syntax) - Propagate errors from drop_source_publication/drop_source_slot instead of swallowing - Fix emit_table_progress to capture full identifier (not strip outer quotes) - Persist pub_auto_created in ResumeToken for crash-safe cleanup - Add SourceCleanup stage to MigrationStage enum - Extract cleanup_source_after_cutover as public testable function - Add wait_for_slot_inactive before drop_source_slot post-cutover - Add integration tests for cleanup_source_after_cutover - Fix CI port conflict: clean up stale containers before starting stack 
Co-Authored-By: Claude Opus 4.6 --- .github/actions/setup-integration/action.yml | 9 +- pg_dbmigrator/src/dump.rs | 10 +- pg_dbmigrator/src/lib.rs | 2 +- pg_dbmigrator/src/native_apply.rs | 14 +-- pg_dbmigrator/src/orchestrator.rs | 110 +++++++++---------- pg_dbmigrator/src/preflight.rs | 50 ++++++--- pg_dbmigrator/tests/postgres_integration.rs | 103 +++++++++++++++++ 7 files changed, 210 insertions(+), 88 deletions(-) diff --git a/.github/actions/setup-integration/action.yml b/.github/actions/setup-integration/action.yml index d5065a9..2b1f2b0 100644 --- a/.github/actions/setup-integration/action.yml +++ b/.github/actions/setup-integration/action.yml @@ -15,6 +15,13 @@ runs: sudo apt-get update sudo apt-get install -y postgresql-client-17 + - name: Clean up stale containers and free ports + shell: bash + run: | + docker compose -f docker-compose.test.yml down -v 2>/dev/null || true + docker rm -f pg_dbmigrator_source pg_dbmigrator_target 2>/dev/null || true + sudo systemctl stop postgresql 2>/dev/null || true + - name: Start docker-compose stack shell: bash - run: docker compose -f docker-compose.test.yml up -d --wait + run: docker compose -f docker-compose.test.yml up -d --wait --force-recreate diff --git a/pg_dbmigrator/src/dump.rs b/pg_dbmigrator/src/dump.rs index 69bb426..3c831ea 100644 --- a/pg_dbmigrator/src/dump.rs +++ b/pg_dbmigrator/src/dump.rs @@ -377,15 +377,9 @@ fn kill_child_group(_pid: Option, _sigkill: bool) {} /// pg_dump: dumping contents of table "schema"."table" /// pg_restore: processing data for table "schema"."table" fn emit_table_progress(line: &str) { - if let Some(table) = line - .strip_prefix("pg_dump: dumping contents of table \"") - .and_then(|s| s.strip_suffix('"')) - { + if let Some(table) = line.strip_prefix("pg_dump: dumping contents of table ") { info!(table, "pg_dump: dumping table"); - } else if let Some(table) = line - .strip_prefix("pg_restore: processing data for table \"") - .and_then(|s| s.strip_suffix('"')) - { + } else if 
let Some(table) = line.strip_prefix("pg_restore: processing data for table ") { info!(table, "pg_restore: restoring table"); } } diff --git a/pg_dbmigrator/src/lib.rs b/pg_dbmigrator/src/lib.rs index 123ae68..2af4ecf 100644 --- a/pg_dbmigrator/src/lib.rs +++ b/pg_dbmigrator/src/lib.rs @@ -59,6 +59,6 @@ pub use config::{ }; pub use cutover::{CutoverHandle, LagSampler, Transition}; pub use error::{MigrationError, Result}; -pub use orchestrator::Migrator; +pub use orchestrator::{cleanup_source_after_cutover, Migrator}; pub use progress::{JsonReporter, MigrationStage, ProgressEvent, ProgressReporter}; pub use resume::{CompletedStage, ResumeToken, RESUME_SCHEMA_VERSION}; diff --git a/pg_dbmigrator/src/native_apply.rs b/pg_dbmigrator/src/native_apply.rs index 59c8acf..37d9275 100644 --- a/pg_dbmigrator/src/native_apply.rs +++ b/pg_dbmigrator/src/native_apply.rs @@ -301,11 +301,8 @@ pub async fn drop_source_publication(source_conn: &str, publication: &str) -> Re let pub_ident = quote_ident(publication)?; let sql = format!("DROP PUBLICATION IF EXISTS {pub_ident}"); info!(publication, "dropping auto-created publication on source"); - if let Err(e) = client.batch_execute(&sql).await { - warn!(error = %e, publication, "failed to drop publication on source (continuing)"); - } else { - info!(publication, "publication dropped on source"); - } + client.batch_execute(&sql).await?; + info!(publication, "publication dropped on source"); Ok(()) } @@ -332,11 +329,8 @@ pub async fn drop_source_slot(source_conn: &str, slot_name: &str) -> Result<()> END $$;", ); info!(slot_name, "dropping replication slot on source"); - if let Err(e) = client.batch_execute(&sql).await { - warn!(error = %e, slot_name, "failed to drop replication slot on source (continuing)"); - } else { - info!(slot_name, "replication slot dropped on source"); - } + client.batch_execute(&sql).await?; + info!(slot_name, "cleanup of replication slot on source completed"); Ok(()) } diff --git 
a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs index 431a9e6..d28046b 100644 --- a/pg_dbmigrator/src/orchestrator.rs +++ b/pg_dbmigrator/src/orchestrator.rs @@ -418,61 +418,13 @@ impl Migrator { // 6. Post-cutover cleanup: drop auto-created publication and slot. if stats.cutover_triggered { - if token.pub_auto_created { - self.report( - MigrationStage::SourceCleanup, - format!( - "dropping auto-created publication `{}` on source", - self.config.online.publication - ), - ) - .await; - if let Err(e) = drop_source_publication( - &self.config.source.connection_string, - &self.config.online.publication, - ) - .await - { - tracing::warn!( - error = %e, - "failed to drop auto-created publication (non-fatal)" - ); - } - } - - if self.config.online.drop_slot_on_cutover { - self.report( - MigrationStage::SourceCleanup, - format!( - "dropping replication slot `{}` on source", - self.config.online.slot_name - ), - ) - .await; - if let Err(e) = wait_for_slot_inactive( - &self.config.source.connection_string, - &self.config.online.slot_name, - self.reporter.as_ref(), - ) - .await - { - tracing::warn!( - error = %e, - "failed waiting for slot to become inactive (non-fatal)" - ); - } - if let Err(e) = drop_source_slot( - &self.config.source.connection_string, - &self.config.online.slot_name, - ) - .await - { - tracing::warn!( - error = %e, - "failed to drop replication slot (non-fatal)" - ); - } - } + cleanup_source_after_cutover( + &self.config.source.connection_string, + &self.config.online, + token.pub_auto_created, + self.reporter.as_ref(), + ) + .await; } self.report(MigrationStage::Complete, "online migration finished") @@ -712,6 +664,54 @@ impl MigrationOutcome { } } +/// Post-cutover cleanup: drop auto-created publication and replication slot +/// on the source. Non-fatal — errors are logged as warnings. 
+pub async fn cleanup_source_after_cutover( + source_conn: &str, + online: &crate::config::OnlineOptions, + pub_auto_created: bool, + reporter: &dyn ProgressReporter, +) { + if pub_auto_created { + reporter + .report(ProgressEvent::new( + MigrationStage::SourceCleanup, + format!( + "dropping auto-created publication `{}` on source", + online.publication + ), + )) + .await; + if let Err(e) = drop_source_publication(source_conn, &online.publication).await { + tracing::warn!( + error = %e, + "failed to drop auto-created publication (non-fatal)" + ); + } + } + + if online.drop_slot_on_cutover { + reporter + .report(ProgressEvent::new( + MigrationStage::SourceCleanup, + format!("dropping replication slot `{}` on source", online.slot_name), + )) + .await; + if let Err(e) = wait_for_slot_inactive(source_conn, &online.slot_name, reporter).await { + tracing::warn!( + error = %e, + "failed waiting for slot to become inactive (non-fatal)" + ); + } + if let Err(e) = drop_source_slot(source_conn, &online.slot_name).await { + tracing::warn!( + error = %e, + "failed to drop replication slot (non-fatal)" + ); + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/pg_dbmigrator/src/preflight.rs b/pg_dbmigrator/src/preflight.rs index 9091d9e..985f56a 100644 --- a/pg_dbmigrator/src/preflight.rs +++ b/pg_dbmigrator/src/preflight.rs @@ -153,6 +153,11 @@ pub async fn verify_source_logical_replication_ready(source_conn: &str) -> Resul /// `"schema.table"` creates a single identifier that includes a literal dot. 
pub fn quote_qualified_name(name: &str) -> Result { let parts: Vec<&str> = name.splitn(2, '.').collect(); + if parts.iter().any(|p| p.is_empty()) { + return Err(MigrationError::config(format!( + "invalid qualified name: `{name}` (empty component)" + ))); + } let quoted: std::result::Result, _> = parts.iter().map(|p| pg_walstream::quote_ident(p)).collect(); Ok(quoted?.join(".")) @@ -169,16 +174,21 @@ pub fn build_create_publication_sql( schemas: &[String], ) -> Result { let pub_ident = pg_walstream::quote_ident(publication)?; - let scope = if !tables.is_empty() { - let quoted: std::result::Result, _> = - tables.iter().map(|t| quote_qualified_name(t)).collect(); - format!("FOR TABLE {}", quoted?.join(", ")) - } else if !schemas.is_empty() { - let quoted: std::result::Result, _> = schemas - .iter() - .map(|s| pg_walstream::quote_ident(s)) - .collect(); - format!("FOR TABLES IN SCHEMA {}", quoted?.join(", ")) + let scope = if !tables.is_empty() || !schemas.is_empty() { + let mut scope_parts = Vec::new(); + if !tables.is_empty() { + let quoted: std::result::Result, _> = + tables.iter().map(|t| quote_qualified_name(t)).collect(); + scope_parts.push(format!("TABLE {}", quoted?.join(", "))); + } + if !schemas.is_empty() { + let quoted: std::result::Result, _> = schemas + .iter() + .map(|s| pg_walstream::quote_ident(s)) + .collect(); + scope_parts.push(format!("TABLES IN SCHEMA {}", quoted?.join(", "))); + } + format!("FOR {}", scope_parts.join(", ")) } else { "FOR ALL TABLES".to_string() }; @@ -525,12 +535,14 @@ mod tests { } #[test] - fn build_publication_sql_tables_take_precedence_over_schemas() { + fn build_publication_sql_combines_tables_and_schemas() { let tables = vec!["public.users".to_string()]; let schemas = vec!["app".to_string()]; let sql = build_create_publication_sql("my_pub", &tables, &schemas).unwrap(); - assert!(sql.contains("FOR TABLE")); - assert!(!sql.contains("FOR TABLES IN SCHEMA")); + assert_eq!( + sql, + "CREATE PUBLICATION \"my_pub\" FOR TABLE 
\"public\".\"users\", TABLES IN SCHEMA \"app\"" + ); } #[test] @@ -563,4 +575,16 @@ mod tests { let result = quote_qualified_name("public.my.table").unwrap(); assert_eq!(result, "\"public\".\"my.table\""); } + + #[test] + fn quote_qualified_name_rejects_trailing_dot() { + let result = quote_qualified_name("public."); + assert!(result.is_err()); + } + + #[test] + fn quote_qualified_name_rejects_leading_dot() { + let result = quote_qualified_name(".table"); + assert!(result.is_err()); + } } diff --git a/pg_dbmigrator/tests/postgres_integration.rs b/pg_dbmigrator/tests/postgres_integration.rs index 4cc8aa3..781fe74 100644 --- a/pg_dbmigrator/tests/postgres_integration.rs +++ b/pg_dbmigrator/tests/postgres_integration.rs @@ -781,3 +781,106 @@ async fn drop_source_slot_is_idempotent() { let result = pg_dbmigrator::native_apply::drop_source_slot(&url, "integ_drop_slot").await; assert!(result.is_ok()); } + +// ─── orchestrator::cleanup_source_after_cutover ───────────────────────────── + +#[tokio::test] +async fn cleanup_source_after_cutover_drops_pub_and_slot() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Pre-create a publication and slot + client + .batch_execute("CREATE PUBLICATION integ_cleanup_pub FOR ALL TABLES") + .await + .unwrap_or(()); + client + .batch_execute( + "SELECT pg_create_logical_replication_slot('integ_cleanup_slot', 'pgoutput')", + ) + .await + .unwrap_or(()); + + let online = pg_dbmigrator::OnlineOptions { + publication: "integ_cleanup_pub".into(), + slot_name: "integ_cleanup_slot".into(), + drop_slot_on_cutover: true, + ..pg_dbmigrator::OnlineOptions::default() + }; + + let reporter = pg_dbmigrator::progress::CollectingReporter::new(); + pg_dbmigrator::cleanup_source_after_cutover(&url, &online, true, &reporter).await; + + // Verify publication was dropped + let row = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_cleanup_pub')", + &[], + ) 
+ .await + .unwrap(); + let exists: bool = row.get(0); + assert!(!exists, "publication should have been dropped"); + + // Verify slot was dropped + let row = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM pg_replication_slots WHERE slot_name = 'integ_cleanup_slot')", + &[], + ) + .await + .unwrap(); + let exists: bool = row.get(0); + assert!(!exists, "slot should have been dropped"); + + // Check reporter emitted SourceCleanup events + let events = reporter.events().await; + assert!(events.len() >= 2); + assert!(events + .iter() + .all(|e| e.stage == pg_dbmigrator::MigrationStage::SourceCleanup)); +} + +#[tokio::test] +async fn cleanup_source_after_cutover_skips_when_not_auto_created() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + // Pre-create a publication (simulate operator-created) + client + .batch_execute("CREATE PUBLICATION integ_keep_pub FOR ALL TABLES") + .await + .unwrap_or(()); + + let online = pg_dbmigrator::OnlineOptions { + publication: "integ_keep_pub".into(), + slot_name: "integ_absent_slot_xyz".into(), + drop_slot_on_cutover: false, + ..pg_dbmigrator::OnlineOptions::default() + }; + + let reporter = pg_dbmigrator::progress::CollectingReporter::new(); + // pub_auto_created = false, drop_slot_on_cutover = false → should be a no-op + pg_dbmigrator::cleanup_source_after_cutover(&url, &online, false, &reporter).await; + + // Publication should still exist + let row = client + .query_one( + "SELECT EXISTS(SELECT 1 FROM pg_publication WHERE pubname = 'integ_keep_pub')", + &[], + ) + .await + .unwrap(); + let exists: bool = row.get(0); + assert!(exists, "publication should NOT have been dropped"); + + // No events emitted + let events = reporter.events().await; + assert!(events.is_empty()); + + // Clean up + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_keep_pub") + .await + .ok(); +} From b4c7fd3e033e3e4c8cfd78a7ad4f37ac8e57c2fa Mon Sep 17 00:00:00 2001 From: danielshih Date: 
Fri, 8 May 2026 02:18:58 +0000
Subject: [PATCH 4/4] add support for excluding tables and schemas in publication creation

---
 pg_dbmigrator/src/orchestrator.rs           |   2 +
 pg_dbmigrator/src/preflight.rs              | 195 ++++++++++++++++-
 pg_dbmigrator/tests/postgres_integration.rs | 228 +++++++++++++++++++-
 3 files changed, 416 insertions(+), 9 deletions(-)

diff --git a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs
index d28046b..acef8f7 100644
--- a/pg_dbmigrator/src/orchestrator.rs
+++ b/pg_dbmigrator/src/orchestrator.rs
@@ -231,6 +231,8 @@ impl Migrator {
             &self.config.online.publication,
             &self.config.tables,
             &self.config.schemas,
+            &self.config.exclude_tables,
+            &self.config.exclude_schemas,
         )
         .await?;
         if created {
diff --git a/pg_dbmigrator/src/preflight.rs b/pg_dbmigrator/src/preflight.rs
index 985f56a..4f55585 100644
--- a/pg_dbmigrator/src/preflight.rs
+++ b/pg_dbmigrator/src/preflight.rs
@@ -195,10 +195,75 @@ pub fn build_create_publication_sql(
     Ok(format!("CREATE PUBLICATION {pub_ident} {scope}"))
 }
 
+/// Filter a list of `schema.table` names by removing entries that match
+/// `exclude_tables` or belong to a schema in `exclude_schemas`.
+pub fn filter_tables_by_exclusions(
+    tables: &[String],
+    exclude_tables: &[String],
+    exclude_schemas: &[String],
+) -> Vec<String> {
+    tables
+        .iter()
+        .filter(|t| {
+            if exclude_tables.iter().any(|ex| ex == *t) {
+                return false;
+            }
+            if let Some(schema) = t.split('.').next() {
+                if exclude_schemas.iter().any(|ex| ex == schema) {
+                    return false;
+                }
+            }
+            true
+        })
+        .cloned()
+        .collect()
+}
+
+/// Query the source for all ordinary and partitioned tables, excluding
+/// system schemas and applying the caller's exclusion lists. Returns
+/// `schema.table` qualified names suitable for `build_create_publication_sql`.
+async fn fetch_published_tables(
+    client: &tokio_postgres::Client,
+    exclude_tables: &[String],
+    exclude_schemas: &[String],
+) -> Result<Vec<String>> {
+    let rows = client
+        .query(
+            "SELECT n.nspname::text, c.relname::text \
+             FROM pg_class c \
+             JOIN pg_namespace n ON n.oid = c.relnamespace \
+             WHERE c.relkind IN ('r', 'p') \
+             AND n.nspname NOT IN ('pg_catalog', 'information_schema')",
+            &[],
+        )
+        .await?;
+
+    let all_tables: Vec<String> = rows
+        .iter()
+        .map(|r| {
+            let schema: &str = r.get(0);
+            let table: &str = r.get(1);
+            format!("{schema}.{table}")
+        })
+        .collect();
+
+    Ok(filter_tables_by_exclusions(
+        &all_tables,
+        exclude_tables,
+        exclude_schemas,
+    ))
+}
+
 /// Ensure that a logical-replication publication with the given name exists
 /// on the source. If absent and `auto_create` is enabled, create it
 /// automatically.
 ///
+/// When `exclude_tables` or `exclude_schemas` are non-empty and the include
+/// lists (`tables`, `schemas`) are empty, the publication is scoped to an
+/// explicit table list obtained by querying the source and subtracting the
+/// excluded objects. This prevents the target's apply worker from crashing
+/// when excluded objects are modified on the source.
+///
 /// Returns `Ok(true)` if the publication was auto-created, `Ok(false)` if
 /// it already existed.
 pub async fn ensure_publication_exists(
@@ -206,6 +271,8 @@
     publication: &str,
     tables: &[String],
     schemas: &[String],
+    exclude_tables: &[String],
+    exclude_schemas: &[String],
 ) -> Result<bool> {
     let client = connect_with_sslmode(source_conn).await?;
     let row = client
@@ -220,7 +287,27 @@
         return Ok(false);
     }
 
-    let sql = build_create_publication_sql(publication, tables, schemas)?;
+    let has_exclusions = !exclude_tables.is_empty() || !exclude_schemas.is_empty();
+    let has_includes = !tables.is_empty() || !schemas.is_empty();
+
+    let (effective_tables, effective_schemas): (Vec<String>, Vec<String>) = if has_exclusions
+        && !has_includes
+    {
+        let resolved = fetch_published_tables(&client, exclude_tables, exclude_schemas).await?;
+        (resolved, Vec::new())
+    } else if has_exclusions && has_includes {
+        let filtered_tables = filter_tables_by_exclusions(tables, exclude_tables, exclude_schemas);
+        let filtered_schemas: Vec<String> = schemas
+            .iter()
+            .filter(|s| !exclude_schemas.iter().any(|ex| ex == *s))
+            .cloned()
+            .collect();
+        (filtered_tables, filtered_schemas)
+    } else {
+        (tables.to_vec(), schemas.to_vec())
+    };
+
+    let sql = build_create_publication_sql(publication, &effective_tables, &effective_schemas)?;
     info!(publication, sql = %sql, "auto-creating publication on source");
     client.batch_execute(&sql).await?;
     info!(publication, "publication created successfully");
@@ -587,4 +674,110 @@ mod tests {
         let result = quote_qualified_name(".table");
         assert!(result.is_err());
     }
+
+    #[test]
+    fn filter_tables_excludes_by_table_name() {
+        let tables = vec![
+            "public.users".into(),
+            "public.orders".into(),
+            "public.large_logs".into(),
+        ];
+        let result = filter_tables_by_exclusions(&tables, &["public.large_logs".into()], &[]);
+        assert_eq!(result, vec!["public.users", "public.orders"]);
+    }
+
+    #[test]
+    fn filter_tables_excludes_by_schema() {
+        let tables = vec![
+            "public.users".into(),
+            "audit.events".into(),
+            
"audit.actions".into(),
+            "app.config".into(),
+        ];
+        let result = filter_tables_by_exclusions(&tables, &[], &["audit".into()]);
+        assert_eq!(result, vec!["public.users", "app.config"]);
+    }
+
+    #[test]
+    fn filter_tables_excludes_both_table_and_schema() {
+        let tables = vec![
+            "public.users".into(),
+            "public.large_logs".into(),
+            "audit.events".into(),
+            "app.config".into(),
+        ];
+        let result =
+            filter_tables_by_exclusions(&tables, &["public.large_logs".into()], &["audit".into()]);
+        assert_eq!(result, vec!["public.users", "app.config"]);
+    }
+
+    #[test]
+    fn filter_tables_no_exclusions_returns_all() {
+        let tables = vec!["public.users".into(), "public.orders".into()];
+        let result = filter_tables_by_exclusions(&tables, &[], &[]);
+        assert_eq!(result, tables);
+    }
+
+    #[test]
+    fn filter_tables_empty_input() {
+        let result: Vec<String> =
+            filter_tables_by_exclusions(&[], &["public.x".into()], &["audit".into()]);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn filter_tables_exclude_all_matches_returns_empty() {
+        let tables = vec!["audit.x".into(), "audit.y".into()];
+        let result = filter_tables_by_exclusions(&tables, &[], &["audit".into()]);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn filter_tables_exclude_nonexistent_is_noop() {
+        let tables = vec!["public.users".into()];
+        let result = filter_tables_by_exclusions(
+            &tables,
+            &["public.nonexistent".into()],
+            &["no_such_schema".into()],
+        );
+        assert_eq!(result, vec!["public.users"]);
+    }
+
+    #[test]
+    fn filter_then_build_sql_excludes_correctly() {
+        let all_tables: Vec<String> = vec![
+            "public.users".into(),
+            "public.orders".into(),
+            "audit.logs".into(),
+            "temp.scratch".into(),
+        ];
+        let filtered =
+            filter_tables_by_exclusions(&all_tables, &["public.orders".into()], &["audit".into()]);
+        let sql = build_create_publication_sql("my_pub", &filtered, &[]).unwrap();
+        assert_eq!(
+            sql,
+            "CREATE PUBLICATION \"my_pub\" FOR TABLE \"public\".\"users\", \"temp\".\"scratch\""
+        );
+        
assert!(!sql.contains("orders"));
+        assert!(!sql.contains("audit"));
+    }
+
+    #[test]
+    fn filter_schemas_from_include_list() {
+        let schemas: Vec<String> = ["public", "audit", "app"]
+            .iter()
+            .map(|s| (*s).into())
+            .collect();
+        let exclude_schemas: Vec<String> = ["audit"].iter().map(|s| (*s).into()).collect();
+        let filtered: Vec<String> = schemas
+            .iter()
+            .filter(|s| !exclude_schemas.iter().any(|ex| ex == *s))
+            .cloned()
+            .collect();
+        assert_eq!(filtered, vec!["public", "app"]);
+        let sql = build_create_publication_sql("p", &[], &filtered).unwrap();
+        assert!(sql.contains("\"public\""));
+        assert!(sql.contains("\"app\""));
+        assert!(!sql.contains("\"audit\""));
+    }
 }
diff --git a/pg_dbmigrator/tests/postgres_integration.rs b/pg_dbmigrator/tests/postgres_integration.rs
index 781fe74..04758ac 100644
--- a/pg_dbmigrator/tests/postgres_integration.rs
+++ b/pg_dbmigrator/tests/postgres_integration.rs
@@ -689,10 +689,16 @@ async fn ensure_publication_exists_creates_when_missing() {
         .await
         .ok();
 
-    let created =
-        pg_dbmigrator::preflight::ensure_publication_exists(&url, "integ_auto_pub", &[], &[])
-            .await
-            .unwrap();
+    let created = pg_dbmigrator::preflight::ensure_publication_exists(
+        &url,
+        "integ_auto_pub",
+        &[],
+        &[],
+        &[],
+        &[],
+    )
+    .await
+    .unwrap();
     assert!(created, "publication should have been auto-created");
 
     // Verify it exists now
@@ -724,10 +730,16 @@ async fn ensure_publication_exists_noop_when_present() {
         .await
         .unwrap_or(());
 
-    let created =
-        pg_dbmigrator::preflight::ensure_publication_exists(&url, "integ_existing_pub", &[], &[])
-            .await
-            .unwrap();
+    let created = pg_dbmigrator::preflight::ensure_publication_exists(
+        &url,
+        "integ_existing_pub",
+        &[],
+        &[],
+        &[],
+        &[],
+    )
+    .await
+    .unwrap();
     assert!(
         !created,
         "publication already existed, should not re-create"
@@ -740,6 +752,206 @@ async fn ensure_publication_exists_noop_when_present() {
         .ok();
 }
 
+#[tokio::test]
+async fn ensure_publication_excludes_tables_when_no_includes() {
+    let url = 
skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_excl_pub") + .await + .ok(); + client + .batch_execute( + "CREATE TABLE IF NOT EXISTS public.keep_me (id int); \ + CREATE TABLE IF NOT EXISTS public.skip_me (id int);", + ) + .await + .unwrap(); + + let created = pg_dbmigrator::preflight::ensure_publication_exists( + &url, + "integ_excl_pub", + &[], + &[], + &["public.skip_me".into()], + &[], + ) + .await + .unwrap(); + assert!(created); + + // The publication should list tables explicitly, NOT be FOR ALL TABLES. + let row = client + .query_one( + "SELECT puballtables FROM pg_publication WHERE pubname = 'integ_excl_pub'", + &[], + ) + .await + .unwrap(); + let all_tables: bool = row.get(0); + assert!( + !all_tables, + "publication should NOT be FOR ALL TABLES when exclusions are set" + ); + + // Verify skip_me is not in the publication's table list. + let skip_rows = client + .query( + "SELECT schemaname, tablename FROM pg_publication_tables \ + WHERE pubname = 'integ_excl_pub' AND tablename = 'skip_me'", + &[], + ) + .await + .unwrap(); + assert!( + skip_rows.is_empty(), + "excluded table should not be in publication" + ); + + // Verify keep_me IS in the publication. 
+ let keep_rows = client + .query( + "SELECT schemaname, tablename FROM pg_publication_tables \ + WHERE pubname = 'integ_excl_pub' AND tablename = 'keep_me'", + &[], + ) + .await + .unwrap(); + assert!( + !keep_rows.is_empty(), + "non-excluded table should be in publication" + ); + + client + .batch_execute( + "DROP PUBLICATION IF EXISTS integ_excl_pub; \ + DROP TABLE IF EXISTS public.keep_me; \ + DROP TABLE IF EXISTS public.skip_me;", + ) + .await + .ok(); +} + +#[tokio::test] +async fn ensure_publication_excludes_schema_when_no_includes() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_excl_schema_pub") + .await + .ok(); + client + .batch_execute( + "CREATE SCHEMA IF NOT EXISTS excl_test; \ + CREATE TABLE IF NOT EXISTS excl_test.should_skip (id int); \ + CREATE TABLE IF NOT EXISTS public.should_keep (id int);", + ) + .await + .unwrap(); + + let created = pg_dbmigrator::preflight::ensure_publication_exists( + &url, + "integ_excl_schema_pub", + &[], + &[], + &[], + &["excl_test".into()], + ) + .await + .unwrap(); + assert!(created); + + let skip_rows = client + .query( + "SELECT schemaname, tablename FROM pg_publication_tables \ + WHERE pubname = 'integ_excl_schema_pub' AND schemaname = 'excl_test'", + &[], + ) + .await + .unwrap(); + assert!( + skip_rows.is_empty(), + "tables from excluded schema should not be in publication" + ); + + client + .batch_execute( + "DROP PUBLICATION IF EXISTS integ_excl_schema_pub; \ + DROP TABLE IF EXISTS excl_test.should_skip; \ + DROP TABLE IF EXISTS public.should_keep; \ + DROP SCHEMA IF EXISTS excl_test;", + ) + .await + .ok(); +} + +#[tokio::test] +async fn ensure_publication_filters_includes_with_exclusions() { + let url = skip_without_pg!(source_url()); + let client = connect_with_sslmode(&url).await.unwrap(); + + client + .batch_execute("DROP PUBLICATION IF EXISTS integ_incl_excl_pub") + .await + .ok(); + 
client + .batch_execute( + "CREATE TABLE IF NOT EXISTS public.inc_keep (id int); \ + CREATE TABLE IF NOT EXISTS public.inc_skip (id int);", + ) + .await + .unwrap(); + + let created = pg_dbmigrator::preflight::ensure_publication_exists( + &url, + "integ_incl_excl_pub", + &["public.inc_keep".into(), "public.inc_skip".into()], + &[], + &["public.inc_skip".into()], + &[], + ) + .await + .unwrap(); + assert!(created); + + let skip_rows = client + .query( + "SELECT tablename FROM pg_publication_tables \ + WHERE pubname = 'integ_incl_excl_pub' AND tablename = 'inc_skip'", + &[], + ) + .await + .unwrap(); + assert!( + skip_rows.is_empty(), + "excluded table should be filtered from include list" + ); + + let keep_rows = client + .query( + "SELECT tablename FROM pg_publication_tables \ + WHERE pubname = 'integ_incl_excl_pub' AND tablename = 'inc_keep'", + &[], + ) + .await + .unwrap(); + assert!( + !keep_rows.is_empty(), + "non-excluded table from include list should be in publication" + ); + + client + .batch_execute( + "DROP PUBLICATION IF EXISTS integ_incl_excl_pub; \ + DROP TABLE IF EXISTS public.inc_keep; \ + DROP TABLE IF EXISTS public.inc_skip;", + ) + .await + .ok(); +} + // ─── native_apply::drop_source_publication ────────────────────────────────── #[tokio::test]