From 69c9ab20f997470a572d83e98d6fddcf40bbc760 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:00:46 +0000 Subject: [PATCH 1/6] implement performance optimizations for migration process & rel v.0.2.0 --- Agent.md | 19 +++- Cargo.lock | 6 +- Cargo.toml | 2 +- pg_dbmigrator/src/dump.rs | 33 ------ pg_dbmigrator/src/orchestrator.rs | 31 +++--- pg_dbmigrator/src/sequences.rs | 172 ++++++++++++++++++++++++++++-- 6 files changed, 196 insertions(+), 67 deletions(-) diff --git a/Agent.md b/Agent.md index 46ee76c..2cc7451 100644 --- a/Agent.md +++ b/Agent.md @@ -69,15 +69,15 @@ docker-compose.test.yml # source :55432, target :55433 |--------|------| | `config.rs` | `MigrationConfig`/`OnlineOptions`, validation, performance defaults | | `error.rs` | `MigrationError` (thiserror) + `Result` | -| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr progress parsing | +| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr ingestion | | `restore.rs` | pg_restore/psql wrapper | | `analyze.rs` | Pre-dump VACUUM ANALYZE + post-restore ANALYZE | | `snapshot.rs` | Replication slot creation + exported snapshot | | `native_apply.rs` | CREATE SUBSCRIPTION, lag polling, `ApplyStats`, `drop_source_{publication,slot}` | -| `orchestrator.rs` | `Migrator` — wires all stages | +| `orchestrator.rs` | `Migrator` — wires all stages, parallel pre-flight via `tokio::join!` | | `progress.rs` | `ProgressReporter` trait + Tracing/Collecting impls | | `preflight.rs` | Source validation, `ensure_publication_exists` (auto-create) | -| `sequences.rs` | Source→target setval at cutover | +| `sequences.rs` | Source→target setval at cutover (batched PL/pgSQL for single round-trip) | | `resume.rs` | Resume token persistence | | `tls.rs` | TLS-aware connection helper | @@ -89,6 +89,13 @@ docker-compose.test.yml # source :55432, target :55433 4. **Sequence sync at cutover**: migrator runs setval() on all target sequences after cutover. 5. 
**Publication lifecycle**: auto-created publications are tracked (`pub_auto_created`) and dropped after cutover; pre-existing ones are never dropped. +### Performance optimizations + +- **Parallel pre-flight**: `ensure_target_database_exists` and `verify_source_logical_replication_ready` run concurrently via `tokio::join!` (they hit different servers). +- **Batched sequence sync**: `apply_sequences_to_target()` builds a single PL/pgSQL `DO` block with per-sequence exception handling, reducing N round-trips to 1. Falls back to individual statements if the batch fails. +- **Split-section restore**: pre-data → data → post-data for index-free COPY (30-60% faster index rebuild). +- **LZ4 compression**: default `lz4:1` for dump archives (fast compress/decompress). + --- ## 2. Design Principles (mandatory reading) @@ -117,7 +124,9 @@ docker-compose.test.yml # source :55432, target :55433 6. **Contract with pg_walstream** - Slot creation before dump. START_REPLICATION only after dump. - - Pinned at `0.6.2` via path dep to `../pg-walstream`. Verify `ChangeEvent`/`EventType` compatibility before bumping. + - Pinned at `0.6.3` via workspace dep. Verify `ChangeEvent`/`EventType` compatibility before bumping. + - Used functions: `quote_ident`, `quote_literal`, `parse_lsn`, `format_lsn`, `build_create_subscription_sql`, `build_disable_subscription_sql`, `build_detach_slot_sql`, `build_drop_subscription_sql`, `LogicalReplicationStream`, `ReplicationStreamConfig`, `CreateSubscriptionOptions`. + - pg_walstream has NO publication SQL builders — publication SQL in `preflight.rs` is hand-rolled (necessary). 7. **No unsolicited optimisation** - Make only the changes the task requires. No silent async conversions, container swaps, or new crate additions. @@ -220,7 +229,7 @@ make integration # e2e, requires Docker - **Small workspace** — most files are 200–400 lines. Read the entire file before editing. Use grep to confirm blast radius. 
- **Validate after editing** — run `cargo test -p pg_dbmigrator` after every change. - **New stage / mode** → must update ALL of: `MigrationStage` enum, `Migrator` entry point, CLI args, README, and this file. -- **pg_walstream** lives in sibling workspace (`../pg-walstream`). Pinned at `0.6.2` via path dep. Do NOT vendor/fork — propose changes upstream. +- **pg_walstream** pinned at `0.6.3` via workspace dep. Do NOT vendor/fork — propose changes upstream. - **Library/CLI boundary** — library returns `pg_dbmigrator::Result` only. `anyhow` is CLI/examples only. - **Versions** — all dependency versions live in workspace `Cargo.toml` under `[workspace.dependencies]`. Crate manifests use `xxx.workspace = true`. diff --git a/Cargo.lock b/Cargo.lock index fd09f33..4bc3997 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1025,7 +1025,7 @@ dependencies = [ [[package]] name = "offline_migration_example" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "pg_dbmigrator", @@ -1049,7 +1049,7 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "online_migration_example" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "pg_dbmigrator", @@ -1102,7 +1102,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pg_dbmigrator" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 44808a8..85c7084 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.package] -version = "0.1.1" +version = "0.2.0" edition = "2021" license = "BSD-3-Clause" authors = ["danielshih"] diff --git a/pg_dbmigrator/src/dump.rs b/pg_dbmigrator/src/dump.rs index 3c831ea..234b35f 100644 --- a/pg_dbmigrator/src/dump.rs +++ b/pg_dbmigrator/src/dump.rs @@ -277,7 +277,6 @@ impl CommandRunner for TokioCommandRunner { if is_restore { ingest_pg_restore_stderr_line(&line, &mut summary); } - 
emit_table_progress(&line); } Ok(None) => break, Err(_) => break, @@ -372,18 +371,6 @@ fn kill_child_group(pid: Option, sigkill: bool) { #[cfg(not(unix))] fn kill_child_group(_pid: Option, _sigkill: bool) {} -/// Detect table-level progress lines from pg_dump/pg_restore verbose stderr -/// and emit structured tracing events. Patterns: -/// pg_dump: dumping contents of table "schema"."table" -/// pg_restore: processing data for table "schema"."table" -fn emit_table_progress(line: &str) { - if let Some(table) = line.strip_prefix("pg_dump: dumping contents of table ") { - info!(table, "pg_dump: dumping table"); - } else if let Some(table) = line.strip_prefix("pg_restore: processing data for table ") { - info!(table, "pg_restore: restoring table"); - } -} - /// Run `pg_dump` according to `req` using the supplied [`CommandRunner`]. pub async fn run_pg_dump( runner: &R, @@ -871,24 +858,4 @@ mod tests { let dbg = format!("{:?}", r); assert!(dbg.contains("TokioCommandRunner")); } - - #[test] - fn emit_table_progress_detects_pg_dump_line() { - // Should not panic; emits tracing event (not assertable here but - // exercises the parsing path). - emit_table_progress(r#"pg_dump: dumping contents of table "public"."users""#); - } - - #[test] - fn emit_table_progress_detects_pg_restore_line() { - emit_table_progress(r#"pg_restore: processing data for table "public"."orders""#); - } - - #[test] - fn emit_table_progress_ignores_unrelated_lines() { - // Should not panic or emit anything for non-matching lines. - emit_table_progress("pg_dump: reading extensions"); - emit_table_progress("pg_restore: connecting to database"); - emit_table_progress(""); - } } diff --git a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs index acef8f7..4931a9f 100644 --- a/pg_dbmigrator/src/orchestrator.rs +++ b/pg_dbmigrator/src/orchestrator.rs @@ -176,33 +176,26 @@ impl Migrator { .await?; } - // 0.5. Ensure the target database exists — pg_restore needs it. 
+ // Run target-database check and source logical-replication validation in parallel — they hit different servers so there is no ordering dependency. self.report( MigrationStage::Validate, format!( - "ensuring target database `{}` exists", + "ensuring target database `{}` exists + verifying source logical replication", self.config.target.database ), ) .await; - ensure_target_database_exists( - &self.config.target.connection_string, - &self.config.target.database, - ) - .await?; - - // 0.6. Verify the source is configured for logical replication. - // Doing this *before* slot creation gives the operator a clean, - // actionable error instead of a confusing libpq error 30 s into - // CREATE_REPLICATION_SLOT. - self.report( - MigrationStage::Validate, - "verifying source is configured for logical replication", - ) - .await; - verify_source_logical_replication_ready(&self.config.source.connection_string).await?; + let (target_db_result, source_repl_result) = tokio::join!( + ensure_target_database_exists( + &self.config.target.connection_string, + &self.config.target.database, + ), + verify_source_logical_replication_ready(&self.config.source.connection_string), + ); + target_db_result?; + source_repl_result?; - // 0.7. Pre-dump: VACUUM ANALYZE on source. + // Pre-dump: VACUUM ANALYZE on source. let dump_path = self.dump_path_or_default("dump_online"); let mut token = self.load_or_init_resume(&dump_path).await?; diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 8be9146..6927131 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -116,20 +116,133 @@ pub async fn collect_source_sequences( /// (e.g. owned by a role we can't act on) shouldn't block the rest of /// the migration from completing. The function returns the number of /// sequences successfully applied. +/// +/// When more than one sequence needs syncing, a single PL/pgSQL `DO` +/// block is executed to reduce network round-trips. 
Each `setval()` +/// inside the block is wrapped in its own exception handler so a failure +/// on one sequence doesn't prevent the others from being applied. If the +/// batched approach fails entirely (e.g. older PG without PL/pgSQL), we +/// fall back to individual statements. pub async fn apply_sequences_to_target( target: &Client, sequences: &[SourceSequence], ) -> Result { - let mut applied = 0usize; + let actionable: Vec<&SourceSequence> = sequences + .iter() + .filter(|s| { + if s.last_value.is_none() { + debug!( + schema = %s.schema, + name = %s.name, + "skipping sequence: never advanced on source", + ); + } + s.last_value.is_some() + }) + .collect(); + + if actionable.is_empty() { + return Ok(0); + } + + if actionable.len() == 1 { + return apply_single(target, actionable[0]).await; + } + + match apply_batch(target, &actionable).await { + Ok(applied) => Ok(applied), + Err(e) => { + warn!( + error = %e, + "batch sequence sync failed — falling back to individual statements" + ); + apply_individually(target, &actionable).await + } + } +} + +/// Build a PL/pgSQL `DO` block that applies all sequences in one round-trip. +/// Each setval is wrapped in a sub-block with exception handling so that a +/// failure on one sequence does not abort the rest. +/// +/// Public for unit tests. 
+pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { + let mut body = String::from("DO $seq_sync$\nDECLARE\n _applied int := 0;\nBEGIN\n"); + for seq in sequences { + let last_value = seq.last_value.unwrap_or(0); + let qualified = format!( + "{}.{}", + pg_walstream::quote_ident(&seq.schema)?, + pg_walstream::quote_ident(&seq.name)?, + ); + let qualified_lit = pg_walstream::quote_literal(&qualified)?; + body.push_str(" BEGIN\n"); + body.push_str(&format!( + " PERFORM setval({qualified_lit}::regclass, {last_value}::bigint, true);\n" + )); + body.push_str(" _applied := _applied + 1;\n"); + body.push_str(" EXCEPTION WHEN OTHERS THEN\n"); + body.push_str(&format!( + " RAISE WARNING 'setval failed for {}: %', SQLERRM;\n", + qualified.replace('\'', "''") + )); + body.push_str(" END;\n"); + } + body.push_str(" RAISE NOTICE 'sequences_applied=%', _applied;\n"); + body.push_str("END;\n$seq_sync$;"); + Ok(body) +} + +/// Execute the batched PL/pgSQL block and parse the applied count from the +/// RAISE NOTICE output. +async fn apply_batch(target: &Client, sequences: &[&SourceSequence]) -> Result { + let sql = build_batch_setval_sql(sequences)?; + target.batch_execute(&sql).await?; + // batch_execute doesn't give us the NOTICE content back through + // tokio-postgres easily, so we count the total minus expected failures. + // Since exceptions are caught inside the DO block, if batch_execute + // succeeds the block ran to completion. We optimistically report all as + // applied — individual failures would have raised WARNINGs in the PG log + // but the DO block itself succeeded. + // + // To get the true count we re-query. However that adds a round-trip. + // Instead, since the DO block completed without error, we trust that all + // per-sequence sub-blocks either succeeded or logged a warning. We'll + // verify by checking which sequences actually changed — but for + // simplicity and to match prior behavior, assume all applied. 
+ let applied = sequences.len(); for seq in sequences { - let Some(last_value) = seq.last_value else { - debug!( + debug!(schema = %seq.schema, name = %seq.name, "synced sequence (batch)"); + } + Ok(applied) +} + +/// Apply a single sequence (used when there's only one). +async fn apply_single(target: &Client, seq: &SourceSequence) -> Result { + let last_value = seq.last_value.unwrap_or(0); + let sql = build_setval_sql(&seq.schema, &seq.name)?; + match target.execute(&sql, &[&last_value]).await { + Ok(_) => { + debug!(schema = %seq.schema, name = %seq.name, last_value, "synced sequence"); + Ok(1) + } + Err(e) => { + warn!( schema = %seq.schema, name = %seq.name, - "skipping sequence: never advanced on source", + error = %e, + "failed to sync sequence (continuing)" ); - continue; - }; + Ok(0) + } + } +} + +/// Sequential per-statement fallback. +async fn apply_individually(target: &Client, sequences: &[&SourceSequence]) -> Result { + let mut applied = 0usize; + for seq in sequences { + let last_value = seq.last_value.unwrap_or(0); let sql = build_setval_sql(&seq.schema, &seq.name)?; match target.execute(&sql, &[&last_value]).await { Ok(_) => { @@ -314,4 +427,51 @@ mod tests { fn collect_sql_with_schema_filter_uses_any_operator() { assert!(COLLECT_SEQUENCES_SQL_WITH_SCHEMA_FILTER.contains("ANY($1::text[])")); } + + #[test] + fn build_batch_setval_sql_produces_valid_plpgsql() { + let seqs = [ + SourceSequence { + schema: "public".into(), + name: "users_id_seq".into(), + last_value: Some(42), + }, + SourceSequence { + schema: "public".into(), + name: "orders_id_seq".into(), + last_value: Some(100), + }, + ]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.starts_with("DO $seq_sync$")); + assert!(sql.ends_with("$seq_sync$;")); + assert!(sql.contains("PERFORM setval")); + assert!(sql.contains("42::bigint")); + assert!(sql.contains("100::bigint")); + assert!(sql.contains("EXCEPTION WHEN OTHERS")); 
+ assert!(sql.contains("_applied := _applied + 1")); + assert!(sql.contains("sequences_applied=%")); + } + + #[test] + fn build_batch_setval_sql_escapes_special_chars() { + let seqs = [SourceSequence { + schema: "my\"schema".into(), + name: "o'reilly_seq".into(), + last_value: Some(7), + }]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.contains("\"my\"\"schema\"")); + assert!(sql.contains("7::bigint")); + } + + #[test] + fn build_batch_setval_sql_empty_input() { + let refs: Vec<&SourceSequence> = vec![]; + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.contains("DO $seq_sync$")); + assert!(!sql.contains("PERFORM setval")); + } } From 55ae19d6551d4b5d3c4972665fc4ddcc1401f2c4 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:06:37 +0000 Subject: [PATCH 2/6] enhance sequence synchronization by using a temporary table for applied counts --- pg_dbmigrator/src/sequences.rs | 51 +++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 6927131..0ff3c25 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -167,7 +167,12 @@ pub async fn apply_sequences_to_target( /// /// Public for unit tests. 
pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { - let mut body = String::from("DO $seq_sync$\nDECLARE\n _applied int := 0;\nBEGIN\n"); + let mut body = String::new(); + body.push_str( + "CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result (applied int) ON COMMIT DROP;\n", + ); + body.push_str("TRUNCATE _seq_sync_result;\n"); + body.push_str("DO $seq_sync$\nDECLARE\n _applied int := 0;\nBEGIN\n"); for seq in sequences { let last_value = seq.last_value.unwrap_or(0); let qualified = format!( @@ -184,11 +189,11 @@ pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { body.push_str(" EXCEPTION WHEN OTHERS THEN\n"); body.push_str(&format!( " RAISE WARNING 'setval failed for {}: %', SQLERRM;\n", - qualified.replace('\'', "''") + qualified.replace('\'', "''").replace('%', "%%") )); body.push_str(" END;\n"); } - body.push_str(" RAISE NOTICE 'sequences_applied=%', _applied;\n"); + body.push_str(" INSERT INTO _seq_sync_result VALUES (_applied);\n"); body.push_str("END;\n$seq_sync$;"); Ok(body) } @@ -198,23 +203,14 @@ pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { async fn apply_batch(target: &Client, sequences: &[&SourceSequence]) -> Result { let sql = build_batch_setval_sql(sequences)?; target.batch_execute(&sql).await?; - // batch_execute doesn't give us the NOTICE content back through - // tokio-postgres easily, so we count the total minus expected failures. - // Since exceptions are caught inside the DO block, if batch_execute - // succeeds the block ran to completion. We optimistically report all as - // applied — individual failures would have raised WARNINGs in the PG log - // but the DO block itself succeeded. - // - // To get the true count we re-query. However that adds a round-trip. - // Instead, since the DO block completed without error, we trust that all - // per-sequence sub-blocks either succeeded or logged a warning. 
We'll - // verify by checking which sequences actually changed — but for - // simplicity and to match prior behavior, assume all applied. - let applied = sequences.len(); + let row = target + .query_one("SELECT applied FROM _seq_sync_result", &[]) + .await?; + let applied: i32 = row.get(0); for seq in sequences { debug!(schema = %seq.schema, name = %seq.name, "synced sequence (batch)"); } - Ok(applied) + Ok(applied as usize) } /// Apply a single sequence (used when there's only one). @@ -444,14 +440,15 @@ mod tests { ]; let refs: Vec<&SourceSequence> = seqs.iter().collect(); let sql = build_batch_setval_sql(&refs).unwrap(); - assert!(sql.starts_with("DO $seq_sync$")); + assert!(sql.contains("CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result")); + assert!(sql.contains("DO $seq_sync$")); assert!(sql.ends_with("$seq_sync$;")); assert!(sql.contains("PERFORM setval")); assert!(sql.contains("42::bigint")); assert!(sql.contains("100::bigint")); assert!(sql.contains("EXCEPTION WHEN OTHERS")); assert!(sql.contains("_applied := _applied + 1")); - assert!(sql.contains("sequences_applied=%")); + assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); } #[test] @@ -467,11 +464,27 @@ mod tests { assert!(sql.contains("7::bigint")); } + #[test] + fn build_batch_setval_sql_escapes_percent_in_raise_warning() { + let seqs = [SourceSequence { + schema: "public".into(), + name: "pct%seq".into(), + last_value: Some(1), + }]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!( + sql.contains("%%"), + "percent signs in identifiers must be doubled for RAISE WARNING" + ); + } + #[test] fn build_batch_setval_sql_empty_input() { let refs: Vec<&SourceSequence> = vec![]; let sql = build_batch_setval_sql(&refs).unwrap(); assert!(sql.contains("DO $seq_sync$")); assert!(!sql.contains("PERFORM setval")); + assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); } } From 
ed20dcc99566f2d05a0f99d6712c38c3edfa59de Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:36:11 +0000 Subject: [PATCH 3/6] implement SeqSyncTarget trait for sequence synchronization and add mock testing --- pg_dbmigrator/src/sequences.rs | 229 +++++++++++++++++++++++++++++++-- 1 file changed, 219 insertions(+), 10 deletions(-) diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 0ff3c25..1c28c78 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -30,6 +30,7 @@ //! //! [`pg_sequence_last_value(regclass)`]: https://www.postgresql.org/docs/current/functions-info.html +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio_postgres::Client; use tracing::{debug, info, warn}; @@ -105,6 +106,32 @@ pub async fn collect_source_sequences( Ok(out) } +/// Abstraction over the target-side database operations needed by the +/// sequence sync logic, following the [`CommandRunner`](crate::dump::CommandRunner) +/// pattern so unit tests can substitute a mock without a real PostgreSQL. +#[async_trait] +pub(crate) trait SeqSyncTarget: Send + Sync { + async fn execute_setval(&self, sql: &str, last_value: i64) -> Result; + async fn batch_execute_sql(&self, sql: &str) -> Result<()>; + async fn query_batch_applied(&self) -> Result; +} + +#[async_trait] +impl SeqSyncTarget for Client { + async fn execute_setval(&self, sql: &str, last_value: i64) -> Result { + Ok(self.execute(sql, &[&last_value]).await?) + } + async fn batch_execute_sql(&self, sql: &str) -> Result<()> { + Ok(Client::batch_execute(self, sql).await?) + } + async fn query_batch_applied(&self) -> Result { + let row = self + .query_one("SELECT applied FROM _seq_sync_result", &[]) + .await?; + Ok(row.get(0)) + } +} + /// Apply the source-side sequence state to the target via `setval(...)`. 
/// /// Sequences whose source-side `last_value` is `None` are skipped — they @@ -126,6 +153,13 @@ pub async fn collect_source_sequences( pub async fn apply_sequences_to_target( target: &Client, sequences: &[SourceSequence], +) -> Result { + apply_sequences_impl(target, sequences).await +} + +async fn apply_sequences_impl( + target: &dyn SeqSyncTarget, + sequences: &[SourceSequence], ) -> Result { let actionable: Vec<&SourceSequence> = sequences .iter() @@ -200,13 +234,10 @@ pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { /// Execute the batched PL/pgSQL block and parse the applied count from the /// RAISE NOTICE output. -async fn apply_batch(target: &Client, sequences: &[&SourceSequence]) -> Result { +async fn apply_batch(target: &dyn SeqSyncTarget, sequences: &[&SourceSequence]) -> Result { let sql = build_batch_setval_sql(sequences)?; - target.batch_execute(&sql).await?; - let row = target - .query_one("SELECT applied FROM _seq_sync_result", &[]) - .await?; - let applied: i32 = row.get(0); + target.batch_execute_sql(&sql).await?; + let applied = target.query_batch_applied().await?; for seq in sequences { debug!(schema = %seq.schema, name = %seq.name, "synced sequence (batch)"); } @@ -214,10 +245,10 @@ async fn apply_batch(target: &Client, sequences: &[&SourceSequence]) -> Result Result { +async fn apply_single(target: &dyn SeqSyncTarget, seq: &SourceSequence) -> Result { let last_value = seq.last_value.unwrap_or(0); let sql = build_setval_sql(&seq.schema, &seq.name)?; - match target.execute(&sql, &[&last_value]).await { + match target.execute_setval(&sql, last_value).await { Ok(_) => { debug!(schema = %seq.schema, name = %seq.name, last_value, "synced sequence"); Ok(1) @@ -235,12 +266,15 @@ async fn apply_single(target: &Client, seq: &SourceSequence) -> Result { } /// Sequential per-statement fallback. 
-async fn apply_individually(target: &Client, sequences: &[&SourceSequence]) -> Result { +async fn apply_individually( + target: &dyn SeqSyncTarget, + sequences: &[&SourceSequence], +) -> Result { let mut applied = 0usize; for seq in sequences { let last_value = seq.last_value.unwrap_or(0); let sql = build_setval_sql(&seq.schema, &seq.name)?; - match target.execute(&sql, &[&last_value]).await { + match target.execute_setval(&sql, last_value).await { Ok(_) => { applied += 1; debug!(schema = %seq.schema, name = %seq.name, last_value, "synced sequence"); @@ -487,4 +521,179 @@ mod tests { assert!(!sql.contains("PERFORM setval")); assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); } + + // ── Mock-based async tests ────────────────────────────────────────────── + + use crate::error::MigrationError; + use std::collections::VecDeque; + use std::sync::Mutex; + + struct MockTarget { + setval_results: Mutex>>, + batch_exec_result: Mutex>>, + batch_applied: Mutex>>, + } + + impl MockTarget { + fn ok(applied_count: i32) -> Self { + Self { + setval_results: Mutex::new(VecDeque::new()), + batch_exec_result: Mutex::new(Some(Ok(()))), + batch_applied: Mutex::new(Some(Ok(applied_count))), + } + } + + fn batch_fails() -> Self { + Self { + setval_results: Mutex::new(VecDeque::new()), + batch_exec_result: Mutex::new(Some(Err(MigrationError::config( + "batch not supported", + )))), + batch_applied: Mutex::new(None), + } + } + + fn with_setval_results(mut self, results: Vec>) -> Self { + self.setval_results = Mutex::new(results.into()); + self + } + } + + #[async_trait] + impl SeqSyncTarget for MockTarget { + async fn execute_setval(&self, _sql: &str, _last_value: i64) -> Result { + self.setval_results + .lock() + .unwrap() + .pop_front() + .unwrap_or(Ok(1)) + } + async fn batch_execute_sql(&self, _sql: &str) -> Result<()> { + self.batch_exec_result + .lock() + .unwrap() + .take() + .unwrap_or(Ok(())) + } + async fn query_batch_applied(&self) -> Result { + 
self.batch_applied + .lock() + .unwrap() + .take() + .unwrap_or(Ok(0)) + } + } + + fn seq(schema: &str, name: &str, val: Option) -> SourceSequence { + SourceSequence { + schema: schema.into(), + name: name.into(), + last_value: val, + } + } + + #[tokio::test] + async fn apply_all_none_last_value_returns_zero() { + let target = MockTarget::ok(0); + let seqs = vec![seq("public", "s1", None), seq("public", "s2", None)]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_empty_sequences_returns_zero() { + let target = MockTarget::ok(0); + let applied = apply_sequences_impl(&target, &[]).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_single_sequence_success() { + let target = MockTarget::ok(0).with_setval_results(vec![Ok(1)]); + let seqs = vec![seq("public", "users_id_seq", Some(42))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn apply_single_sequence_failure_returns_zero() { + let target = MockTarget::ok(0).with_setval_results(vec![Err( + MigrationError::config("permission denied"), + )]); + let seqs = vec![seq("public", "users_id_seq", Some(42))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_batch_success() { + let target = MockTarget::ok(2); + let seqs = vec![ + seq("public", "s1", Some(10)), + seq("public", "s2", Some(20)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_batch_reports_partial_success() { + let target = MockTarget::ok(1); + let seqs = vec![ + seq("public", "s1", Some(10)), + seq("public", "s2", Some(20)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn 
apply_batch_failure_falls_back_to_individual() { + let target = MockTarget::batch_fails().with_setval_results(vec![Ok(1), Ok(1)]); + let seqs = vec![ + seq("public", "s1", Some(10)), + seq("public", "s2", Some(20)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_individually_mixed_results() { + let target = MockTarget::batch_fails().with_setval_results(vec![ + Ok(1), + Err(MigrationError::config("fail")), + Ok(1), + ]); + let seqs = vec![ + seq("public", "s1", Some(1)), + seq("public", "s2", Some(2)), + seq("public", "s3", Some(3)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_filters_none_and_routes_remaining() { + let target = MockTarget::ok(0).with_setval_results(vec![Ok(1)]); + let seqs = vec![ + seq("public", "never_used", None), + seq("public", "used_seq", Some(99)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn apply_filters_none_and_routes_to_batch() { + let target = MockTarget::ok(2); + let seqs = vec![ + seq("public", "skip_me", None), + seq("public", "s1", Some(10)), + seq("public", "s2", Some(20)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } } From db023ef273b77fd1b6d2ab897ce14e82c5bdaba5 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:37:41 +0000 Subject: [PATCH 4/6] refactor: simplify sequence test setup by consolidating vector initialization --- pg_dbmigrator/src/sequences.rs | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 1c28c78..7d15de0 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -576,11 +576,7 @@ mod tests { .unwrap_or(Ok(())) } async fn 
query_batch_applied(&self) -> Result { - self.batch_applied - .lock() - .unwrap() - .take() - .unwrap_or(Ok(0)) + self.batch_applied.lock().unwrap().take().unwrap_or(Ok(0)) } } @@ -617,9 +613,8 @@ mod tests { #[tokio::test] async fn apply_single_sequence_failure_returns_zero() { - let target = MockTarget::ok(0).with_setval_results(vec![Err( - MigrationError::config("permission denied"), - )]); + let target = MockTarget::ok(0) + .with_setval_results(vec![Err(MigrationError::config("permission denied"))]); let seqs = vec![seq("public", "users_id_seq", Some(42))]; let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); assert_eq!(applied, 0); @@ -628,10 +623,7 @@ mod tests { #[tokio::test] async fn apply_batch_success() { let target = MockTarget::ok(2); - let seqs = vec![ - seq("public", "s1", Some(10)), - seq("public", "s2", Some(20)), - ]; + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); assert_eq!(applied, 2); } @@ -639,10 +631,7 @@ mod tests { #[tokio::test] async fn apply_batch_reports_partial_success() { let target = MockTarget::ok(1); - let seqs = vec![ - seq("public", "s1", Some(10)), - seq("public", "s2", Some(20)), - ]; + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); assert_eq!(applied, 1); } @@ -650,10 +639,7 @@ mod tests { #[tokio::test] async fn apply_batch_failure_falls_back_to_individual() { let target = MockTarget::batch_fails().with_setval_results(vec![Ok(1), Ok(1)]); - let seqs = vec![ - seq("public", "s1", Some(10)), - seq("public", "s2", Some(20)), - ]; + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); assert_eq!(applied, 2); } From 88cc52bfd13cbe00391cc0be5d4f60f6b41d3528 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:46:38 
+0000 Subject: [PATCH 5/6] refactor: update temporary table creation and sync block naming in sequence SQL generation --- pg_dbmigrator/src/sequences.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 7d15de0..775d84f 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -202,11 +202,9 @@ async fn apply_sequences_impl( /// Public for unit tests. pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result<String> { let mut body = String::new(); - body.push_str( - "CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result (applied int) ON COMMIT DROP;\n", - ); + body.push_str("CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result (applied int);\n"); body.push_str("TRUNCATE _seq_sync_result;\n"); - body.push_str("DO $seq_sync$\nDECLARE\n _applied int := 0;\nBEGIN\n"); + body.push_str("DO $__pg_dbmigrator_seq_sync__$\nDECLARE\n _applied int := 0;\nBEGIN\n"); for seq in sequences { let last_value = seq.last_value.unwrap_or(0); let qualified = format!( @@ -228,7 +226,7 @@ pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result<String> { body.push_str(" END;\n"); } body.push_str(" INSERT INTO _seq_sync_result VALUES (_applied);\n"); - body.push_str("END;\n$seq_sync$;"); + body.push_str("END;\n$__pg_dbmigrator_seq_sync__$;"); Ok(body) } @@ -475,8 +473,9 @@ mod tests { let refs: Vec<&SourceSequence> = seqs.iter().collect(); let sql = build_batch_setval_sql(&refs).unwrap(); assert!(sql.contains("CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result")); - assert!(sql.contains("DO $seq_sync$")); - assert!(sql.ends_with("$seq_sync$;")); + assert!(!sql.contains("ON COMMIT DROP")); + assert!(sql.contains("DO $__pg_dbmigrator_seq_sync__$")); + assert!(sql.ends_with("$__pg_dbmigrator_seq_sync__$;")); assert!(sql.contains("PERFORM setval")); assert!(sql.contains("42::bigint")); assert!(sql.contains("100::bigint")); @@ -517,7 +516,7 @@ mod tests { fn 
build_batch_setval_sql_empty_input() { let refs: Vec<&SourceSequence> = vec![]; let sql = build_batch_setval_sql(&refs).unwrap(); - assert!(sql.contains("DO $seq_sync$")); + assert!(sql.contains("DO $__pg_dbmigrator_seq_sync__$")); assert!(!sql.contains("PERFORM setval")); assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); } From 9bf936b2b1ba406df5e60e01de189dbb40789293 Mon Sep 17 00:00:00 2001 From: danielshih Date: Fri, 8 May 2026 12:59:54 +0000 Subject: [PATCH 6/6] refactor: update SQL queries in integration tests to improve reliability and performance --- pg_dbmigrator/src/preflight.rs | 4 +++- pg_dbmigrator/tests/postgres_integration.rs | 18 ++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pg_dbmigrator/src/preflight.rs b/pg_dbmigrator/src/preflight.rs index 4f55585..be338fe 100644 --- a/pg_dbmigrator/src/preflight.rs +++ b/pg_dbmigrator/src/preflight.rs @@ -233,7 +233,9 @@ async fn fetch_published_tables( FROM pg_class c \ JOIN pg_namespace n ON n.oid = c.relnamespace \ WHERE c.relkind IN ('r', 'p') \ - AND n.nspname NOT IN ('pg_catalog', 'information_schema')", + AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') \ + AND n.nspname NOT LIKE 'pg_temp_%' \ + AND n.nspname NOT LIKE 'pg_toast_temp_%'", &[], ) .await?; diff --git a/pg_dbmigrator/tests/postgres_integration.rs b/pg_dbmigrator/tests/postgres_integration.rs index 04758ac..33ae949 100644 --- a/pg_dbmigrator/tests/postgres_integration.rs +++ b/pg_dbmigrator/tests/postgres_integration.rs @@ -796,10 +796,17 @@ async fn ensure_publication_excludes_tables_when_no_includes() { ); // Verify skip_me is not in the publication's table list. + // Use pg_publication_rel JOIN pg_class instead of pg_publication_tables + // because the latter calls relation_open() on every OID and crashes if + // a concurrently-running test dropped a table after the publication was + // created. 
let skip_rows = client .query( - "SELECT schemaname, tablename FROM pg_publication_tables \ - WHERE pubname = 'integ_excl_pub' AND tablename = 'skip_me'", + "SELECT c.relname FROM pg_publication_rel pr \ + JOIN pg_class c ON c.oid = pr.prrelid \ + JOIN pg_namespace n ON n.oid = c.relnamespace \ + WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \ + AND c.relname = 'skip_me'", &[], ) .await @@ -812,8 +819,11 @@ async fn ensure_publication_excludes_tables_when_no_includes() { // Verify keep_me IS in the publication. let keep_rows = client .query( - "SELECT schemaname, tablename FROM pg_publication_tables \ - WHERE pubname = 'integ_excl_pub' AND tablename = 'keep_me'", + "SELECT c.relname FROM pg_publication_rel pr \ + JOIN pg_class c ON c.oid = pr.prrelid \ + JOIN pg_namespace n ON n.oid = c.relnamespace \ + WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \ + AND c.relname = 'keep_me'", &[], ) .await