Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ jobs:
namespaces: >-
test-persistent-schemas
test-ps-conflict
test-ps-skip-missing
needs_non_pg_snapshot: false
needs_cnpg: false

Expand Down
218 changes: 129 additions & 89 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,85 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
}
}

// Check consecutive failures before the in-progress restore early return,
// so that suspension takes priority and the phase doesn't get stuck at
// Restoring when all restores are failing.
let consecutive_failures = replica
.status
.as_ref()
.and_then(|s| s.consecutive_restore_failures)
.unwrap_or(0);

const MAX_CONSECUTIVE_FAILURES: u32 = 3;
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
if active_restore.is_some() {
replica.update_phase(client, ReplicaPhase::Ready).await?;
} else {
replica.update_phase(client, ReplicaPhase::Pending).await?;
}

let already_suspended = replica
.status
.as_ref()
.and_then(|s| {
s.conditions
.iter()
.find(|c| c.type_ == "RestoreSchedulingSuspended")
})
.is_some_and(|c| c.status == "True");

if !already_suspended {
warn!(
replica = name,
consecutive_failures,
"restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures"
);
replica
.update_condition(
client,
"RestoreSchedulingSuspended",
"True",
"ConsecutiveFailures",
&format!(
"Scheduling suspended after {consecutive_failures} consecutive restore failures. \
Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \
-n {namespace} --subresource=status --type=merge \
-p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'"
),
)
.await?;
}
return Ok(Action::requeue(Duration::from_secs(300)));
}

// Clear RestoreSchedulingSuspended if it was previously set but the
// counter has since been reset below the threshold (e.g. manual patch).
if replica
.status
.as_ref()
.and_then(|s| {
s.conditions
.iter()
.find(|c| c.type_ == "RestoreSchedulingSuspended")
})
.is_some_and(|c| c.status == "True")
{
info!(
replica = name,
consecutive_failures,
"clearing RestoreSchedulingSuspended condition (counter below threshold)"
);
replica
.update_condition(
client,
"RestoreSchedulingSuspended",
"False",
"CounterReset",
"Consecutive failure counter reset below threshold",
)
.await?;
}

// Decide whether to trigger a new restore
if let Some(in_progress) = in_progress_restore {
let phase = in_progress.status.as_ref().and_then(|s| s.phase.as_ref());
Expand Down Expand Up @@ -972,84 +1051,6 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)

let schedule_decision = replica.check_schedule();

let consecutive_failures = replica
.status
.as_ref()
.and_then(|s| s.consecutive_restore_failures)
.unwrap_or(0);

const MAX_CONSECUTIVE_FAILURES: u32 = 3;
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
// Update phase even while suspended so it doesn't stay stuck at
// "Restoring" after all in-progress restores have failed.
if active_restore.is_some() {
replica.update_phase(client, ReplicaPhase::Ready).await?;
} else {
replica.update_phase(client, ReplicaPhase::Pending).await?;
}

let already_suspended = replica
.status
.as_ref()
.and_then(|s| {
s.conditions
.iter()
.find(|c| c.type_ == "RestoreSchedulingSuspended")
})
.is_some_and(|c| c.status == "True");

if !already_suspended {
warn!(
replica = name,
consecutive_failures,
"restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures"
);
replica
.update_condition(
client,
"RestoreSchedulingSuspended",
"True",
"ConsecutiveFailures",
&format!(
"Scheduling suspended after {consecutive_failures} consecutive restore failures. \
Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \
-n {namespace} --subresource=status --type=merge \
-p '{{\"status\":{{\"consecutiveRestoreFailures\":0}}}}'"
),
)
.await?;
}
return Ok(Action::requeue(Duration::from_secs(300)));
}

// Clear RestoreSchedulingSuspended if it was previously set but the
// counter has since been reset below the threshold (e.g. manual patch).
if replica
.status
.as_ref()
.and_then(|s| {
s.conditions
.iter()
.find(|c| c.type_ == "RestoreSchedulingSuspended")
})
.is_some_and(|c| c.status == "True")
{
info!(
replica = name,
consecutive_failures,
"clearing RestoreSchedulingSuspended condition (counter below threshold)"
);
replica
.update_condition(
client,
"RestoreSchedulingSuspended",
"False",
"CounterReset",
"Consecutive failure counter reset below threshold",
)
.await?;
}

let should_restore = never_restored
|| active_restore_deleted
|| matches!(schedule_decision, ScheduleDecision::Trigger);
Expand Down Expand Up @@ -1295,6 +1296,41 @@ async fn reconcile_schema_migration(
)
.await?;

// Filter out schemas that don't exist on the source. This happens when the
// user adds a schema to persistent_schemas before actually creating it.
let all_schemas: &[String] = schemas;
let schemas = query_existing_schemas(
client,
namespace,
&old_restore_name,
&source_dbname,
&reader_user,
&reader_password,
all_schemas,
ctx.use_port_forward(),
)
.await?;
let existing_set: HashSet<&str> = schemas.iter().map(String::as_str).collect();
let skipped: Vec<&String> = all_schemas
.iter()
.filter(|s| !existing_set.contains(s.as_str()))
.collect();
if !skipped.is_empty() {
warn!(
restore = %old_restore_name,
skipped = ?skipped,
"persistent schemas not found on source, skipping"
);
}

