diff --git a/Agent.md b/Agent.md index 46ee76c..2cc7451 100644 --- a/Agent.md +++ b/Agent.md @@ -69,15 +69,15 @@ docker-compose.test.yml # source :55432, target :55433 |--------|------| | `config.rs` | `MigrationConfig`/`OnlineOptions`, validation, performance defaults | | `error.rs` | `MigrationError` (thiserror) + `Result` | -| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr progress parsing | +| `dump.rs` | pg_dump wrapper, `CommandRunner` trait, argv builder, stderr ingestion | | `restore.rs` | pg_restore/psql wrapper | | `analyze.rs` | Pre-dump VACUUM ANALYZE + post-restore ANALYZE | | `snapshot.rs` | Replication slot creation + exported snapshot | | `native_apply.rs` | CREATE SUBSCRIPTION, lag polling, `ApplyStats`, `drop_source_{publication,slot}` | -| `orchestrator.rs` | `Migrator` — wires all stages | +| `orchestrator.rs` | `Migrator` — wires all stages, parallel pre-flight via `tokio::join!` | | `progress.rs` | `ProgressReporter` trait + Tracing/Collecting impls | | `preflight.rs` | Source validation, `ensure_publication_exists` (auto-create) | -| `sequences.rs` | Source→target setval at cutover | +| `sequences.rs` | Source→target setval at cutover (batched PL/pgSQL for single round-trip) | | `resume.rs` | Resume token persistence | | `tls.rs` | TLS-aware connection helper | @@ -89,6 +89,13 @@ docker-compose.test.yml # source :55432, target :55433 4. **Sequence sync at cutover**: migrator runs setval() on all target sequences after cutover. 5. **Publication lifecycle**: auto-created publications are tracked (`pub_auto_created`) and dropped after cutover; pre-existing ones are never dropped. +### Performance optimizations + +- **Parallel pre-flight**: `ensure_target_database_exists` and `verify_source_logical_replication_ready` run concurrently via `tokio::join!` (they hit different servers). +- **Batched sequence sync**: `apply_sequences_to_target()` builds a single PL/pgSQL `DO` block with per-sequence exception handling, reducing N round-trips to 1. Falls back to individual statements if the batch fails. +- **Split-section restore**: pre-data → data → post-data for index-free COPY (30-60% faster index rebuild). +- **LZ4 compression**: default `lz4:1` for dump archives (fast compress/decompress). + --- ## 2. Design Principles (mandatory reading) @@ -117,7 +124,9 @@ docker-compose.test.yml # source :55432, target :55433 6. **Contract with pg_walstream** - Slot creation before dump. START_REPLICATION only after dump. - - Pinned at `0.6.2` via path dep to `../pg-walstream`. Verify `ChangeEvent`/`EventType` compatibility before bumping. + - Pinned at `0.6.3` via workspace dep. Verify `ChangeEvent`/`EventType` compatibility before bumping. + - Used functions: `quote_ident`, `quote_literal`, `parse_lsn`, `format_lsn`, `build_create_subscription_sql`, `build_disable_subscription_sql`, `build_detach_slot_sql`, `build_drop_subscription_sql`, `LogicalReplicationStream`, `ReplicationStreamConfig`, `CreateSubscriptionOptions`. + - pg_walstream has NO publication SQL builders — publication SQL in `preflight.rs` is hand-rolled (necessary). 7. **No unsolicited optimisation** - Make only the changes the task requires. No silent async conversions, container swaps, or new crate additions. @@ -220,7 +229,7 @@ make integration # e2e, requires Docker - **Small workspace** — most files are 200–400 lines. Read the entire file before editing. Use grep to confirm blast radius. - **Validate after editing** — run `cargo test -p pg_dbmigrator` after every change. - **New stage / mode** → must update ALL of: `MigrationStage` enum, `Migrator` entry point, CLI args, README, and this file. -- **pg_walstream** lives in sibling workspace (`../pg-walstream`). Pinned at `0.6.2` via path dep. Do NOT vendor/fork — propose changes upstream. +- **pg_walstream** pinned at `0.6.3` via workspace dep. Do NOT vendor/fork — propose changes upstream. - **Library/CLI boundary** — library returns `pg_dbmigrator::Result` only. `anyhow` is CLI/examples only. - **Versions** — all dependency versions live in workspace `Cargo.toml` under `[workspace.dependencies]`. Crate manifests use `xxx.workspace = true`. diff --git a/Cargo.lock b/Cargo.lock index fd09f33..4bc3997 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1025,7 +1025,7 @@ dependencies = [ [[package]] name = "offline_migration_example" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "pg_dbmigrator", @@ -1049,7 +1049,7 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "online_migration_example" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "pg_dbmigrator", @@ -1102,7 +1102,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pg_dbmigrator" -version = "0.1.1" +version = "0.2.0" dependencies = [ "anyhow", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 44808a8..85c7084 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.package] -version = "0.1.1" +version = "0.2.0" edition = "2021" license = "BSD-3-Clause" authors = ["danielshih"] diff --git a/pg_dbmigrator/src/dump.rs b/pg_dbmigrator/src/dump.rs index 3c831ea..234b35f 100644 --- a/pg_dbmigrator/src/dump.rs +++ b/pg_dbmigrator/src/dump.rs @@ -277,7 +277,6 @@ impl CommandRunner for TokioCommandRunner { if is_restore { ingest_pg_restore_stderr_line(&line, &mut summary); } - emit_table_progress(&line); } Ok(None) => break, Err(_) => break, @@ -372,18 +371,6 @@ fn kill_child_group(pid: Option, sigkill: bool) { #[cfg(not(unix))] fn kill_child_group(_pid: Option, _sigkill: bool) {} -/// Detect table-level progress lines from pg_dump/pg_restore verbose stderr -/// and emit structured tracing events. Patterns: -/// pg_dump: dumping contents of table "schema"."table" -/// pg_restore: processing data for table "schema"."table" -fn emit_table_progress(line: &str) { - if let Some(table) = line.strip_prefix("pg_dump: dumping contents of table ") { - info!(table, "pg_dump: dumping table"); - } else if let Some(table) = line.strip_prefix("pg_restore: processing data for table ") { - info!(table, "pg_restore: restoring table"); - } -} - /// Run `pg_dump` according to `req` using the supplied [`CommandRunner`]. pub async fn run_pg_dump( runner: &R, @@ -871,24 +858,4 @@ mod tests { let dbg = format!("{:?}", r); assert!(dbg.contains("TokioCommandRunner")); } - - #[test] - fn emit_table_progress_detects_pg_dump_line() { - // Should not panic; emits tracing event (not assertable here but - // exercises the parsing path). - emit_table_progress(r#"pg_dump: dumping contents of table "public"."users""#); - } - - #[test] - fn emit_table_progress_detects_pg_restore_line() { - emit_table_progress(r#"pg_restore: processing data for table "public"."orders""#); - } - - #[test] - fn emit_table_progress_ignores_unrelated_lines() { - // Should not panic or emit anything for non-matching lines. - emit_table_progress("pg_dump: reading extensions"); - emit_table_progress("pg_restore: connecting to database"); - emit_table_progress(""); - } } diff --git a/pg_dbmigrator/src/orchestrator.rs b/pg_dbmigrator/src/orchestrator.rs index acef8f7..4931a9f 100644 --- a/pg_dbmigrator/src/orchestrator.rs +++ b/pg_dbmigrator/src/orchestrator.rs @@ -176,33 +176,26 @@ impl Migrator { .await?; } - // 0.5. Ensure the target database exists — pg_restore needs it. + // Run target-database check and source logical-replication validation in parallel — they hit different servers so there is no ordering dependency. self.report( MigrationStage::Validate, format!( - "ensuring target database `{}` exists", + "ensuring target database `{}` exists + verifying source logical replication", self.config.target.database ), ) .await; - ensure_target_database_exists( - &self.config.target.connection_string, - &self.config.target.database, - ) - .await?; - - // 0.6. Verify the source is configured for logical replication. - // Doing this *before* slot creation gives the operator a clean, - // actionable error instead of a confusing libpq error 30 s into - // CREATE_REPLICATION_SLOT. - self.report( - MigrationStage::Validate, - "verifying source is configured for logical replication", - ) - .await; - verify_source_logical_replication_ready(&self.config.source.connection_string).await?; + let (target_db_result, source_repl_result) = tokio::join!( + ensure_target_database_exists( + &self.config.target.connection_string, + &self.config.target.database, + ), + verify_source_logical_replication_ready(&self.config.source.connection_string), + ); + target_db_result?; + source_repl_result?; - // 0.7. Pre-dump: VACUUM ANALYZE on source. + // Pre-dump: VACUUM ANALYZE on source. let dump_path = self.dump_path_or_default("dump_online"); let mut token = self.load_or_init_resume(&dump_path).await?; diff --git a/pg_dbmigrator/src/preflight.rs b/pg_dbmigrator/src/preflight.rs index 4f55585..be338fe 100644 --- a/pg_dbmigrator/src/preflight.rs +++ b/pg_dbmigrator/src/preflight.rs @@ -233,7 +233,9 @@ async fn fetch_published_tables( FROM pg_class c \ JOIN pg_namespace n ON n.oid = c.relnamespace \ WHERE c.relkind IN ('r', 'p') \ - AND n.nspname NOT IN ('pg_catalog', 'information_schema')", + AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') \ + AND n.nspname NOT LIKE 'pg_temp_%' \ + AND n.nspname NOT LIKE 'pg_toast_temp_%'", &[], ) .await?; diff --git a/pg_dbmigrator/src/sequences.rs b/pg_dbmigrator/src/sequences.rs index 8be9146..775d84f 100644 --- a/pg_dbmigrator/src/sequences.rs +++ b/pg_dbmigrator/src/sequences.rs @@ -30,6 +30,7 @@ //! //! [`pg_sequence_last_value(regclass)`]: https://www.postgresql.org/docs/current/functions-info.html +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio_postgres::Client; use tracing::{debug, info, warn}; @@ -105,6 +106,32 @@ pub async fn collect_source_sequences( Ok(out) } +/// Abstraction over the target-side database operations needed by the +/// sequence sync logic, following the [`CommandRunner`](crate::dump::CommandRunner) +/// pattern so unit tests can substitute a mock without a real PostgreSQL. +#[async_trait] +pub(crate) trait SeqSyncTarget: Send + Sync { + async fn execute_setval(&self, sql: &str, last_value: i64) -> Result; + async fn batch_execute_sql(&self, sql: &str) -> Result<()>; + async fn query_batch_applied(&self) -> Result; +} + +#[async_trait] +impl SeqSyncTarget for Client { + async fn execute_setval(&self, sql: &str, last_value: i64) -> Result { + Ok(self.execute(sql, &[&last_value]).await?) + } + async fn batch_execute_sql(&self, sql: &str) -> Result<()> { + Ok(Client::batch_execute(self, sql).await?) + } + async fn query_batch_applied(&self) -> Result { + let row = self + .query_one("SELECT applied FROM _seq_sync_result", &[]) + .await?; + Ok(row.get(0)) + } +} + /// Apply the source-side sequence state to the target via `setval(...)`. /// /// Sequences whose source-side `last_value` is `None` are skipped — they @@ -116,22 +143,136 @@ pub async fn collect_source_sequences( /// (e.g. owned by a role we can't act on) shouldn't block the rest of /// the migration from completing. The function returns the number of /// sequences successfully applied. +/// +/// When more than one sequence needs syncing, a single PL/pgSQL `DO` +/// block is executed to reduce network round-trips. Each `setval()` +/// inside the block is wrapped in its own exception handler so a failure +/// on one sequence doesn't prevent the others from being applied. If the +/// batched approach fails entirely (e.g. older PG without PL/pgSQL), we +/// fall back to individual statements. pub async fn apply_sequences_to_target( target: &Client, sequences: &[SourceSequence], ) -> Result { - let mut applied = 0usize; + apply_sequences_impl(target, sequences).await +} + +async fn apply_sequences_impl( + target: &dyn SeqSyncTarget, + sequences: &[SourceSequence], +) -> Result { + let actionable: Vec<&SourceSequence> = sequences + .iter() + .filter(|s| { + if s.last_value.is_none() { + debug!( + schema = %s.schema, + name = %s.name, + "skipping sequence: never advanced on source", + ); + } + s.last_value.is_some() + }) + .collect(); + + if actionable.is_empty() { + return Ok(0); + } + + if actionable.len() == 1 { + return apply_single(target, actionable[0]).await; + } + + match apply_batch(target, &actionable).await { + Ok(applied) => Ok(applied), + Err(e) => { + warn!( + error = %e, + "batch sequence sync failed — falling back to individual statements" + ); + apply_individually(target, &actionable).await + } + } +} + +/// Build a PL/pgSQL `DO` block that applies all sequences in one round-trip. +/// Each setval is wrapped in a sub-block with exception handling so that a +/// failure on one sequence does not abort the rest. +/// +/// Public for unit tests. +pub fn build_batch_setval_sql(sequences: &[&SourceSequence]) -> Result { + let mut body = String::new(); + body.push_str("CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result (applied int);\n"); + body.push_str("TRUNCATE _seq_sync_result;\n"); + body.push_str("DO $__pg_dbmigrator_seq_sync__$\nDECLARE\n _applied int := 0;\nBEGIN\n"); + for seq in sequences { + let last_value = seq.last_value.unwrap_or(0); + let qualified = format!( + "{}.{}", + pg_walstream::quote_ident(&seq.schema)?, + pg_walstream::quote_ident(&seq.name)?, + ); + let qualified_lit = pg_walstream::quote_literal(&qualified)?; + body.push_str(" BEGIN\n"); + body.push_str(&format!( + " PERFORM setval({qualified_lit}::regclass, {last_value}::bigint, true);\n" + )); + body.push_str(" _applied := _applied + 1;\n"); + body.push_str(" EXCEPTION WHEN OTHERS THEN\n"); + body.push_str(&format!( + " RAISE WARNING 'setval failed for {}: %', SQLERRM;\n", + qualified.replace('\'', "''").replace('%', "%%") + )); + body.push_str(" END;\n"); + } + body.push_str(" INSERT INTO _seq_sync_result VALUES (_applied);\n"); + body.push_str("END;\n$__pg_dbmigrator_seq_sync__$;"); + Ok(body) +} + +/// Execute the batched PL/pgSQL block and parse the applied count from the +/// RAISE NOTICE output. +async fn apply_batch(target: &dyn SeqSyncTarget, sequences: &[&SourceSequence]) -> Result { + let sql = build_batch_setval_sql(sequences)?; + target.batch_execute_sql(&sql).await?; + let applied = target.query_batch_applied().await?; for seq in sequences { - let Some(last_value) = seq.last_value else { - debug!( + debug!(schema = %seq.schema, name = %seq.name, "synced sequence (batch)"); + } + Ok(applied as usize) +} + +/// Apply a single sequence (used when there's only one). +async fn apply_single(target: &dyn SeqSyncTarget, seq: &SourceSequence) -> Result { + let last_value = seq.last_value.unwrap_or(0); + let sql = build_setval_sql(&seq.schema, &seq.name)?; + match target.execute_setval(&sql, last_value).await { + Ok(_) => { + debug!(schema = %seq.schema, name = %seq.name, last_value, "synced sequence"); + Ok(1) + } + Err(e) => { + warn!( schema = %seq.schema, name = %seq.name, - "skipping sequence: never advanced on source", + error = %e, + "failed to sync sequence (continuing)" ); - continue; - }; + Ok(0) + } + } +} + +/// Sequential per-statement fallback. +async fn apply_individually( + target: &dyn SeqSyncTarget, + sequences: &[&SourceSequence], +) -> Result { + let mut applied = 0usize; + for seq in sequences { + let last_value = seq.last_value.unwrap_or(0); let sql = build_setval_sql(&seq.schema, &seq.name)?; - match target.execute(&sql, &[&last_value]).await { + match target.execute_setval(&sql, last_value).await { Ok(_) => { applied += 1; debug!(schema = %seq.schema, name = %seq.name, last_value, "synced sequence"); @@ -314,4 +455,230 @@ mod tests { fn collect_sql_with_schema_filter_uses_any_operator() { assert!(COLLECT_SEQUENCES_SQL_WITH_SCHEMA_FILTER.contains("ANY($1::text[])")); } + + #[test] + fn build_batch_setval_sql_produces_valid_plpgsql() { + let seqs = [ + SourceSequence { + schema: "public".into(), + name: "users_id_seq".into(), + last_value: Some(42), + }, + SourceSequence { + schema: "public".into(), + name: "orders_id_seq".into(), + last_value: Some(100), + }, + ]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.contains("CREATE TEMP TABLE IF NOT EXISTS _seq_sync_result")); + assert!(!sql.contains("ON COMMIT DROP")); + assert!(sql.contains("DO $__pg_dbmigrator_seq_sync__$")); + assert!(sql.ends_with("$__pg_dbmigrator_seq_sync__$;")); + assert!(sql.contains("PERFORM setval")); + assert!(sql.contains("42::bigint")); + assert!(sql.contains("100::bigint")); + assert!(sql.contains("EXCEPTION WHEN OTHERS")); + assert!(sql.contains("_applied := _applied + 1")); + assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); + } + + #[test] + fn build_batch_setval_sql_escapes_special_chars() { + let seqs = [SourceSequence { + schema: "my\"schema".into(), + name: "o'reilly_seq".into(), + last_value: Some(7), + }]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.contains("\"my\"\"schema\"")); + assert!(sql.contains("7::bigint")); + } + + #[test] + fn build_batch_setval_sql_escapes_percent_in_raise_warning() { + let seqs = [SourceSequence { + schema: "public".into(), + name: "pct%seq".into(), + last_value: Some(1), + }]; + let refs: Vec<&SourceSequence> = seqs.iter().collect(); + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!( + sql.contains("%%"), + "percent signs in identifiers must be doubled for RAISE WARNING" + ); + } + + #[test] + fn build_batch_setval_sql_empty_input() { + let refs: Vec<&SourceSequence> = vec![]; + let sql = build_batch_setval_sql(&refs).unwrap(); + assert!(sql.contains("DO $__pg_dbmigrator_seq_sync__$")); + assert!(!sql.contains("PERFORM setval")); + assert!(sql.contains("INSERT INTO _seq_sync_result VALUES (_applied)")); + } + + // ── Mock-based async tests ────────────────────────────────────────────── + + use crate::error::MigrationError; + use std::collections::VecDeque; + use std::sync::Mutex; + + struct MockTarget { + setval_results: Mutex>>, + batch_exec_result: Mutex>>, + batch_applied: Mutex>>, + } + + impl MockTarget { + fn ok(applied_count: i32) -> Self { + Self { + setval_results: Mutex::new(VecDeque::new()), + batch_exec_result: Mutex::new(Some(Ok(()))), + batch_applied: Mutex::new(Some(Ok(applied_count))), + } + } + + fn batch_fails() -> Self { + Self { + setval_results: Mutex::new(VecDeque::new()), + batch_exec_result: Mutex::new(Some(Err(MigrationError::config( + "batch not supported", + )))), + batch_applied: Mutex::new(None), + } + } + + fn with_setval_results(mut self, results: Vec>) -> Self { + self.setval_results = Mutex::new(results.into()); + self + } + } + + #[async_trait] + impl SeqSyncTarget for MockTarget { + async fn execute_setval(&self, _sql: &str, _last_value: i64) -> Result { + self.setval_results + .lock() + .unwrap() + .pop_front() + .unwrap_or(Ok(1)) + } + async fn batch_execute_sql(&self, _sql: &str) -> Result<()> { + self.batch_exec_result + .lock() + .unwrap() + .take() + .unwrap_or(Ok(())) + } + async fn query_batch_applied(&self) -> Result { + self.batch_applied.lock().unwrap().take().unwrap_or(Ok(0)) + } + } + + fn seq(schema: &str, name: &str, val: Option) -> SourceSequence { + SourceSequence { + schema: schema.into(), + name: name.into(), + last_value: val, + } + } + + #[tokio::test] + async fn apply_all_none_last_value_returns_zero() { + let target = MockTarget::ok(0); + let seqs = vec![seq("public", "s1", None), seq("public", "s2", None)]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_empty_sequences_returns_zero() { + let target = MockTarget::ok(0); + let applied = apply_sequences_impl(&target, &[]).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_single_sequence_success() { + let target = MockTarget::ok(0).with_setval_results(vec![Ok(1)]); + let seqs = vec![seq("public", "users_id_seq", Some(42))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn apply_single_sequence_failure_returns_zero() { + let target = MockTarget::ok(0) + .with_setval_results(vec![Err(MigrationError::config("permission denied"))]); + let seqs = vec![seq("public", "users_id_seq", Some(42))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 0); + } + + #[tokio::test] + async fn apply_batch_success() { + let target = MockTarget::ok(2); + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_batch_reports_partial_success() { + let target = MockTarget::ok(1); + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn apply_batch_failure_falls_back_to_individual() { + let target = MockTarget::batch_fails().with_setval_results(vec![Ok(1), Ok(1)]); + let seqs = vec![seq("public", "s1", Some(10)), seq("public", "s2", Some(20))]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_individually_mixed_results() { + let target = MockTarget::batch_fails().with_setval_results(vec![ + Ok(1), + Err(MigrationError::config("fail")), + Ok(1), + ]); + let seqs = vec![ + seq("public", "s1", Some(1)), + seq("public", "s2", Some(2)), + seq("public", "s3", Some(3)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } + + #[tokio::test] + async fn apply_filters_none_and_routes_remaining() { + let target = MockTarget::ok(0).with_setval_results(vec![Ok(1)]); + let seqs = vec![ + seq("public", "never_used", None), + seq("public", "used_seq", Some(99)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 1); + } + + #[tokio::test] + async fn apply_filters_none_and_routes_to_batch() { + let target = MockTarget::ok(2); + let seqs = vec![ + seq("public", "skip_me", None), + seq("public", "s1", Some(10)), + seq("public", "s2", Some(20)), + ]; + let applied = apply_sequences_impl(&target, &seqs).await.unwrap(); + assert_eq!(applied, 2); + } } diff --git a/pg_dbmigrator/tests/postgres_integration.rs b/pg_dbmigrator/tests/postgres_integration.rs index 04758ac..33ae949 100644 --- a/pg_dbmigrator/tests/postgres_integration.rs +++ b/pg_dbmigrator/tests/postgres_integration.rs @@ -796,10 +796,17 @@ async fn ensure_publication_excludes_tables_when_no_includes() { ); // Verify skip_me is not in the publication's table list. + // Use pg_publication_rel JOIN pg_class instead of pg_publication_tables + // because the latter calls relation_open() on every OID and crashes if + // a concurrently-running test dropped a table after the publication was + // created. let skip_rows = client .query( - "SELECT schemaname, tablename FROM pg_publication_tables \ - WHERE pubname = 'integ_excl_pub' AND tablename = 'skip_me'", + "SELECT c.relname FROM pg_publication_rel pr \ + JOIN pg_class c ON c.oid = pr.prrelid \ + JOIN pg_namespace n ON n.oid = c.relnamespace \ + WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \ + AND c.relname = 'skip_me'", &[], ) .await @@ -812,8 +819,11 @@ async fn ensure_publication_excludes_tables_when_no_includes() { // Verify keep_me IS in the publication. let keep_rows = client .query( - "SELECT schemaname, tablename FROM pg_publication_tables \ - WHERE pubname = 'integ_excl_pub' AND tablename = 'keep_me'", + "SELECT c.relname FROM pg_publication_rel pr \ + JOIN pg_class c ON c.oid = pr.prrelid \ + JOIN pg_namespace n ON n.oid = c.relnamespace \ + WHERE pr.prpubid = (SELECT oid FROM pg_publication WHERE pubname = 'integ_excl_pub') \ + AND c.relname = 'keep_me'", &[], ) .await