diff --git a/src/workers/continuum-core/src/governor/local.rs b/src/workers/continuum-core/src/governor/local.rs index b4427a277..19dacd5ff 100644 --- a/src/workers/continuum-core/src/governor/local.rs +++ b/src/workers/continuum-core/src/governor/local.rs @@ -45,11 +45,14 @@ //! - Policy directory discovery (PR-3d); callers must provide explicit //! candidates via `set_candidates` -use crate::governor::cascade::{apply_action, evaluate_next_step, CascadeAction, CascadeThresholds}; -use crate::governor::policy_selector::{select_policy, PolicySelectionError}; -use crate::governor::types::{GovernorPolicy, GovernorSnapshot, HardwareClass, PressureSignal}; use crate::governor::PolicyFile; use crate::governor::SubstrateGovernor; +use crate::governor::cascade::{ + CascadeAction, CascadeThresholds, apply_action, apply_cascade_step_to_policy, + evaluate_next_step, +}; +use crate::governor::policy_selector::{PolicySelectionError, select_policy}; +use crate::governor::types::{GovernorPolicy, GovernorSnapshot, HardwareClass, PressureSignal}; use arc_swap::ArcSwap; use std::sync::{Arc, Mutex}; @@ -80,9 +83,25 @@ const RECENT_SIGNALS_CAPACITY: usize = 32; pub struct LocalSubstrateGovernor { /// Wait-free policy publish. `current_policy()` is an /// `ArcSwap::load_full()` (returns `Arc`); writers - /// `store(Arc::new(new_policy))`. + /// `store(Arc::new(new_policy))`. This is the ACTIVE (possibly- + /// throttled) policy; see `base_policy` for the un-throttled + /// canonical version. policy: Arc>, + /// BASE policy — the canonical un-throttled policy as loaded from + /// the policy file (cascade_step always 0). Cascade transitions + /// always derive the new ACTIVE policy by calling + /// `apply_cascade_step_to_policy(base, new_step)` rather than + /// transforming the already-throttled current policy. This is + /// what `apply_cascade_step_to_policy`'s `not_reversible` test + /// (PR-3c3) was preparing for — keep the base separate so retreat + /// can re-derive cleanly. 
+ /// + /// Mutex-protected because `on_hardware_detected` rewrites it + /// when a new HardwareClass is detected; cascade transitions + /// only READ it under the same mutex. + base_policy: Mutex<GovernorPolicy>, + /// Pool of candidate policy files. `on_hardware_detected` walks /// this with `select_policy` (PR-3a) to pick the best match. /// Empty until `set_candidates` is called — until then, @@ -98,6 +117,18 @@ pub struct LocalSubstrateGovernor { struct SnapshotState { cascade_transition_count: u64, recent_signals: Vec<PressureSignal>, + /// Restore-speculation-one-step-later marker (PR-3c4). Set true + /// when the cascade RETREATS (new step < previous step): that + /// transition applies the lower step's transformations BUT keeps + /// speculation at the previous (higher-step) value. The next Hold + /// re-derives the policy at the current step, restoring + /// speculation, and clears it; a step-changing Advance clears it too. + /// + /// The spec's "restore speculation one step later" rule is the + /// load-bearing anti-oscillation guarantee — speculation thrash + /// is the most user-visible cascade flapping, and keeping it + /// dampened by one step prevents back-and-forth. + pending_speculation_retreat: bool, /// Current cascade step. Mirrors `policy.cascade_step` but tracked /// here separately so the time-in-step gate doesn't have to /// arc_swap-load the full policy on every signal. @@ -120,8 +151,16 @@ impl LocalSubstrateGovernor { /// `on_hardware_detected` can rewrite later. pub fn new(initial_policy: GovernorPolicy) -> Self { let initial_step = initial_policy.cascade_step; + // The initial policy IS the base — caller passes the + // canonical un-throttled version. Cascade transitions + // re-derive ACTIVE from BASE; if cascade_step != 0 at + // construction time, we still treat the supplied policy + // as base, zeroing only cascade_step on the stored base copy. 
+ let mut base = initial_policy.clone(); + base.cascade_step = 0; Self { policy: Arc::new(ArcSwap::from(Arc::new(initial_policy))), + base_policy: Mutex::new(base), candidates: Mutex::new(Vec::new()), snapshot_state: Mutex::new(SnapshotState { cascade_transition_count: 0, @@ -129,6 +168,7 @@ impl LocalSubstrateGovernor { current_step: initial_step, last_step_change_ms: now_unix_ms(), thresholds: CascadeThresholds::default(), + pending_speculation_retreat: false, }), } } @@ -193,7 +233,30 @@ impl LocalSubstrateGovernor { .expect("LocalSubstrateGovernor candidates mutex poisoned"); let selected = select_policy(&candidates, &hw)?; let new_policy = crate::governor::into_governor_policy(selected.clone(), hw, now_unix_ms()); - drop(candidates); // release before publish to keep mutex hold time tiny + drop(candidates); + + // PR-3c4: refresh BASE policy too. New hardware = new canonical + // base; cascade transitions re-derive from this. Reset the + // cascade to step 0 (new hardware = fresh start; if pressure + // returns, the cascade re-evaluates from a known-good state). + { + let mut base = self + .base_policy + .lock() + .expect("LocalSubstrateGovernor base_policy mutex poisoned"); + *base = new_policy.clone(); + base.cascade_step = 0; + } + { + let mut state = self + .snapshot_state + .lock() + .expect("LocalSubstrateGovernor snapshot mutex poisoned"); + state.current_step = 0; + state.last_step_change_ms = now_unix_ms(); + state.pending_speculation_retreat = false; + } + self.publish(new_policy); Ok(()) } @@ -209,11 +272,18 @@ impl SubstrateGovernor for LocalSubstrateGovernor { } fn on_pressure_signal(&self, signal: PressureSignal) { - // PR-3c2 wiring: record signal + evaluate cascade action + - // (conditionally) apply via cascade_step rewrite. The - // time-in-step gate prevents brief spikes from advancing past - // step 1; emergency signals (thermal Critical, battery < - // emergency_pct) bypass the gate per spec. 
+ // PR-3c2 wiring + PR-3c4 base-vs-active split: + // - record signal in ring + // - evaluate cascade action (Hold/Advance/Retreat/EmergencyAdvanceToMax) + // - time-in-step gate blocks Advance from step > 0 within + // MIN_TIME_IN_STEP_MS (brief spikes don't escalate) + // - EmergencyAdvanceToMax bypasses gate (protect hardware/user) + // - Retreat never gated (hysteresis IS the anti-oscillation) + // - On step change: derive new ACTIVE from BASE via + // apply_cascade_step_to_policy (not from current — keeps + // transformations symmetric + reversible) + // - Restore-speculation-one-step-later: on retreat, keep + // speculation at the higher-step value for one more cycle let now = now_unix_ms(); let mut new_policy_to_publish: Option = None; @@ -223,24 +293,18 @@ impl SubstrateGovernor for LocalSubstrateGovernor { .lock() .expect("LocalSubstrateGovernor snapshot mutex poisoned"); - // Record the signal in the ring (existing PR-3b behavior). + // Record the signal in the ring. if state.recent_signals.len() >= RECENT_SIGNALS_CAPACITY { state.recent_signals.remove(0); } state.recent_signals.push(signal); - // Evaluate cascade action. let action = evaluate_next_step(state.current_step, &signal, &state.thresholds); - // Time-in-step gate: Advance from a non-zero step requires - // sustained pressure (current step active > MIN_TIME_IN_STEP_MS). - // EmergencyAdvanceToMax bypasses the gate. Retreat is never - // gated by time (hysteresis IS the anti-oscillation). let gated_action = match action { CascadeAction::Advance => { let time_in_step = now.saturating_sub(state.last_step_change_ms); if state.current_step > 0 && time_in_step < MIN_TIME_IN_STEP_MS { - // Brief spike — hold rather than advance. CascadeAction::Hold } else { action @@ -249,29 +313,78 @@ impl SubstrateGovernor for LocalSubstrateGovernor { _ => action, }; - // Apply the action to the step counter. If it changed, - // build the new policy to publish + update step-change ts. 
- let new_step = apply_action(state.current_step, gated_action); - if new_step != state.current_step { + let prev_step = state.current_step; + let new_step = apply_action(prev_step, gated_action); + if new_step != prev_step { state.current_step = new_step; state.last_step_change_ms = now; - // Snapshot the current policy + bump cascade_step to - // the new value. PR-3c3 will extend this with - // apply_cascade_step_to_policy that rewrites - // tier_sizes / cadence / concurrency / speculation per - // the spec's per-step transformations. For PR-3c2 only - // cascade_step changes; downstream consumers can read - // it + react. - let current = self.policy.load_full(); - let mut next_policy: GovernorPolicy = (*current).clone(); - next_policy.cascade_step = new_step; - next_policy.policy_version = next_policy.policy_version.saturating_add(1); + + // Whether THIS transition is a retreat (used for + // restore-speculation-one-step-later logic). + let is_retreat = new_step < prev_step; + + // Re-derive active policy from BASE — NOT from current. + // Per PR-3c3's not-reversible test: transformations + // applied to an already-transformed policy don't undo + // cleanly. Always derive from the canonical base. + let base_clone: GovernorPolicy = self + .base_policy + .lock() + .expect("LocalSubstrateGovernor base_policy mutex poisoned") + .clone(); + + let mut next_policy = apply_cascade_step_to_policy(&base_clone, new_step); + + // Restore-speculation-one-step-later: on retreat, keep + // speculation at the PREVIOUS-step (higher) value for + // one more cycle. This dampens speculation thrash — + // the most user-visible cascade flapping per spec. + // + // On advance, clear any pending retreat marker — new + // pressure means we're going up, not still completing + // a previous restoration. + if is_retreat { + // Compute what the previous step's speculation + // would have been + use that instead of new_step's. 
+ let prev_step_policy = apply_cascade_step_to_policy(&base_clone, prev_step); + next_policy.speculation_aggressiveness = + prev_step_policy.speculation_aggressiveness; + state.pending_speculation_retreat = true; + } else if state.pending_speculation_retreat + && gated_action == CascadeAction::Advance + { + // A plain Advance clears the pending-retreat marker since + // speculation is re-throttled by the higher step. NOTE(review): + // EmergencyAdvanceToMax does not match this arm, so the marker persists — confirm intended. + state.pending_speculation_retreat = false; + } + + next_policy.policy_version = + self.policy.load_full().policy_version.saturating_add(1); next_policy.committed_at_ms = now; new_policy_to_publish = Some(next_policy); + } else if state.pending_speculation_retreat && gated_action == CascadeAction::Hold { + // Hold with pending retreat marker → restore speculation + // to the lower-step value (the "one cycle later" delivery). + // This is the second half of the restore-one-step-later + // semantics: the retreat kept speculation at the higher step's + // (more throttled) value; this Hold clears it (a Retreat re-arms it). + let base_clone: GovernorPolicy = self + .base_policy + .lock() + .expect("LocalSubstrateGovernor base_policy mutex poisoned") + .clone(); + let mut next_policy = apply_cascade_step_to_policy(&base_clone, state.current_step); + next_policy.policy_version = + self.policy.load_full().policy_version.saturating_add(1); + next_policy.committed_at_ms = now; + state.pending_speculation_retreat = false; + // Don't bump cascade_transition_count for this — the + // step didn't change, only speculation restored. + self.policy.store(Arc::new(next_policy)); + return; } } - // Release the snapshot_state mutex before publishing to keep - // hold time tiny + avoid lock ordering with the policy ArcSwap. 
if let Some(policy) = new_policy_to_publish { self.publish(policy); } @@ -681,7 +794,10 @@ mod tests { }); let snap = g.snapshot(); assert_eq!(snap.cascade_transition_count, 1); - assert_eq!(snap.current_policy.cascade_step, 5, "thermal Critical → EmergencyAdvanceToMax (step 5)"); + assert_eq!( + snap.current_policy.cascade_step, 5, + "thermal Critical → EmergencyAdvanceToMax (step 5)" + ); assert_eq!(g.current_cascade_step(), 5); } @@ -693,7 +809,11 @@ mod tests { fn pressure_signal_first_advance_no_gate() { let g = LocalSubstrateGovernor::new(initial_policy()); g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); - assert_eq!(g.current_cascade_step(), 1, "step 0 → 1 advance fires immediately"); + assert_eq!( + g.current_cascade_step(), + 1, + "step 0 → 1 advance fires immediately" + ); } /// What this catches: from step 1, a second-stage-triggering @@ -726,7 +846,11 @@ mod tests { g.on_pressure_signal(PressureSignal::Thermal { severity: ThermalSeverity::Critical, }); - assert_eq!(g.current_cascade_step(), 5, "emergency bypasses time-in-step gate"); + assert_eq!( + g.current_cascade_step(), + 5, + "emergency bypasses time-in-step gate" + ); } /// What this catches: Retreat is NOT gated by time-in-step. 
Cascade @@ -740,7 +864,11 @@ mod tests { assert_eq!(g.current_cascade_step(), 1); // Retreat immediately — should fire even though step 1 was just entered g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 }); - assert_eq!(g.current_cascade_step(), 0, "retreat fires regardless of time-in-step"); + assert_eq!( + g.current_cascade_step(), + 0, + "retreat fires regardless of time-in-step" + ); } /// What this catches: cascade_step changes on signal-driven @@ -771,8 +899,15 @@ mod tests { let before_transitions = g.snapshot().cascade_transition_count; g.on_pressure_signal(PressureSignal::UserActive { foreground: true }); let after_transitions = g.snapshot().cascade_transition_count; - assert_eq!(after_transitions, before_transitions, "UserActive doesn't transition"); - assert_eq!(g.snapshot().recent_signals.len(), 1, "but signal IS recorded"); + assert_eq!( + after_transitions, before_transitions, + "UserActive doesn't transition" + ); + assert_eq!( + g.snapshot().recent_signals.len(), + 1, + "but signal IS recorded" + ); } /// What this catches: set_thresholds replaces the cascade @@ -790,7 +925,254 @@ mod tests { }; g.set_thresholds(custom); g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); - assert_eq!(g.current_cascade_step(), 0, "raised threshold means 0.7 no longer advances"); + assert_eq!( + g.current_cascade_step(), + 0, + "raised threshold means 0.7 no longer advances" + ); + } + + // ===== PR-3c4: apply_cascade_step_to_policy wiring + base/active split ===== + + /// What this catches: cascade Advance derives active policy from + /// BASE via apply_cascade_step_to_policy. Active policy after step + /// 1 has speculation_aggressiveness dropped (per PR-3c3 table). 
+ #[test] + fn advance_derives_active_from_base_with_step_transformations() { + let mut base = initial_policy(); + base.speculation_aggressiveness = SpeculationLevel::Aggressive; + let g = LocalSubstrateGovernor::new(base); + + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + + let active = g.current_policy(); + assert_eq!(active.cascade_step, 1); + // Step 1 drops speculation: Aggressive → Balanced + assert_eq!( + active.speculation_aggressiveness, + SpeculationLevel::Balanced + ); + } + + /// What this catches: emergency-advance-to-max derives active + /// from base at step 5 — all per-step transformations cumulative. + /// tier_sizes l1 shrunk, federation cadence maxed, consolidation + /// Manual. The full-throttle state. + #[test] + fn emergency_advance_applies_full_throttle_transformations() { + let mut base = initial_policy(); + base.tier_sizes.l1_lora_layers = 8; + base.tier_sizes.l1_kv_tokens = 16384; + base.federation_pull_cadence.pull_cadence_seconds = 60; + base.consolidation_schedule = ConsolidationSchedule::Idle; + base.speculation_aggressiveness = SpeculationLevel::Aggressive; + base.concurrency_caps.personas_concurrent = 8; + let g = LocalSubstrateGovernor::new(base); + + g.on_pressure_signal(PressureSignal::Thermal { + severity: ThermalSeverity::Critical, + }); + + let active = g.current_policy(); + assert_eq!(active.cascade_step, 5); + // All cumulative transformations applied + assert_eq!(active.tier_sizes.l1_lora_layers, 6); // 8 * 0.75 + assert_eq!( + active.federation_pull_cadence.pull_cadence_seconds, + 3600 // MAX_FEDERATION_PULL_CADENCE_SECONDS + ); + assert_eq!(active.consolidation_schedule, ConsolidationSchedule::Manual); + assert_eq!( + active.speculation_aggressiveness, + SpeculationLevel::Balanced + ); // Aggr→Balanced + assert_eq!(active.concurrency_caps.personas_concurrent, 7); // 8-1 + } + + /// What this catches: restore-speculation-one-step-later. 
+ /// Advance → Retreat keeps speculation at PREVIOUS-step value; + /// next Hold restores it to current-step value. Anti-oscillation + /// for the most user-visible cascade flapping. + #[test] + fn retreat_holds_speculation_for_one_more_cycle() { + let mut base = initial_policy(); + base.speculation_aggressiveness = SpeculationLevel::Aggressive; + let g = LocalSubstrateGovernor::new(base); + + // Advance to step 1 — speculation drops to Balanced + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1); + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Balanced + ); + + // Retreat to step 0 — cascade_step = 0 but speculation STAYS at + // Balanced (one-step-later semantics) + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 }); + assert_eq!(g.current_cascade_step(), 0); + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Balanced, + "speculation should stay at step-1 (Balanced) for one cycle after retreat" + ); + + // Next Hold delivers the speculation restoration — back to Aggressive + g.on_pressure_signal(PressureSignal::UserActive { foreground: true }); + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Aggressive, + "speculation restored to step-0 (Aggressive) on next Hold" + ); + } + + /// What this catches: re-advancing during pending-retreat clears + /// the marker (speculation re-throttles immediately to the new + /// step's value). The asymmetric restore-one-later only applies + /// to RETREAT, not advance. 
+ #[test] + fn advance_during_pending_retreat_clears_marker() { + let mut base = initial_policy(); + base.speculation_aggressiveness = SpeculationLevel::Aggressive; + let g = LocalSubstrateGovernor::new(base); + + // Advance to step 1 + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + // Retreat to step 0 (speculation still Balanced — pending marker set) + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 }); + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Balanced + ); + + // No need to adjust last_step_change_ms to get past the + // time-in-step gate here: the gate only applies when the + // current step is > 0, so step 0 → 1 is immediate. + // Advance again — speculation stays at Balanced (step 1's + // value) and the pending marker is cleared. + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1); + // Step 1's speculation is Balanced (Aggressive → Balanced) + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Balanced + ); + + // Now Hold — should NOT restore speculation since marker was + // cleared by the second advance + g.on_pressure_signal(PressureSignal::UserActive { foreground: true }); + assert_eq!( + g.current_policy().speculation_aggressiveness, + SpeculationLevel::Balanced, + "after marker cleared, Hold doesn't restore" + ); + } + + /// What this catches: hardware_detected refreshes the BASE + /// policy AND resets cascade to step 0. New hardware = fresh start; + /// existing cascade pressure state is discarded. 
+ #[test] + fn hardware_detected_refreshes_base_and_resets_cascade() { + let g = LocalSubstrateGovernor::new(initial_policy()); + g.set_candidates(vec![policy_with_l1(2), policy_with_l1_nvidia(8)]); + + // Push cascade off step 0 (a single signal reaches step 1) + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + // Going further would need time to advance past the + // time-in-step gate; reaching step 1 is enough to prove + // the reset clears it + assert!(g.current_cascade_step() >= 1); + + // Hardware change resets cascade + let blackwell = hw( + TargetSilicon::NvidiaCuda, + ThermalClass::Workstation, + 32 * 1024, + 64 * 1024, + ); + g.on_hardware_detected(blackwell).unwrap(); + + assert_eq!( + g.current_cascade_step(), + 0, + "hardware change resets cascade to 0" + ); + // Active policy is from the new candidate (l1_lora_layers=8 from blackwell) + assert_eq!(g.current_policy().tier_sizes.l1_lora_layers, 8); + } + + /// What this catches: derive-from-base means consecutive + /// transitions don't compound transformations. Advance 0→1→0 + /// returns to the BASE policy values, not to a doubly-transformed + /// state. This was the not-reversible warning from PR-3c3. 
+ #[test] + fn advance_then_retreat_returns_to_base_values_modulo_speculation_dampening() { + let mut base = initial_policy(); + base.tier_sizes.l1_lora_layers = 8; + base.tier_sizes.l1_kv_tokens = 16384; + let g = LocalSubstrateGovernor::new(base); + + // Step 0 → step 1 (only speculation changes; tier_sizes + // unaffected since step 3 is where l1 shrinks) + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + // Retreat to step 0 — tier_sizes back to base + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 }); + + let active = g.current_policy(); + assert_eq!(active.cascade_step, 0); + // tier_sizes back to base (step 0 transformation, derived from base) + assert_eq!(active.tier_sizes.l1_lora_layers, 8); + assert_eq!(active.tier_sizes.l1_kv_tokens, 16384); + } + + // Helpers for tests above + + fn policy_with_l1(l1: u32) -> PolicyFile { + use crate::governor::policy_file::*; + PolicyFile { + policy_version: 1, + applies_to: "apple-m,thinandlight,uma,vram_mb=0..0,ram_mb=14000..18000".into(), + tier_sizes: TierSizesFile { + l1_lora_layers: l1, + l1_kv_tokens: 2048, + l2_lora_layers: 4, + l3_lora_layers: 12, + l3_engrams: 1024, + }, + cadence_multipliers: CadenceMultipliersFile { + realtime: 1.0, + delayed: 1.0, + background: 1.0, + }, + concurrency_caps: ConcurrencyCapsFile { + personas_concurrent: 2, + inference_lanes: 1, + foundry_lanes: 0, + sentinel_lanes: 1, + }, + speculation: SpeculationFileSection { + level: SpeculationLevel::Conservative, + }, + consolidation: ConsolidationFileSection { + schedule: ConsolidationSchedule::Manual, + }, + federation: FederationCadenceFile { + pull_cadence_seconds: 600, + }, + recall_weights: RecallScoreWeightsFile { + semantic: 0.4, + outcome_history: 0.3, + recency: 0.1, + tier_proximity: 0.1, + provenance_trust: 0.1, + }, + } + } + + fn policy_with_l1_nvidia(l1: u32) -> PolicyFile { + let mut p = policy_with_l1(l1); + p.applies_to = "nvidia,workstation,vram_mb=30000..36000".into(); 
+ p } // ===== concurrency =====