if schemas.is_empty() {
info!(
replica = %replica_name,
"no persistent schemas exist on source, skipping migration"
);
return Ok(true);
}

// Measure the actual on-disk database size of the source restore and
// compute how much the persistent schemas have grown beyond the original
// snapshot. This delta is stored in the replica status so the next
Expand Down Expand Up @@ -1357,14 +1393,14 @@ async fn reconcile_schema_migration(
// Check that none of the persistent schemas already exist in the snapshot.
// If they do, the pg_dump|psql migration would conflict, so we must fail
// the restore instead of attempting migration.
let conflicting = check_snapshot_schema_conflicts(
let conflicting = query_existing_schemas(
client,
namespace,
&new_restore_name,
&target_dbname,
&reader_user,
&reader_password,
schemas,
&schemas,
ctx.use_port_forward(),
)
.await?;
Expand Down Expand Up @@ -1452,7 +1488,7 @@ async fn reconcile_schema_migration(
&new_restore_name,
&source_dbname,
&target_dbname,
schemas,
&schemas,
&reader_secret_name,
&target_superuser_secret_name,
&callback_url,
Expand Down Expand Up @@ -1480,14 +1516,15 @@ async fn reconcile_schema_migration(
Ok(false) // Job created, not complete yet
}

/// Check whether any of the persistent schemas already exist in the snapshot
/// (i.e. in the new restore database before migration). Returns the list of
/// schema names that were found.
/// Query which of the requested schemas actually exist in a restore's database.
/// Returns schemas in the same order as the input slice, using the system
/// catalog (`pg_catalog.pg_namespace`) which is always fully visible regardless
/// of schema-level privileges.
#[expect(
clippy::too_many_arguments,
reason = "internal helper with tightly-coupled params"
)]
async fn check_snapshot_schema_conflicts(
async fn query_existing_schemas(
client: &Client,
namespace: &str,
restore_name: &str,
Expand All @@ -1511,14 +1548,17 @@ async fn check_snapshot_schema_conflicts(
let rows = conn
.client
.query(
"SELECT schema_name FROM information_schema.schemata \
WHERE schema_name = ANY($1)",
"SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname = ANY($1)",
&[&schemas],
)
.await?;

let found: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
Ok(found)
let found: HashSet<String> = rows.iter().map(|r| r.get::<_, String>(0)).collect();
Ok(schemas
.iter()
.filter(|s| found.contains(s.as_str()))
.cloned()
.collect())
}

pub fn error_policy(
Expand Down
32 changes: 30 additions & 2 deletions tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
use std::{collections::BTreeMap, time::Duration};

use jiff::Span;

use k8s_openapi::{
ByteString,
api::{
batch::v1::Job,
core::v1::{Secret, SecretReference},
core::v1::{LocalObjectReference, Secret, SecretReference},
},
apimachinery::pkg::api::resource::Quantity,
};
Expand All @@ -21,7 +22,7 @@ use kube::{
use postgres_restore_operator::{
types::{
OverlayDatabaseConfig, PostgresPhysicalReplica, PostgresPhysicalReplicaSpec,
PostgresPhysicalRestore, ReplicaPhase, RestorePhase,
PostgresPhysicalRestore, PostgresPhysicalRestoreSpec, ReplicaPhase, RestorePhase,
},
util::TimeSpan,
};
Expand Down Expand Up @@ -325,3 +326,30 @@ pub async fn wait_for_pod_ready(ns: &str, pod: &str, timeout_dur: Duration) {
.await
.unwrap_or_else(|_| panic!("timed out waiting for pod {pod} to be ready in namespace {ns}"));
}

/// Build a follow-up `PostgresPhysicalRestore` for `replica`, reusing the
/// snapshot parameters of `first_restore` so both restores point at the same
/// source snapshot.
///
/// The returned resource is placed in namespace `ns`, carries the
/// `pgro.bes.au/replica` label pointing back at the replica, and lists the
/// replica as its owner so it is garbage-collected alongside it.
/// `snapshot_time` is deliberately left unset.
pub fn build_second_restore(
    name: &str,
    ns: &str,
    first_restore: &PostgresPhysicalRestore,
    replica: &PostgresPhysicalReplica,
) -> PostgresPhysicalRestore {
    let replica_name = replica.name_any();

    // Clone the snapshot coordinates from the first restore; only the
    // replica reference and sizes are carried over, not any status.
    let spec = PostgresPhysicalRestoreSpec {
        replica: LocalObjectReference {
            name: replica_name.clone(),
        },
        snapshot: first_restore.spec.snapshot.clone(),
        snapshot_size: first_restore.spec.snapshot_size.clone(),
        snapshot_time: None,
        storage_size: first_restore.spec.storage_size.clone(),
    };

    let mut second = PostgresPhysicalRestore::new(name, spec);
    second.metadata.namespace = Some(ns.to_string());
    second.metadata.owner_references = Some(vec![replica.owner_reference()]);
    second.metadata.labels = Some(BTreeMap::from([(
        "pgro.bes.au/replica".to_string(),
        replica_name,
    )]));
    second
}
Loading