From a4ba57900fdcc882c04dcf9b22500ec4aadd22fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sat, 28 Feb 2026 18:37:50 +1300 Subject: [PATCH 1/4] feat: skip persistent schemas that don't exist on source during migration When the persistent_schemas list includes a schema that was never created on the source restore, the operator now queries the source database to filter the list before launching the migration job. Missing schemas are logged at warn level and silently skipped. If none of the listed schemas exist on the source, migration is skipped entirely and the switchover proceeds immediately. --- .github/workflows/integration.yml | 1 + src/controllers/replica.rs | 78 ++++++++- tests/persistent_schemas.rs | 260 ++++++++++++++++++++++++++++++ 3 files changed, 337 insertions(+), 2 deletions(-) diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index 96e906f..4c934d8 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -72,6 +72,7 @@ jobs: namespaces: >- test-persistent-schemas test-ps-conflict + test-ps-skip-missing needs_non_pg_snapshot: false needs_cnpg: false diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 6bd50d1..4d4e354 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -1295,6 +1295,28 @@ async fn reconcile_schema_migration( ) .await?; + // Filter out schemas that don't exist on the source. This happens when the + // user adds a schema to persistent_schemas before actually creating it. + let schemas = filter_existing_source_schemas( + client, + namespace, + &old_restore_name, + &source_dbname, + &reader_user, + &reader_password, + schemas, + ctx.use_port_forward(), + ) + .await?; + + if schemas.is_empty() { + info!( + replica = %replica_name, + "no persistent schemas exist on source, skipping migration" + ); + return Ok(true); + } + // Measure the actual on-disk database size of the source restore and // compute how much the persistent schemas have grown beyond the original // snapshot. This delta is stored in the replica status so the next @@ -1364,7 +1386,7 @@ async fn reconcile_schema_migration( &target_dbname, &reader_user, &reader_password, - schemas, + &schemas, ctx.use_port_forward(), ) .await?; @@ -1452,7 +1474,7 @@ async fn reconcile_schema_migration( &new_restore_name, &source_dbname, &target_dbname, - schemas, + &schemas, &reader_secret_name, &target_superuser_secret_name, &callback_url, @@ -1480,6 +1502,58 @@ async fn reconcile_schema_migration( Ok(false) // Job created, not complete yet } +/// Query the source restore and return only those schemas from the requested +/// list that actually exist. Schemas that are not present are logged and +/// silently skipped so that migration does not fail when the user has listed +/// a schema they have not yet created. +#[expect( + clippy::too_many_arguments, + reason = "internal helper with tightly-coupled params" +)] +async fn filter_existing_source_schemas( + client: &Client, + namespace: &str, + restore_name: &str, + dbname: &str, + user: &str, + password: &str, + schemas: &[String], + use_port_forward: bool, +) -> Result> { + let conn = overlay::connect::connect_to_restore( + client, + namespace, + restore_name, + dbname, + user, + password, + use_port_forward, + ) + .await?; + + let rows = conn + .client + .query( + "SELECT schema_name FROM information_schema.schemata \ + WHERE schema_name = ANY($1)", + &[&schemas], + ) + .await?; + + let existing: Vec = rows.iter().map(|r| r.get(0)).collect(); + + let skipped: Vec<&String> = schemas.iter().filter(|s| !existing.contains(s)).collect(); + if !skipped.is_empty() { + warn!( + restore = %restore_name, + skipped = ?skipped, + "persistent schemas not found on source, skipping" + ); + } + + Ok(existing) +} + /// Check whether any of the persistent schemas already exist in the snapshot /// (i.e. in the new restore database before migration). Returns the list of /// schema names that were found. diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index ba79f3b..6460937 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -464,3 +464,263 @@ async fn persistent_schemas_conflict_fails_restore() { println!("--- persistent schema conflict correctly prevented switchover, cleaning up"); cleanup_namespace(&client, ns, &[replica_name]).await; } + +/// When `persistent_schemas` includes a schema that was never created on the +/// source restore, migration should skip it and still succeed. +#[tokio::test] +#[ignore = "requires a running Kubernetes cluster with MinIO and kopia"] +async fn persistent_schemas_skip_missing_on_source() { + let client = make_client().await; + let ns = "test-ps-skip-missing"; + let replica_name = "ps-skip-replica"; + + setup_namespace(&client, ns).await; + cleanup_namespace(&client, ns, &[replica_name]).await; + + let secrets: Api = Api::namespaced(client.clone(), ns); + let replicas: Api = Api::namespaced(client.clone(), ns); + let restores: Api = Api::namespaced(client.clone(), ns); + + println!("--- creating kopia secret"); + secrets + .create( + &PostParams::default(), + &build_kopia_secret(ns, "ps-skip-kopia-creds", "test-bucket"), + ) + .await + .expect("failed to create kopia secret"); + + // Configure persistent_schemas with one real schema and one that will + // never be created. The operator should skip the missing one. + println!( + "--- creating replica with persistent_schemas: [\"real_schema\", \"nonexistent_schema\"]" + ); + let mut replica = build_replica( + replica_name, + "ps-skip-kopia-creds", + ReplicaOpts { + read_only: false, + ..Default::default() + }, + ); + replica.spec.persistent_schemas = Some(vec![ + "real_schema".to_string(), + "nonexistent_schema".to_string(), + ]); + replica.metadata.namespace = Some(ns.into()); + replicas + .create(&PostParams::default(), &replica) + .await + .expect("failed to create replica"); + + println!("--- waiting for first restore to become Active"); + let first_restore_name = + wait_for_restore_phase(&restores, replica_name, RestorePhase::Active, PHASE_TIMEOUT).await; + wait_for_replica_phase(&replicas, replica_name, ReplicaPhase::Ready, PHASE_TIMEOUT).await; + println!("--- first restore active: {first_restore_name}"); + + let first_deploy = format!("deployment/{first_restore_name}"); + + // Only create real_schema; nonexistent_schema is intentionally never created. + println!("--- creating real_schema with data in the first restore"); + kubectl_exec( + ns, + &first_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-c", + "CREATE SCHEMA real_schema; \ + CREATE TABLE real_schema.items (id serial PRIMARY KEY, val text NOT NULL); \ + INSERT INTO real_schema.items (val) \ + SELECT 'item-' || i FROM generate_series(1, 10) AS i", + ], + ) + .await; + + println!("--- verifying 10 rows in real_schema on first restore"); + let count_out = kubectl_exec( + ns, + &first_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM real_schema.items", + ], + ) + .await; + assert_eq!(count_out.trim(), "10"); + + let first_restore_obj = restores + .get(&first_restore_name) + .await + .expect("failed to get first restore"); + let snapshot_id = first_restore_obj.spec.snapshot.clone(); + let snapshot_size = first_restore_obj.spec.snapshot_size.clone(); + let storage_size = first_restore_obj.spec.storage_size.clone(); + + let replica_obj = replicas + .get(replica_name) + .await + .expect("failed to get replica"); + let replica_uid = replica_obj.uid().expect("replica has no UID"); + + let second_restore_name = format!("{replica_name}-second"); + println!("--- creating second restore: {second_restore_name}"); + + let second_restore = PostgresPhysicalRestore::new( + &second_restore_name, + PostgresPhysicalRestoreSpec { + replica: LocalObjectReference { + name: replica_name.into(), + }, + snapshot: snapshot_id, + snapshot_size, + snapshot_time: None, + storage_size, + }, + ); + + let mut restore_value = serde_json::to_value(&second_restore).unwrap(); + if let Some(meta) = restore_value + .as_object_mut() + .and_then(|o| o.get_mut("metadata")) + .and_then(|m| m.as_object_mut()) + { + meta.insert( + "namespace".to_string(), + serde_json::Value::String(ns.to_string()), + ); + meta.insert( + "labels".to_string(), + serde_json::json!({ "pgro.bes.au/replica": replica_name }), + ); + meta.insert( + "ownerReferences".to_string(), + serde_json::json!([{ + "apiVersion": "pgro.bes.au/v1alpha1", + "kind": "PostgresPhysicalReplica", + "name": replica_name, + "uid": replica_uid, + "controller": true, + "blockOwnerDeletion": true, + }]), + ); + } + + let second_restore_resource: PostgresPhysicalRestore = + serde_json::from_value(restore_value).unwrap(); + restores + .create(&PostParams::default(), &second_restore_resource) + .await + .expect("failed to create second restore"); + + println!("--- waiting for schema migration to complete"); + timeout(LONG_PHASE_TIMEOUT, async { + loop { + if let Ok(replica) = replicas.get(replica_name).await { + let phase = replica + .status + .as_ref() + .and_then(|s| s.schema_migration_phase.as_deref()); + println!("[{replica_name}] schemaMigrationPhase: {phase:?}"); + if phase == Some("complete") { + return; + } + } + sleep(POLL_INTERVAL).await; + } + }) + .await + .expect("timed out waiting for schema migration to complete"); + + println!("--- waiting for second restore to become Active"); + timeout(LONG_PHASE_TIMEOUT, async { + loop { + if let Ok(restore) = restores.get(&second_restore_name).await { + let phase = restore.status.as_ref().and_then(|s| s.phase.as_ref()); + println!("[{second_restore_name}] phase: {phase:?}"); + if phase == Some(&RestorePhase::Active) { + return; + } + } + sleep(POLL_INTERVAL).await; + } + }) + .await + .expect("timed out waiting for second restore to become Active"); + + println!("--- verifying real_schema data was migrated to the second restore"); + let second_deploy = format!("deployment/{second_restore_name}"); + let migrated_count = kubectl_exec( + ns, + &second_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM real_schema.items", + ], + ) + .await; + assert_eq!( + migrated_count.trim(), + "10", + "expected 10 rows migrated into real_schema on second restore" + ); + + println!("--- verifying nonexistent_schema does not exist on the second restore"); + let schema_exists = kubectl_exec( + ns, + &second_deploy, + &[ + "psql", + "-U", + "analytics", + "-d", + "myapp", + "-t", + "-A", + "-c", + "SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name = 'nonexistent_schema'", + ], + ) + .await; + assert_eq!( + schema_exists.trim(), + "0", + "nonexistent_schema should not exist on the second restore" + ); + + println!("--- verifying replica is Ready after switchover"); + let replica_after = replicas + .get(replica_name) + .await + .expect("failed to get replica after switchover"); + let status = replica_after + .status + .as_ref() + .expect("replica has no status"); + assert_eq!(status.phase, Some(ReplicaPhase::Ready)); + assert_eq!( + status.current_restore.as_deref(), + Some(second_restore_name.as_str()), + ); + + println!("--- skip-missing-schema assertions passed, cleaning up"); + cleanup_namespace(&client, ns, &[replica_name]).await; +} From 20dc87d6d0add8c90c9c02e165e8fe1651e70541 Mon Sep 17 00:00:00 2001 From: "review-hero[bot]" <2896273+review-hero[bot]@users.noreply.github.com> Date: Sat, 28 Feb 2026 06:24:04 +0000 Subject: [PATCH 2/4] fix: auto-fix review suggestions --- src/controllers/replica.rs | 91 +++++++++------------------- tests/helpers.rs | 32 +++++++++- tests/persistent_schemas.rs | 115 ++++-------------------------------- 3 files changed, 69 insertions(+), 169 deletions(-) diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 4d4e354..8e555ce 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -1297,17 +1297,30 @@ async fn reconcile_schema_migration( // Filter out schemas that don't exist on the source. This happens when the // user adds a schema to persistent_schemas before actually creating it. - let schemas = filter_existing_source_schemas( + let all_schemas: &[String] = schemas; + let schemas = query_existing_schemas( client, namespace, &old_restore_name, &source_dbname, &reader_user, &reader_password, - schemas, + all_schemas, ctx.use_port_forward(), ) .await?; + let existing_set: HashSet<&str> = schemas.iter().map(String::as_str).collect(); + let skipped: Vec<&String> = all_schemas + .iter() + .filter(|s| !existing_set.contains(s.as_str())) + .collect(); + if !skipped.is_empty() { + warn!( + restore = %old_restore_name, + skipped = ?skipped, + "persistent schemas not found on source, skipping" + ); + } if schemas.is_empty() { info!( @@ -1379,7 +1392,7 @@ async fn reconcile_schema_migration( // Check that none of the persistent schemas already exist in the snapshot. // If they do, the pg_dump|psql migration would conflict, so we must fail // the restore instead of attempting migration. - let conflicting = check_snapshot_schema_conflicts( + let conflicting = query_existing_schemas( client, namespace, &new_restore_name, @@ -1502,66 +1515,15 @@ async fn reconcile_schema_migration( Ok(false) // Job created, not complete yet } -/// Query the source restore and return only those schemas from the requested -/// list that actually exist. Schemas that are not present are logged and -/// silently skipped so that migration does not fail when the user has listed -/// a schema they have not yet created. -#[expect( - clippy::too_many_arguments, - reason = "internal helper with tightly-coupled params" -)] -async fn filter_existing_source_schemas( - client: &Client, - namespace: &str, - restore_name: &str, - dbname: &str, - user: &str, - password: &str, - schemas: &[String], - use_port_forward: bool, -) -> Result> { - let conn = overlay::connect::connect_to_restore( - client, - namespace, - restore_name, - dbname, - user, - password, - use_port_forward, - ) - .await?; - - let rows = conn - .client - .query( - "SELECT schema_name FROM information_schema.schemata \ - WHERE schema_name = ANY($1)", - &[&schemas], - ) - .await?; - - let existing: Vec = rows.iter().map(|r| r.get(0)).collect(); - - let skipped: Vec<&String> = schemas.iter().filter(|s| !existing.contains(s)).collect(); - if !skipped.is_empty() { - warn!( - restore = %restore_name, - skipped = ?skipped, - "persistent schemas not found on source, skipping" - ); - } - - Ok(existing) -} - -/// Check whether any of the persistent schemas already exist in the snapshot -/// (i.e. in the new restore database before migration). Returns the list of -/// schema names that were found. +/// Query which of the requested schemas actually exist in a restore's database. +/// Returns schemas in the same order as the input slice, using the system +/// catalog (`pg_catalog.pg_namespace`) which is always fully visible regardless +/// of schema-level privileges. #[expect( clippy::too_many_arguments, reason = "internal helper with tightly-coupled params" )] -async fn check_snapshot_schema_conflicts( +async fn query_existing_schemas( client: &Client, namespace: &str, restore_name: &str, @@ -1585,14 +1547,17 @@ async fn check_snapshot_schema_conflicts( let rows = conn .client .query( - "SELECT schema_name FROM information_schema.schemata \ - WHERE schema_name = ANY($1)", + "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ANY($1)", &[&schemas], ) .await?; - let found: Vec = rows.iter().map(|r| r.get(0)).collect(); - Ok(found) + let found: HashSet = rows.iter().map(|r| r.get::<_, String>(0)).collect(); + Ok(schemas + .iter() + .filter(|s| found.contains(s.as_str())) + .cloned() + .collect()) } pub fn error_policy( diff --git a/tests/helpers.rs b/tests/helpers.rs index da7aa80..d9ee7e3 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -6,11 +6,12 @@ use std::{collections::BTreeMap, time::Duration}; use jiff::Span; + use k8s_openapi::{ ByteString, api::{ batch::v1::Job, - core::v1::{Secret, SecretReference}, + core::v1::{LocalObjectReference, Secret, SecretReference}, }, apimachinery::pkg::api::resource::Quantity, }; @@ -21,7 +22,7 @@ use kube::{ use postgres_restore_operator::{ types::{ OverlayDatabaseConfig, PostgresPhysicalReplica, PostgresPhysicalReplicaSpec, - PostgresPhysicalRestore, ReplicaPhase, RestorePhase, + PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase, }, util::TimeSpan, }; @@ -325,3 +326,30 @@ pub async fn wait_for_pod_ready(ns: &str, pod: &str, timeout_dur: Duration) { .await .unwrap_or_else(|_| panic!("timed out waiting for pod {pod} to be ready in namespace {ns}")); } + +pub fn build_second_restore( + name: &str, + ns: &str, + first_restore: &PostgresPhysicalRestore, + replica: &PostgresPhysicalReplica, +) -> PostgresPhysicalRestore { + let mut restore = PostgresPhysicalRestore::new( + name, + PostgresPhysicalRestoreSpec { + replica: LocalObjectReference { + name: replica.name_any(), + }, + snapshot: first_restore.spec.snapshot.clone(), + snapshot_size: first_restore.spec.snapshot_size.clone(), + snapshot_time: None, + storage_size: first_restore.spec.storage_size.clone(), + }, + ); + restore.metadata.namespace = Some(ns.to_string()); + restore.metadata.labels = Some(BTreeMap::from([( + "pgro.bes.au/replica".to_string(), + replica.name_any(), + )])); + restore.metadata.owner_references = Some(vec![replica.owner_reference()]); + restore +} diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index 6460937..332684b 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -1,9 +1,6 @@ -use k8s_openapi::api::core::v1::{LocalObjectReference, Secret}; -use kube::{Api, ResourceExt, api::PostParams}; -use postgres_restore_operator::types::{ - PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, - RestorePhase, -}; +use k8s_openapi::api::core::v1::Secret; +use kube::{Api, api::PostParams}; +use postgres_restore_operator::types::{PostgresPhysicalReplica, PostgresPhysicalRestore, ReplicaPhase, RestorePhase}; use tokio::time::{sleep, timeout}; use helpers::*; @@ -100,69 +97,24 @@ async fn persistent_schemas_migration() { "expected 42 rows in persistent_data schema on first restore" ); - // Capture snapshot details from the first restore to create a second one let first_restore_obj = restores .get(&first_restore_name) .await .expect("failed to get first restore"); - let snapshot_id = first_restore_obj.spec.snapshot.clone(); - let snapshot_size = first_restore_obj.spec.snapshot_size.clone(); - let storage_size = first_restore_obj.spec.storage_size.clone(); - let replica_obj = replicas .get(replica_name) .await .expect("failed to get replica"); - let replica_uid = replica_obj.uid().expect("replica has no UID"); // Manually create a second restore from the same snapshot to trigger switchover let second_restore_name = format!("{replica_name}-second"); println!("--- creating second restore manually: {second_restore_name}"); - let second_restore = PostgresPhysicalRestore::new( - &second_restore_name, - PostgresPhysicalRestoreSpec { - replica: LocalObjectReference { - name: replica_name.into(), - }, - snapshot: snapshot_id, - snapshot_size, - snapshot_time: None, - storage_size, - }, - ); - - let mut restore_value = serde_json::to_value(&second_restore).unwrap(); - if let Some(meta) = restore_value - .as_object_mut() - .and_then(|o| o.get_mut("metadata")) - .and_then(|m| m.as_object_mut()) - { - meta.insert( - "namespace".to_string(), - serde_json::Value::String(ns.to_string()), - ); - meta.insert( - "labels".to_string(), - serde_json::json!({ "pgro.bes.au/replica": replica_name }), - ); - meta.insert( - "ownerReferences".to_string(), - serde_json::json!([{ - "apiVersion": "pgro.bes.au/v1alpha1", - "kind": "PostgresPhysicalReplica", - "name": replica_name, - "uid": replica_uid, - "controller": true, - "blockOwnerDeletion": true, - }]), - ); - } - - let second_restore_resource: PostgresPhysicalRestore = - serde_json::from_value(restore_value).unwrap(); restores - .create(&PostParams::default(), &second_restore_resource) + .create( + &PostParams::default(), + &build_second_restore(&second_restore_name, ns, &first_restore_obj, &replica_obj), + ) .await .expect("failed to create second restore"); @@ -342,20 +294,14 @@ async fn persistent_schemas_conflict_fails_restore() { wait_for_replica_phase(&replicas, replica_name, ReplicaPhase::Ready, PHASE_TIMEOUT).await; println!("--- first restore active: {first_restore_name}"); - // Capture snapshot details from the first restore to create a second one let first_restore_obj = restores .get(&first_restore_name) .await .expect("failed to get first restore"); - let snapshot_id = first_restore_obj.spec.snapshot.clone(); - let snapshot_size = first_restore_obj.spec.snapshot_size.clone(); - let storage_size = first_restore_obj.spec.storage_size.clone(); - let replica_obj = replicas .get(replica_name) .await .expect("failed to get replica"); - let replica_uid = replica_obj.uid().expect("replica has no UID"); // Manually create a second restore from the same snapshot to trigger switchover. // The "public" schema will already exist in this snapshot, so migration should @@ -363,50 +309,11 @@ async fn persistent_schemas_conflict_fails_restore() { let second_restore_name = format!("{replica_name}-conflict"); println!("--- creating second restore: {second_restore_name}"); - let second_restore = PostgresPhysicalRestore::new( - &second_restore_name, - PostgresPhysicalRestoreSpec { - replica: LocalObjectReference { - name: replica_name.into(), - }, - snapshot: snapshot_id, - snapshot_size, - snapshot_time: None, - storage_size, - }, - ); - - let mut restore_value = serde_json::to_value(&second_restore).unwrap(); - if let Some(meta) = restore_value - .as_object_mut() - .and_then(|o| o.get_mut("metadata")) - .and_then(|m| m.as_object_mut()) - { - meta.insert( - "namespace".to_string(), - serde_json::Value::String(ns.to_string()), - ); - meta.insert( - "labels".to_string(), - serde_json::json!({ "pgro.bes.au/replica": replica_name }), - ); - meta.insert( - "ownerReferences".to_string(), - serde_json::json!([{ - "apiVersion": "pgro.bes.au/v1alpha1", - "kind": "PostgresPhysicalReplica", - "name": replica_name, - "uid": replica_uid, - "controller": true, - "blockOwnerDeletion": true, - }]), - ); - } - - let second_restore_resource: PostgresPhysicalRestore = - serde_json::from_value(restore_value).unwrap(); restores - .create(&PostParams::default(), &second_restore_resource) + .create( + &PostParams::default(), + &build_second_restore(&second_restore_name, ns, &first_restore_obj, &replica_obj), + ) .await .expect("failed to create second restore"); From 8c6dce8196c5dc2e311c246ed668c814c9157f6e Mon Sep 17 00:00:00 2001 From: "review-hero[bot]" <2896273+review-hero[bot]@users.noreply.github.com> Date: Sat, 28 Feb 2026 06:28:30 +0000 Subject: [PATCH 3/4] fix: auto-fix 3 CI failures --- tests/persistent_schemas.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index 332684b..c924b01 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -1,6 +1,6 @@ -use k8s_openapi::api::core::v1::Secret; -use kube::{Api, api::PostParams}; -use postgres_restore_operator::types::{PostgresPhysicalReplica, PostgresPhysicalRestore, ReplicaPhase, RestorePhase}; +use k8s_openapi::api::core::v1::{LocalObjectReference, Secret}; +use kube::{Api, ResourceExt as _, api::PostParams}; +use postgres_restore_operator::types::{PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase}; use tokio::time::{sleep, timeout}; use helpers::*; From 918f1d8accd9c58dd97d2a3f265dc32fd7b07e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Sun, 1 Mar 2026 00:40:02 +1300 Subject: [PATCH 4/4] fix: check consecutive failures before in-progress restore early return The in-progress restore check at line 898 was running before the consecutive failures check, setting the phase back to Restoring and returning early. This prevented the suspension block from correcting the phase, causing it to stay stuck at Restoring even while suspended. --- src/controllers/replica.rs | 157 ++++++++++++++++++------------------ tests/persistent_schemas.rs | 5 +- 2 files changed, 83 insertions(+), 79 deletions(-) diff --git a/src/controllers/replica.rs b/src/controllers/replica.rs index 8e555ce..8bbbc0d 100644 --- a/src/controllers/replica.rs +++ b/src/controllers/replica.rs @@ -894,6 +894,85 @@ pub async fn reconcile(replica: Arc, ctx: Arc) } } + // Check consecutive failures before the in-progress restore early return, + // so that suspension takes priority and the phase doesn't get stuck at + // Restoring when all restores are failing. + let consecutive_failures = replica + .status + .as_ref() + .and_then(|s| s.consecutive_restore_failures) + .unwrap_or(0); + + const MAX_CONSECUTIVE_FAILURES: u32 = 3; + if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { + if active_restore.is_some() { + replica.update_phase(client, ReplicaPhase::Ready).await?; + } else { + replica.update_phase(client, ReplicaPhase::Pending).await?; + } + + let already_suspended = replica + .status + .as_ref() + .and_then(|s| { + s.conditions + .iter() + .find(|c| c.type_ == "RestoreSchedulingSuspended") + }) + .is_some_and(|c| c.status == "True"); + + if !already_suspended { + warn!( + replica = name, + consecutive_failures, + "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures" + ); + replica + .update_condition( + client, + "RestoreSchedulingSuspended", + "True", + "ConsecutiveFailures", + &format!( + "Scheduling suspended after {consecutive_failures} consecutive restore failures. \ + Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \ + -n {namespace} --subresource=status --type=merge \ + -p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'" + ), + ) + .await?; + } + return Ok(Action::requeue(Duration::from_secs(300))); + } + + // Clear RestoreSchedulingSuspended if it was previously set but the + // counter has since been reset below the threshold (e.g. manual patch). + if replica + .status + .as_ref() + .and_then(|s| { + s.conditions + .iter() + .find(|c| c.type_ == "RestoreSchedulingSuspended") + }) + .is_some_and(|c| c.status == "True") + { + info!( + replica = name, + consecutive_failures, + "clearing RestoreSchedulingSuspended condition (counter below threshold)" + ); + replica + .update_condition( + client, + "RestoreSchedulingSuspended", + "False", + "CounterReset", + "Consecutive failure counter reset below threshold", + ) + .await?; + } + // Decide whether to trigger a new restore if let Some(in_progress) = in_progress_restore { let phase = in_progress.status.as_ref().and_then(|s| s.phase.as_ref()); @@ -972,84 +1051,6 @@ pub async fn reconcile(replica: Arc, ctx: Arc) let schedule_decision = replica.check_schedule(); - let consecutive_failures = replica - .status - .as_ref() - .and_then(|s| s.consecutive_restore_failures) - .unwrap_or(0); - - const MAX_CONSECUTIVE_FAILURES: u32 = 3; - if consecutive_failures >= MAX_CONSECUTIVE_FAILURES { - // Update phase even while suspended so it doesn't stay stuck at - // "Restoring" after all in-progress restores have failed. - if active_restore.is_some() { - replica.update_phase(client, ReplicaPhase::Ready).await?; - } else { - replica.update_phase(client, ReplicaPhase::Pending).await?; - } - - let already_suspended = replica - .status - .as_ref() - .and_then(|s| { - s.conditions - .iter() - .find(|c| c.type_ == "RestoreSchedulingSuspended") - }) - .is_some_and(|c| c.status == "True"); - - if !already_suspended { - warn!( - replica = name, - consecutive_failures, - "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures" - ); - replica - .update_condition( - client, - "RestoreSchedulingSuspended", - "True", - "ConsecutiveFailures", - &format!( - "Scheduling suspended after {consecutive_failures} consecutive restore failures. \ - Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \ - -n {namespace} --subresource=status --type=merge \ - -p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'" - ), - ) - .await?; - } - return Ok(Action::requeue(Duration::from_secs(300))); - } - - // Clear RestoreSchedulingSuspended if it was previously set but the - // counter has since been reset below the threshold (e.g. manual patch). - if replica - .status - .as_ref() - .and_then(|s| { - s.conditions - .iter() - .find(|c| c.type_ == "RestoreSchedulingSuspended") - }) - .is_some_and(|c| c.status == "True") - { - info!( - replica = name, - consecutive_failures, - "clearing RestoreSchedulingSuspended condition (counter below threshold)" - ); - replica - .update_condition( - client, - "RestoreSchedulingSuspended", - "False", - "CounterReset", - "Consecutive failure counter reset below threshold", - ) - .await?; - } - let should_restore = never_restored || active_restore_deleted || matches!(schedule_decision, ScheduleDecision::Trigger); diff --git a/tests/persistent_schemas.rs b/tests/persistent_schemas.rs index c924b01..c055184 100644 --- a/tests/persistent_schemas.rs +++ b/tests/persistent_schemas.rs @@ -1,6 +1,9 @@ use k8s_openapi::api::core::v1::{LocalObjectReference, Secret}; use kube::{Api, ResourceExt as _, api::PostParams}; -use postgres_restore_operator::types::{PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase}; +use postgres_restore_operator::types::{ + PostgresPhysicalReplica, PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, + RestorePhase, +}; use tokio::time::{sleep, timeout}; use helpers::*;