19 changes: 14 additions & 5 deletions Agent.md
@@ -69,15 +69,15 @@ docker-compose.test.yml # source :55432, target :55433
|--------|------|
| `config.rs` | `MigrationConfig`/`OnlineOptions`, validation, performance defaults |
| `error.rs` | `MigrationError` (thiserror) + `Result<T>` |
| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr progress parsing |
| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr ingestion |
| `restore.rs` | pg_restore/psql wrapper |
| `analyze.rs` | Pre-dump VACUUM ANALYZE + post-restore ANALYZE |
| `snapshot.rs` | Replication slot creation + exported snapshot |
| `native_apply.rs` | CREATE SUBSCRIPTION, lag polling, `ApplyStats`, `drop_source_{publication,slot}` |
| `orchestrator.rs` | `Migrator` — wires all stages |
| `orchestrator.rs` | `Migrator` — wires all stages, parallel pre-flight via `tokio::join!` |
| `progress.rs` | `ProgressReporter` trait + Tracing/Collecting impls |
| `preflight.rs` | Source validation, `ensure_publication_exists` (auto-create) |
| `sequences.rs` | Source→target setval at cutover |
| `sequences.rs` | Source→target setval at cutover (batched PL/pgSQL for single round-trip) |
| `resume.rs` | Resume token persistence |
| `tls.rs` | TLS-aware connection helper |

@@ -89,6 +89,13 @@ docker-compose.test.yml # source :55432, target :55433
4. **Sequence sync at cutover**: migrator runs setval() on all target sequences after cutover.
5. **Publication lifecycle**: auto-created publications are tracked (`pub_auto_created`) and dropped after cutover; pre-existing ones are never dropped.

### Performance optimizations

- **Parallel pre-flight**: `ensure_target_database_exists` and `verify_source_logical_replication_ready` run concurrently via `tokio::join!` (they hit different servers).
- **Batched sequence sync**: `apply_sequences_to_target()` builds a single PL/pgSQL `DO` block with per-sequence exception handling, reducing N round-trips to 1. Falls back to individual statements if the batch fails.
- **Split-section restore**: pre-data → data → post-data for index-free COPY (30-60% faster index rebuild).
- **LZ4 compression**: default `lz4:1` for dump archives (fast compress/decompress).
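The batched sequence sync can be sketched roughly as follows. This is an illustrative assumption, not the crate's actual code: the function name `build_sequence_sync_batch` and the exact SQL shape are hypothetical, but they show the idea of collapsing N `setval` round-trips into one `DO` block with per-sequence exception handling.

```rust
/// Hypothetical sketch: fold all sequence updates into a single PL/pgSQL
/// `DO` block so one statement replaces N round-trips. Each `setval` gets
/// its own BEGIN/EXCEPTION/END so a sequence missing on the target does
/// not abort the whole batch.
fn build_sequence_sync_batch(seqs: &[(String, i64)]) -> String {
    let mut sql = String::from("DO $$\nBEGIN\n");
    for (name, value) in seqs {
        // Escape single quotes since the sequence name is embedded as a
        // string literal passed to setval().
        sql.push_str(&format!(
            "  BEGIN\n    PERFORM setval('{}', {});\n  EXCEPTION WHEN undefined_table THEN NULL;\n  END;\n",
            name.replace('\'', "''"),
            value
        ));
    }
    sql.push_str("END $$;");
    sql
}
```

If executing the batch fails, the migrator falls back to issuing individual statements per sequence, as noted above.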

---

## 2. Design Principles (mandatory reading)
@@ -117,7 +124,9 @@ docker-compose.test.yml # source :55432, target :55433

6. **Contract with pg_walstream**
- Slot creation before dump. START_REPLICATION only after dump.
- Pinned at `0.6.2` via path dep to `../pg-walstream`. Verify `ChangeEvent`/`EventType` compatibility before bumping.
- Pinned at `0.6.3` via workspace dep. Verify `ChangeEvent`/`EventType` compatibility before bumping.
- Used functions: `quote_ident`, `quote_literal`, `parse_lsn`, `format_lsn`, `build_create_subscription_sql`, `build_disable_subscription_sql`, `build_detach_slot_sql`, `build_drop_subscription_sql`, `LogicalReplicationStream`, `ReplicationStreamConfig`, `CreateSubscriptionOptions`.
- pg_walstream has NO publication SQL builders — publication SQL in `preflight.rs` is hand-rolled (necessary).

7. **No unsolicited optimisation**
- Make only the changes the task requires. No silent async conversions, container swaps, or new crate additions.
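Since pg_walstream ships no publication SQL builders, the hand-rolled SQL in `preflight.rs` might look something like the sketch below. Everything here is an illustrative assumption: the function names and signatures are invented for the example, and `quote_ident` is re-implemented locally only to keep the block self-contained (the crate itself would use pg_walstream's `quote_ident`).

```rust
/// Minimal PostgreSQL identifier quoting (illustrative local copy; the
/// real code would call pg_walstream's `quote_ident`).
fn quote_ident(ident: &str) -> String {
    // Wrap in double quotes, doubling any embedded double quotes.
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Hypothetical hand-rolled CREATE PUBLICATION builder of the kind
/// preflight.rs needs. An empty table list publishes all tables.
fn build_create_publication_sql(publication: &str, tables: &[(&str, &str)]) -> String {
    if tables.is_empty() {
        format!(
            "CREATE PUBLICATION {} FOR ALL TABLES",
            quote_ident(publication)
        )
    } else {
        let list: Vec<String> = tables
            .iter()
            .map(|(schema, table)| format!("{}.{}", quote_ident(schema), quote_ident(table)))
            .collect();
        format!(
            "CREATE PUBLICATION {} FOR TABLE {}",
            quote_ident(publication),
            list.join(", ")
        )
    }
}
```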
@@ -220,7 +229,7 @@ make integration # e2e, requires Docker
- **Small workspace** — most files are 200–400 lines. Read the entire file before editing. Use grep to confirm blast radius.
- **Validate after editing** — run `cargo test -p pg_dbmigrator` after every change.
- **New stage / mode** → must update ALL of: `MigrationStage` enum, `Migrator` entry point, CLI args, README, and this file.
- **pg_walstream** lives in sibling workspace (`../pg-walstream`). Pinned at `0.6.2` via path dep. Do NOT vendor/fork — propose changes upstream.
- **pg_walstream** pinned at `0.6.3` via workspace dep. Do NOT vendor/fork — propose changes upstream.
- **Library/CLI boundary** — library returns `pg_dbmigrator::Result<T>` only. `anyhow` is CLI/examples only.
- **Versions** — all dependency versions live in workspace `Cargo.toml` under `[workspace.dependencies]`. Crate manifests use `xxx.workspace = true`.
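A layout consistent with this convention might look like the fragment below (the specific crates listed besides pg_walstream, and their versions, are illustrative):

```toml
# Workspace root Cargo.toml
[workspace.dependencies]
pg_walstream = "0.6.3"
tokio = { version = "1", features = ["full"] }

# Member crate manifest, e.g. pg_dbmigrator/Cargo.toml
[dependencies]
pg_walstream.workspace = true
tokio.workspace = true
```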

6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -7,7 +7,7 @@ members = [
]

[workspace.package]
version = "0.1.1"
version = "0.2.0"
edition = "2021"
license = "BSD-3-Clause"
authors = ["danielshih"]
33 changes: 0 additions & 33 deletions pg_dbmigrator/src/dump.rs
@@ -277,7 +277,6 @@ impl CommandRunner for TokioCommandRunner {
if is_restore {
ingest_pg_restore_stderr_line(&line, &mut summary);
}
emit_table_progress(&line);
}
Ok(None) => break,
Err(_) => break,
@@ -372,18 +371,6 @@ fn kill_child_group(pid: Option<u32>, sigkill: bool) {
#[cfg(not(unix))]
fn kill_child_group(_pid: Option<u32>, _sigkill: bool) {}

/// Detect table-level progress lines from pg_dump/pg_restore verbose stderr
/// and emit structured tracing events. Patterns:
/// pg_dump: dumping contents of table "schema"."table"
/// pg_restore: processing data for table "schema"."table"
fn emit_table_progress(line: &str) {
if let Some(table) = line.strip_prefix("pg_dump: dumping contents of table ") {
info!(table, "pg_dump: dumping table");
} else if let Some(table) = line.strip_prefix("pg_restore: processing data for table ") {
info!(table, "pg_restore: restoring table");
}
}

/// Run `pg_dump` according to `req` using the supplied [`CommandRunner`].
pub async fn run_pg_dump<R: CommandRunner + ?Sized>(
runner: &R,
@@ -871,24 +858,4 @@ mod tests {
let dbg = format!("{:?}", r);
assert!(dbg.contains("TokioCommandRunner"));
}

#[test]
fn emit_table_progress_detects_pg_dump_line() {
// Should not panic; emits tracing event (not assertable here but
// exercises the parsing path).
emit_table_progress(r#"pg_dump: dumping contents of table "public"."users""#);
}

#[test]
fn emit_table_progress_detects_pg_restore_line() {
emit_table_progress(r#"pg_restore: processing data for table "public"."orders""#);
}

#[test]
fn emit_table_progress_ignores_unrelated_lines() {
// Should not panic or emit anything for non-matching lines.
emit_table_progress("pg_dump: reading extensions");
emit_table_progress("pg_restore: connecting to database");
emit_table_progress("");
}
}
31 changes: 12 additions & 19 deletions pg_dbmigrator/src/orchestrator.rs
@@ -176,33 +176,26 @@ impl Migrator {
.await?;
}

// 0.5. Ensure the target database exists — pg_restore needs it.
// Run target-database check and source logical-replication validation in parallel — they hit different servers so there is no ordering dependency.
self.report(
MigrationStage::Validate,
format!(
"ensuring target database `{}` exists",
"ensuring target database `{}` exists + verifying source logical replication",
self.config.target.database
),
)
.await;
ensure_target_database_exists(
&self.config.target.connection_string,
&self.config.target.database,
)
.await?;

// 0.6. Verify the source is configured for logical replication.
// Doing this *before* slot creation gives the operator a clean,
// actionable error instead of a confusing libpq error 30 s into
// CREATE_REPLICATION_SLOT.
self.report(
MigrationStage::Validate,
"verifying source is configured for logical replication",
)
.await;
verify_source_logical_replication_ready(&self.config.source.connection_string).await?;
let (target_db_result, source_repl_result) = tokio::join!(
ensure_target_database_exists(
&self.config.target.connection_string,
&self.config.target.database,
),
verify_source_logical_replication_ready(&self.config.source.connection_string),
);
target_db_result?;
source_repl_result?;

// 0.7. Pre-dump: VACUUM ANALYZE on source.
// Pre-dump: VACUUM ANALYZE on source.
let dump_path = self.dump_path_or_default("dump_online");
let mut token = self.load_or_init_resume(&dump_path).await?;

4 changes: 3 additions & 1 deletion pg_dbmigrator/src/preflight.rs
@@ -233,7 +233,9 @@ async fn fetch_published_tables(
FROM pg_class c \
JOIN pg_namespace n ON n.oid = c.relnamespace \
WHERE c.relkind IN ('r', 'p') \
AND n.nspname NOT IN ('pg_catalog', 'information_schema')",
AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') \
AND n.nspname NOT LIKE 'pg_temp_%' \
AND n.nspname NOT LIKE 'pg_toast_temp_%'",
&[],
)
.await?;