diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 96e906f..4c934d8 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -72,6 +72,7 @@ jobs: namespaces: >- test-persistent-schemas test-ps-conflict + test-ps-skip-missing needs_non_pg_snapshot: false needs_cnpg: false diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 6bd50d1..8bbbc0d 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -894,6 +894,85 @@ pub async fn reconcile(replica: Arc, ctx: Arc) } } + // Check consecutive failures before the in-progress restore early return, + // so that suspension takes priority and the phase doesn't get stuck at + // Restoring when all restores are failing. + let consecutive_failures = replica + .status + .as_ref() + .and_then(|s| s.consecutive_restore_failures) + .unwrap_or(0); + + const MAX_CONSECUTIVE_FAILURES: u32 = 3; + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + if active_restore.is_some() { + replica.update_phase(client, ReplicaPhase::Ready).await?; + } else { + replica.update_phase(client, ReplicaPhase::Pending).await?; + } + + let already_suspended = replica + .status + .as_ref() + .and_then(|s| { + s.conditions + .iter() + .find(|c| c.type_ == "RestoreSchedulingSuspended") + }) + .is_some_and(|c| c.status == "True"); + + if !already_suspended { + warn!( + replica = name, + consecutive_failures, + "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures" + ); + replica + .update_condition( + client, + "RestoreSchedulingSuspended", + "True", + "ConsecutiveFailures", + &format!( + "Scheduling suspended after {consecutive_failures} consecutive restore failures. \ + Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \ + -n {namespace} --subresource=status --type=merge \ + -p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'" + ), + ) + .await?; + } + return Ok(Action::requeue(Duration::from_secs(300))); + } + + // Clear RestoreSchedulingSuspended if it was previously set but the + // counter has since been reset below the threshold (e.g. manual patch). + if replica + .status + .as_ref() + .and_then(|s| { + s.conditions + .iter() + .find(|c| c.type_ == "RestoreSchedulingSuspended") + }) + .is_some_and(|c| c.status == "True") + { + info!( + replica = name, + consecutive_failures, + "clearing RestoreSchedulingSuspended condition (counter below threshold)" + ); + replica + .update_condition( + client, + "RestoreSchedulingSuspended", + "False", + "CounterReset", + "Consecutive failure counter reset below threshold", + ) + .await?; + } + // Decide whether to trigger a new restore if let Some(in_progress) = in_progress_restore { let phase = in_progress.status.as_ref().and_then(|s| s.phase.as_ref()); @@ -972,84 +1051,6 @@ pub async fn reconcile(replica: Arc, ctx: Arc) let schedule_decision = replica.check_schedule(); - let consecutive_failures = replica - .status - .as_ref() - .and_then(|s| s.consecutive_restore_failures) - .unwrap_or(0); - - const MAX_CONSECUTIVE_FAILURES: u32 = 3; - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - // Update phase even while suspended so it doesn't stay stuck at - // "Restoring" after all in-progress restores have failed. - if active_restore.is_some() { - replica.update_phase(client, ReplicaPhase::Ready).await?; - } else { - replica.update_phase(client, ReplicaPhase::Pending).await?; - } - - let already_suspended = replica - .status - .as_ref() - .and_then(|s| { - s.conditions - .iter() - .find(|c| c.type_ == "RestoreSchedulingSuspended") - }) - .is_some_and(|c| c.status == "True"); - - if !already_suspended { - warn!( - replica = name, - consecutive_failures, - "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures" - ); - replica - .update_condition( - client, - "RestoreSchedulingSuspended", - "True", - "ConsecutiveFailures", - &format!( - "Scheduling suspended after {consecutive_failures} consecutive restore failures. \ - Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \ - -n {namespace} --subresource=status --type=merge \ - -p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'" - ), - ) - .await?; - } - return Ok(Action::requeue(Duration::from_secs(300))); - } - - // Clear RestoreSchedulingSuspended if it was previously set but the - // counter has since been reset below the threshold (e.g. manual patch). - if replica - .status - .as_ref() - .and_then(|s| { - s.conditions - .iter() - .find(|c| c.type_ == "RestoreSchedulingSuspended") - }) - .is_some_and(|c| c.status == "True") - { - info!( - replica = name, - consecutive_failures, - "clearing RestoreSchedulingSuspended condition (counter below threshold)" - ); - replica - .update_condition( - client, - "RestoreSchedulingSuspended", - "False", - "CounterReset", - "Consecutive failure counter reset below threshold", - ) - .await?; - } - let should_restore = never_restored || active_restore_deleted || matches!(schedule_decision, ScheduleDecision::Trigger); @@ -1295,6 +1296,41 @@ async fn reconcile_schema_migration( ) .await?; + // Filter out schemas that don't exist on the source. This happens when the + // user adds a schema to persistent_schemas before actually creating it. + let all_schemas: &[String] = schemas; + let schemas = query_existing_schemas( + client, + namespace, + &old_restore_name, + &source_dbname, + &reader_user, + &reader_password, + all_schemas, + ctx.use_port_forward(), + ) + .await?; + let existing_set: HashSet<&str> = schemas.iter().map(String::as_str).collect(); + let skipped: Vec<&String> = all_schemas + .iter() + .filter(|s| !existing_set.contains(s.as_str())) + .collect(); + if !skipped.is_empty() { + warn!( + restore = %old_restore_name, + skipped = ?skipped, + "persistent schemas not found on source, skipping" + ); + } + + if schemas.is_empty() { + info!( + replica = %replica_name, + "no persistent schemas exist on source, skipping migration" + ); + return Ok(true); + } + // Measure the actual on-disk database size of the source restore and // compute how much the persistent schemas have grown beyond the original // snapshot. This delta is stored in the replica status so the next @@ -1357,14 +1393,14 @@ async fn reconcile_schema_migration( // Check that none of the persistent schemas already exist in the snapshot. // If they do, the pg_dump|psql migration would conflict, so we must fail // the restore instead of attempting migration. - let conflicting = check_snapshot_schema_conflicts( + let conflicting = query_existing_schemas( client, namespace, &new_restore_name, &target_dbname, &reader_user, &reader_password, - schemas, + &schemas, ctx.use_port_forward(), ) .await?; @@ -1452,7 +1488,7 @@ async fn reconcile_schema_migration( &new_restore_name, &source_dbname, &target_dbname, - schemas, + &schemas, &reader_secret_name, &target_superuser_secret_name, &callback_url, @@ -1480,14 +1516,15 @@ async fn reconcile_schema_migration( Ok(false) // Job created, not complete yet } -/// Check whether any of the persistent schemas already exist in the snapshot -/// (i.e. in the new restore database before migration). Returns the list of -/// schema names that were found. +/// Query which of the requested schemas actually exist in a restore's database. +/// Returns schemas in the same order as the input slice, using the system +/// catalog (`pg_catalog.pg_namespace`) which is always fully visible regardless +/// of schema-level privileges. #[expect( clippy::too_many_arguments, reason = "internal helper with tightly-coupled params" )] -async fn check_snapshot_schema_conflicts( +async fn query_existing_schemas( client: &Client, namespace: &str, restore_name: &str, @@ -1511,14 +1548,17 @@ async fn check_snapshot_schema_conflicts( let rows = conn .client .query( - "SELECT schema_name FROM information_schema.schemata \ - WHERE schema_name = ANY($1)", + "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ANY($1)", &[&schemas], ) .await?; - let found: Vec = rows.iter().map(|r| r.get(0)).collect(); - Ok(found) + let found: HashSet = rows.iter().map(|r| r.get::<_, String>(0)).collect(); + Ok(schemas + .iter() + .filter(|s| found.contains(s.as_str())) + .cloned() + .collect()) } pub fn error_policy( diff --git a/tests/helpers.rs b/tests/helpers.rs index da7aa80..d9ee7e3 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -6,11 +6,12 @@ use std::{collections::BTreeMap, time::Duration}; use jiff::Span; + use k8s_openapi::{ ByteString, api::{ batch::v1::Job, - core::v1::{Secret, SecretReference}, + core::v1::{LocalObjectReference, Secret, SecretReference}, }, apimachinery::pkg::api::resource::Quantity, }; @@ -21,7 +22,7 @@ use kube::{ use postgres_restore_operator::{ types::{ OverlayDatabaseConfig, PostgresPhysicalReplica, PostgresPhysicalReplicaSpec, - PostgresPhysicalRestore, ReplicaPhase, RestorePhase, + PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase, }, util::TimeSpan, }; @@ -325,3 +326,30 @@ pub async fn wait_for_pod_ready(ns: &str, pod: &str, timeout_dur: Duration) { .await .unwrap_or_else(|_| panic!("timed out waiting for pod {pod} to be ready in namespace {ns}")); } + +pub fn build_second_restore( + name: &str, + ns: &str, + first_restore: &PostgresPhysicalRestore, + replica: &PostgresPhysicalReplica, +) -> PostgresPhysicalRestore { + let mut restore = PostgresPhysicalRestore::new( + name, + PostgresPhysicalRestoreSpec { + replica: LocalObjectReference { + name: replica.name_any(), + }, + snapshot: first_restore.spec.snapshot.clone(), + snapshot_size: first_restore.spec.snapshot_size.clone(), + snapshot_time: None, + storage_size: first_restore.spec.storage_size.clone(), + }, + ); + restore.metadata.namespace = Some(ns.to_string()); + restore.metadata.labels = Some(BTreeMap::from([( + "pgro.bes.au/replica".to_string(), + replica.name_any(), + )])); + restore.metadata.owner_references = Some(vec![replica.owner_reference()]); + restore +} diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index ba79f3b..c055184 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -1,5 +1,5 @@ use k8s_openapi::api::core::v1::{LocalObjectReference, Secret}; -use kube::{Api, ResourceExt, api::PostParams}; +use kube::{Api, ResourceExt as _, api::PostParams}; use postgres_restore_operator::types::{ PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase, @@ -100,69 +100,24 @@ async fn persistent_schemas_migration() { "expected 42 rows in persistent_data schema on first restore" ); - // Capture snapshot details from the first restore to create a second one let first_restore_obj = restores .get(&first_restore_name) .await .expect("failed to get first restore"); - let snapshot_id = first_restore_obj.spec.snapshot.clone(); - let snapshot_size = first_restore_obj.spec.snapshot_size.clone(); - let storage_size = first_restore_obj.spec.storage_size.clone(); - let replica_obj = replicas .get(replica_name) .await .expect("failed to get replica"); - let replica_uid = replica_obj.uid().expect("replica has no UID"); // Manually create a second restore from the same snapshot to trigger switchover let second_restore_name = format!("{replica_name}-second"); println!("--- creating second restore manually: {second_restore_name}"); - let second_restore = PostgresPhysicalRestore::new( - &second_restore_name, - PostgresPhysicalRestoreSpec { - replica: LocalObjectReference { - name: replica_name.into(), - }, - snapshot: snapshot_id, - snapshot_size, - snapshot_time: None, - storage_size, - }, - ); - - let mut restore_value = serde_json::to_value(&second_restore).unwrap(); - if let Some(meta) = restore_value - .as_object_mut() - .and_then(|o| o.get_mut("metadata")) - .and_then(|m| m.as_object_mut()) - { - meta.insert( - "namespace".to_string(), - serde_json::Value::String(ns.to_string()), - ); - meta.insert( - "labels".to_string(), - serde_json::json!({ "pgro.bes.au/replica": replica_name }), - ); - meta.insert( - "ownerReferences".to_string(), - serde_json::json!([{ - "apiVersion": "pgro.bes.au/v1alpha1", - "kind": "PostgresPhysicalReplica", - "name": replica_name, - "uid": replica_uid, - "controller": true, - "blockOwnerDeletion": true, - }]), - ); - } - - let second_restore_resource: PostgresPhysicalRestore = - serde_json::from_value(restore_value).unwrap(); restores - .create(&PostParams::default(), &second_restore_resource) + .create( + &PostParams::default(), + &build_second_restore(&second_restore_name, ns, &first_restore_obj, &replica_obj), + ) .await .expect("failed to create second restore"); @@ -342,7 +297,179 @@ async fn persistent_schemas_conflict_fails_restore() { wait_for_replica_phase(&replicas, replica_name, ReplicaPhase::Ready, PHASE_TIMEOUT).await; println!("--- first restore active: {first_restore_name}"); - // Capture snapshot details from the first restore to create a second one + let first_restore_obj = restores + .get(&first_restore_name) + .await + .expect("failed to get first restore"); + let replica_obj = replicas + .get(replica_name) + .await + .expect("failed to get replica"); + + // Manually create a second restore from the same snapshot to trigger switchover. + // The "public" schema will already exist in this snapshot, so migration should + // be blocked and the restore should fail. + let second_restore_name = format!("{replica_name}-conflict"); + println!("--- creating second restore: {second_restore_name}"); + + restores + .create( + &PostParams::default(), + &build_second_restore(&second_restore_name, ns, &first_restore_obj, &replica_obj), + ) + .await + .expect("failed to create second restore"); + + println!("--- waiting for second restore to reach Failed phase (schema conflict)"); + timeout(LONG_PHASE_TIMEOUT, async { + loop { + if let Ok(restore) = restores.get(&second_restore_name).await { + let phase = restore.status.as_ref().and_then(|s| s.phase.as_ref()); + println!("[{second_restore_name}] phase: {phase:?}"); + if phase == Some(&RestorePhase::Failed) { + return; + } + } + sleep(POLL_INTERVAL).await; + } + }) + .await + .expect("timed out waiting for second restore to fail due to schema conflict"); + + println!("--- verifying first restore is still Active"); + let first_restore_after = restores + .get(&first_restore_name) + .await + .expect("failed to get first restore after conflict"); + assert_eq!( + first_restore_after + .status + .as_ref() + .and_then(|s| s.phase.as_ref()), + Some(&RestorePhase::Active), + "first restore should remain Active after schema conflict" + ); + + println!("--- verifying replica still points to first restore"); + let replica_after = replicas + .get(replica_name) + .await + .expect("failed to get replica after conflict"); + let status = replica_after + .status + .as_ref() + .expect("replica has no status"); + assert_eq!( + status.current_restore.as_deref(), + Some(first_restore_name.as_str()), + "currentRestore should still be the first restore" + ); + + println!("--- verifying consecutiveRestoreFailures was incremented"); + assert!( + status.consecutive_restore_failures.unwrap_or(0) >= 1, + "consecutiveRestoreFailures should be at least 1 after schema conflict" + ); + + println!("--- persistent schema conflict correctly prevented switchover, cleaning up"); + cleanup_namespace(&client, ns, &[replica_name]).await; +} + +/// When `persistent_schemas` includes a schema that was never created on the +/// source restore, migration should skip it and still succeed. +#[tokio::test] +#[ignore = "requires a running Kubernetes cluster with MinIO and kopia"] +async fn persistent_schemas_skip_missing_on_source() { + let client = make_client().await; + let ns = "test-ps-skip-missing"; + let replica_name = "ps-skip-replica"; + + setup_namespace(&client, ns).await; + cleanup_namespace(&client, ns, &[replica_name]).await; + + let secrets: Api = Api::namespaced(client.clone(), ns); + let replicas: Api = Api::namespaced(client.clone(), ns); + let restores: Api = Api::namespaced(client.clone(), ns); + + println!("--- creating kopia secret"); + secrets + .create( + &PostParams::default(), + &build_kopia_secret(ns, "ps-skip-kopia-creds", "test-bucket"), + ) + .await + .expect("failed to create kopia secret"); + + // Configure persistent_schemas with one real schema and one that will + // never be created. The operator should skip the missing one. + println!( + "--- creating replica with persistent_schemas: [\"real_schema\", \"nonexistent_schema\"]" + ); + let mut replica = build_replica( + replica_name, + "ps-skip-kopia-creds", + ReplicaOpts { + read_only: false, + ..Default::default() + }, + ); + replica.spec.persistent_schemas = Some(vec![ + "real_schema".to_string(), + "nonexistent_schema".to_string(), + ]); + replica.metadata.namespace = Some(ns.into()); + replicas + .create(&PostParams::default(), &replica) + .await + .expect("failed to create replica"); + + println!("--- waiting for first restore to become Active"); + let first_restore_name = + wait_for_restore_phase(&restores, replica_name, RestorePhase::Active, PHASE_TIMEOUT).await; + wait_for_replica_phase(&replicas, replica_name, ReplicaPhase::Ready, PHASE_TIMEOUT).await; + println!("--- first restore active: {first_restore_name}"); + + let first_deploy = format!("deployment/{first_restore_name}"); + + // Only create real_schema; nonexistent_schema is intentionally never created. + println!("--- creating real_schema with data in the first restore"); + kubectl_exec( + ns, + &first_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-c", + "CREATE SCHEMA real_schema; \ + CREATE TABLE real_schema.items (id serial PRIMARY KEY, val text NOT NULL); \ + INSERT INTO real_schema.items (val) \ + SELECT 'item-' || i FROM generate_series(1, 10) AS i", + ], + ) + .await; + + println!("--- verifying 10 rows in real_schema on first restore"); + let count_out = kubectl_exec( + ns, + &first_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM real_schema.items", + ], + ) + .await; + assert_eq!(count_out.trim(), "10"); + let first_restore_obj = restores .get(&first_restore_name) .await @@ -357,10 +484,7 @@ async fn persistent_schemas_conflict_fails_restore() { .expect("failed to get replica"); let replica_uid = replica_obj.uid().expect("replica has no UID"); - // Manually create a second restore from the same snapshot to trigger switchover. - // The "public" schema will already exist in this snapshot, so migration should - // be blocked and the restore should fail. - let second_restore_name = format!("{replica_name}-conflict"); + let second_restore_name = format!("{replica_name}-second"); println!("--- creating second restore: {second_restore_name}"); let second_restore = PostgresPhysicalRestore::new( @@ -410,13 +534,32 @@ async fn persistent_schemas_conflict_fails_restore() { .await .expect("failed to create second restore"); - println!("--- waiting for second restore to reach Failed phase (schema conflict)"); + println!("--- waiting for schema migration to complete"); + timeout(LONG_PHASE_TIMEOUT, async { + loop { + if let Ok(replica) = replicas.get(replica_name).await { + let phase = replica + .status + .as_ref() + .and_then(|s| s.schema_migration_phase.as_deref()); + println!("[{replica_name}] schemaMigrationPhase: {phase:?}"); + if phase == Some("complete") { + return; + } + } + sleep(POLL_INTERVAL).await; + } + }) + .await + .expect("timed out waiting for schema migration to complete"); + + println!("--- waiting for second restore to become Active"); timeout(LONG_PHASE_TIMEOUT, async { loop { if let Ok(restore) = restores.get(&second_restore_name).await { let phase = restore.status.as_ref().and_then(|s| s.phase.as_ref()); println!("[{second_restore_name}] phase: {phase:?}"); - if phase == Some(&RestorePhase::Failed) { + if phase == Some(&RestorePhase::Active) { return; } } @@ -424,43 +567,70 @@ async fn persistent_schemas_conflict_fails_restore() { } }) .await - .expect("timed out waiting for second restore to fail due to schema conflict"); + .expect("timed out waiting for second restore to become Active"); - println!("--- verifying first restore is still Active"); - let first_restore_after = restores - .get(&first_restore_name) - .await - .expect("failed to get first restore after conflict"); + println!("--- verifying real_schema data was migrated to the second restore"); + let second_deploy = format!("deployment/{second_restore_name}"); + let migrated_count = kubectl_exec( + ns, + &second_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM real_schema.items", + ], + ) + .await; assert_eq!( - first_restore_after - .status - .as_ref() - .and_then(|s| s.phase.as_ref()), - Some(&RestorePhase::Active), - "first restore should remain Active after schema conflict" + migrated_count.trim(), + "10", + "expected 10 rows migrated into real_schema on second restore" ); - println!("--- verifying replica still points to first restore"); + println!("--- verifying nonexistent_schema does not exist on the second restore"); + let schema_exists = kubectl_exec( + ns, + &second_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name = 'nonexistent_schema'", + ], + ) + .await; + assert_eq!( + schema_exists.trim(), + "0", + "nonexistent_schema should not exist on the second restore" + ); + + println!("--- verifying replica is Ready after switchover"); let replica_after = replicas .get(replica_name) .await - .expect("failed to get replica after conflict"); + .expect("failed to get replica after switchover"); let status = replica_after .status .as_ref() .expect("replica has no status"); + assert_eq!(status.phase, Some(ReplicaPhase::Ready)); assert_eq!( status.current_restore.as_deref(), - Some(first_restore_name.as_str()), - "currentRestore should still be the first restore" - ); - - println!("--- verifying consecutiveRestoreFailures was incremented"); - assert!( - status.consecutive_restore_failures.unwrap_or(0) >= 1, - "consecutiveRestoreFailures should be at least 1 after schema conflict" + Some(second_restore_name.as_str()), ); - println!("--- persistent schema conflict correctly prevented switchover, cleaning up"); + println!("--- skip-missing-schema assertions passed, cleaning up"); cleanup_namespace(&client, ns, &[replica_name]).await; }