@@ -894,6 +894,85 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
894894 }
895895 }
896896
897+ // Check consecutive failures before the in-progress restore early return,
898+ // so that suspension takes priority and the phase doesn't get stuck at
899+ // Restoring when all restores are failing.
900+ let consecutive_failures = replica
901+ . status
902+ . as_ref ( )
903+ . and_then ( |s| s. consecutive_restore_failures )
904+ . unwrap_or ( 0 ) ;
905+
906+ const MAX_CONSECUTIVE_FAILURES : u32 = 3 ;
907+ if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
908+ if active_restore. is_some ( ) {
909+ replica. update_phase ( client, ReplicaPhase :: Ready ) . await ?;
910+ } else {
911+ replica. update_phase ( client, ReplicaPhase :: Pending ) . await ?;
912+ }
913+
914+ let already_suspended = replica
915+ . status
916+ . as_ref ( )
917+ . and_then ( |s| {
918+ s. conditions
919+ . iter ( )
920+ . find ( |c| c. type_ == "RestoreSchedulingSuspended" )
921+ } )
922+ . is_some_and ( |c| c. status == "True" ) ;
923+
924+ if !already_suspended {
925+ warn ! (
926+ replica = name,
927+ consecutive_failures,
928+ "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures"
929+ ) ;
930+ replica
931+ . update_condition (
932+ client,
933+ "RestoreSchedulingSuspended" ,
934+ "True" ,
935+ "ConsecutiveFailures" ,
936+ & format ! (
937+ "Scheduling suspended after {consecutive_failures} consecutive restore failures. \
938+ Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \
939+ -n {namespace} --subresource=status --type=merge \
940+ -p '{{\" status\" :{{\" consecutiveRestoreFailures\" :0}}}}'"
941+ ) ,
942+ )
943+ . await ?;
944+ }
945+ return Ok ( Action :: requeue ( Duration :: from_secs ( 300 ) ) ) ;
946+ }
947+
948+ // Clear RestoreSchedulingSuspended if it was previously set but the
949+ // counter has since been reset below the threshold (e.g. manual patch).
950+ if replica
951+ . status
952+ . as_ref ( )
953+ . and_then ( |s| {
954+ s. conditions
955+ . iter ( )
956+ . find ( |c| c. type_ == "RestoreSchedulingSuspended" )
957+ } )
958+ . is_some_and ( |c| c. status == "True" )
959+ {
960+ info ! (
961+ replica = name,
962+ consecutive_failures,
963+ "clearing RestoreSchedulingSuspended condition (counter below threshold)"
964+ ) ;
965+ replica
966+ . update_condition (
967+ client,
968+ "RestoreSchedulingSuspended" ,
969+ "False" ,
970+ "CounterReset" ,
971+ "Consecutive failure counter reset below threshold" ,
972+ )
973+ . await ?;
974+ }
975+
897976 // Decide whether to trigger a new restore
898977 if let Some ( in_progress) = in_progress_restore {
899978 let phase = in_progress. status . as_ref ( ) . and_then ( |s| s. phase . as_ref ( ) ) ;
@@ -972,84 +1051,6 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
9721051
9731052 let schedule_decision = replica. check_schedule ( ) ;
9741053
975- let consecutive_failures = replica
976- . status
977- . as_ref ( )
978- . and_then ( |s| s. consecutive_restore_failures )
979- . unwrap_or ( 0 ) ;
980-
981- const MAX_CONSECUTIVE_FAILURES : u32 = 3 ;
982- if consecutive_failures >= MAX_CONSECUTIVE_FAILURES {
983- // Update phase even while suspended so it doesn't stay stuck at
984- // "Restoring" after all in-progress restores have failed.
985- if active_restore. is_some ( ) {
986- replica. update_phase ( client, ReplicaPhase :: Ready ) . await ?;
987- } else {
988- replica. update_phase ( client, ReplicaPhase :: Pending ) . await ?;
989- }
990-
991- let already_suspended = replica
992- . status
993- . as_ref ( )
994- . and_then ( |s| {
995- s. conditions
996- . iter ( )
997- . find ( |c| c. type_ == "RestoreSchedulingSuspended" )
998- } )
999- . is_some_and ( |c| c. status == "True" ) ;
1000-
1001- if !already_suspended {
1002- warn ! (
1003- replica = name,
1004- consecutive_failures,
1005- "restore scheduling suspended after {MAX_CONSECUTIVE_FAILURES} consecutive failures"
1006- ) ;
1007- replica
1008- . update_condition (
1009- client,
1010- "RestoreSchedulingSuspended" ,
1011- "True" ,
1012- "ConsecutiveFailures" ,
1013- & format ! (
1014- "Scheduling suspended after {consecutive_failures} consecutive restore failures. \
1015- Fix the underlying issue and reset with: kubectl patch postgresphysicalreplica {name} \
1016- -n {namespace} --subresource=status --type=merge \
1017- -p '{{\" status\" :{{\" consecutiveRestoreFailures\" :0}}}}'"
1018- ) ,
1019- )
1020- . await ?;
1021- }
1022- return Ok ( Action :: requeue ( Duration :: from_secs ( 300 ) ) ) ;
1023- }
1024-
1025- // Clear RestoreSchedulingSuspended if it was previously set but the
1026- // counter has since been reset below the threshold (e.g. manual patch).
1027- if replica
1028- . status
1029- . as_ref ( )
1030- . and_then ( |s| {
1031- s. conditions
1032- . iter ( )
1033- . find ( |c| c. type_ == "RestoreSchedulingSuspended" )
1034- } )
1035- . is_some_and ( |c| c. status == "True" )
1036- {
1037- info ! (
1038- replica = name,
1039- consecutive_failures,
1040- "clearing RestoreSchedulingSuspended condition (counter below threshold)"
1041- ) ;
1042- replica
1043- . update_condition (
1044- client,
1045- "RestoreSchedulingSuspended" ,
1046- "False" ,
1047- "CounterReset" ,
1048- "Consecutive failure counter reset below threshold" ,
1049- )
1050- . await ?;
1051- }
1052-
10531054 let should_restore = never_restored
10541055 || active_restore_deleted
10551056 || matches ! ( schedule_decision, ScheduleDecision :: Trigger ) ;
0 commit comments