From 645467f66d08abed7c1efd8e7a1f5bcfd4b92ec5 Mon Sep 17 00:00:00 2001 From: Test Date: Sat, 16 May 2026 19:11:59 -0500 Subject: [PATCH] =?UTF-8?q?feat(governor):=20Lane=20H=20PR-3c2=20=E2=80=94?= =?UTF-8?q?=20wire=20cascade=20evaluator=20into=20on=5Fpressure=5Fsignal?= =?UTF-8?q?=20+=20time-in-step=20gate?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../continuum-core/src/governor/local.rs | 257 ++++++++++++++++-- 1 file changed, 236 insertions(+), 21 deletions(-) diff --git a/src/workers/continuum-core/src/governor/local.rs b/src/workers/continuum-core/src/governor/local.rs index 2002fb934..b4427a277 100644 --- a/src/workers/continuum-core/src/governor/local.rs +++ b/src/workers/continuum-core/src/governor/local.rs @@ -10,7 +10,7 @@ //! ## Scope of PR-3b //! //! - `LocalSubstrateGovernor` struct holding `Arc>` -//! + `Mutex` (snapshot history is mutex-protected; +//! plus `Mutex` (snapshot history is mutex-protected; //! policy reads are arc_swap'd lock-free) //! - Impl `SubstrateGovernor` trait: `current_policy + on_hardware_detected //! + on_pressure_signal + snapshot` @@ -45,6 +45,7 @@ //! - Policy directory discovery (PR-3d); callers must provide explicit //! candidates via `set_candidates` +use crate::governor::cascade::{apply_action, evaluate_next_step, CascadeAction, CascadeThresholds}; use crate::governor::policy_selector::{select_policy, PolicySelectionError}; use crate::governor::types::{GovernorPolicy, GovernorSnapshot, HardwareClass, PressureSignal}; use crate::governor::PolicyFile; @@ -52,6 +53,22 @@ use crate::governor::SubstrateGovernor; use arc_swap::ArcSwap; use std::sync::{Arc, Mutex}; +/// Minimum time the cascade must stay in a step before advancing +/// further. Per spec §"Adjustment Cascade": step 1 must be active +/// for more than 30 seconds before advancing to step 2; same shape +/// for step 2 to 3 (30s), step 3 to 4 (60s). PR-3c2 uses a single +/// conservative value for all transitions; PR-3c3 can per-step-tune +/// if the spec's 30s/30s/60s ladder matters. +/// +/// EmergencyAdvanceToMax bypasses this gate entirely — thermal +/// Critical + battery < emergency_pct skip straight to max regardless +/// of time-in-step. +/// +/// Retreat is not gated by time-in-step — the cascade may retreat as +/// soon as conditions clear (the all-clear exit threshold IS the +/// hysteresis; doubling-up with a time gate would over-throttle). +pub const MIN_TIME_IN_STEP_MS: u64 = 30_000; + /// Maximum number of recent pressure signals retained in the snapshot. /// The ring evicts oldest-first. Diagnostic — operators look at the /// last N events to understand "why did the governor cascade just now." @@ -81,6 +98,20 @@ pub struct LocalSubstrateGovernor { struct SnapshotState { cascade_transition_count: u64, recent_signals: Vec, + /// Current cascade step. Mirrors `policy.cascade_step` but tracked + /// here separately so the time-in-step gate doesn't have to + /// arc_swap-load the full policy on every signal. + current_step: u8, + /// Unix-ms timestamp the cascade last transitioned (advance or + /// retreat). Used by the time-in-step gate to enforce the spec's + /// "step N must be active > 30s before advancing to step N+1" + /// rule. PR-3c2 uses a single value (`MIN_TIME_IN_STEP_MS`); PR-3c3 + /// may per-step-tune if the spec's ladder matters. + last_step_change_ms: u64, + /// Cascade thresholds — used by `evaluate_next_step`. Carried in + /// the state so PR-3c3 can hot-reload them when the policy file + /// changes (PR-3d's file watcher). + thresholds: CascadeThresholds, } impl LocalSubstrateGovernor { @@ -88,16 +119,39 @@ impl LocalSubstrateGovernor { /// serve `current_policy()` immediately. `set_candidates` + /// `on_hardware_detected` can rewrite later. pub fn new(initial_policy: GovernorPolicy) -> Self { + let initial_step = initial_policy.cascade_step; Self { policy: Arc::new(ArcSwap::from(Arc::new(initial_policy))), candidates: Mutex::new(Vec::new()), snapshot_state: Mutex::new(SnapshotState { cascade_transition_count: 0, recent_signals: Vec::with_capacity(RECENT_SIGNALS_CAPACITY), + current_step: initial_step, + last_step_change_ms: now_unix_ms(), + thresholds: CascadeThresholds::default(), }), } } + /// Override the cascade thresholds (PR-3d wires the policy-file + /// hot-reload path; for PR-3c2 callers can set manually for tests). + pub fn set_thresholds(&self, thresholds: CascadeThresholds) { + let mut state = self + .snapshot_state + .lock() + .expect("LocalSubstrateGovernor snapshot mutex poisoned"); + state.thresholds = thresholds; + } + + /// Current cascade step. Diagnostic — tests + telemetry consumers + /// can introspect without going through snapshot(). + pub fn current_cascade_step(&self) -> u8 { + self.snapshot_state + .lock() + .expect("LocalSubstrateGovernor snapshot mutex poisoned") + .current_step + } + /// Set the pool of candidate policy files used by /// `on_hardware_detected`. Replaces any prior candidates atomically. /// PR-3d (file watcher) calls this on file-system change events. @@ -155,19 +209,72 @@ impl SubstrateGovernor for LocalSubstrateGovernor { } fn on_pressure_signal(&self, signal: PressureSignal) { - let mut state = self - .snapshot_state - .lock() - .expect("LocalSubstrateGovernor snapshot mutex poisoned"); - if state.recent_signals.len() >= RECENT_SIGNALS_CAPACITY { - // Drop oldest (front). With a Vec this is O(N) but N=32 - // so cost is trivial; using VecDeque would shave a few - // ns but adds an enum-discriminant cost to every read. - state.recent_signals.remove(0); + // PR-3c2 wiring: record signal + evaluate cascade action + + // (conditionally) apply via cascade_step rewrite. The + // time-in-step gate prevents brief spikes from advancing past + // step 1; emergency signals (thermal Critical, battery < + // emergency_pct) bypass the gate per spec. + let now = now_unix_ms(); + let mut new_policy_to_publish: Option = None; + + { + let mut state = self + .snapshot_state + .lock() + .expect("LocalSubstrateGovernor snapshot mutex poisoned"); + + // Record the signal in the ring (existing PR-3b behavior). + if state.recent_signals.len() >= RECENT_SIGNALS_CAPACITY { + state.recent_signals.remove(0); + } + state.recent_signals.push(signal); + + // Evaluate cascade action. + let action = evaluate_next_step(state.current_step, &signal, &state.thresholds); + + // Time-in-step gate: Advance from a non-zero step requires + // sustained pressure (current step active > MIN_TIME_IN_STEP_MS). + // EmergencyAdvanceToMax bypasses the gate. Retreat is never + // gated by time (hysteresis IS the anti-oscillation). + let gated_action = match action { + CascadeAction::Advance => { + let time_in_step = now.saturating_sub(state.last_step_change_ms); + if state.current_step > 0 && time_in_step < MIN_TIME_IN_STEP_MS { + // Brief spike — hold rather than advance. + CascadeAction::Hold + } else { + action + } + } + _ => action, + }; + + // Apply the action to the step counter. If it changed, + // build the new policy to publish + update step-change ts. + let new_step = apply_action(state.current_step, gated_action); + if new_step != state.current_step { + state.current_step = new_step; + state.last_step_change_ms = now; + // Snapshot the current policy + bump cascade_step to + // the new value. PR-3c3 will extend this with + // apply_cascade_step_to_policy that rewrites + // tier_sizes / cadence / concurrency / speculation per + // the spec's per-step transformations. For PR-3c2 only + // cascade_step changes; downstream consumers can read + // it + react. + let current = self.policy.load_full(); + let mut next_policy: GovernorPolicy = (*current).clone(); + next_policy.cascade_step = new_step; + next_policy.policy_version = next_policy.policy_version.saturating_add(1); + next_policy.committed_at_ms = now; + new_policy_to_publish = Some(next_policy); + } + } + // Release the snapshot_state mutex before publishing to keep + // hold time tiny + avoid lock ordering with the policy ArcSwap. + if let Some(policy) = new_policy_to_publish { + self.publish(policy); } - state.recent_signals.push(signal); - // PR-3c will conditionally bump cascade_transition_count here - // when a signal crosses a threshold. PR-3b just records. } fn snapshot(&self) -> GovernorSnapshot { @@ -561,23 +668,131 @@ mod tests { assert_eq!(g.snapshot().cascade_transition_count, 0); } - /// What this catches: on_pressure_signal does NOT increment - /// cascade_transition_count in PR-3b (signal-recording only; PR-3c - /// adds the threshold-crossing → transition logic). Pinned so PR-3c - /// has to land + update this test together. + /// What this catches (UPDATED in PR-3c2): on_pressure_signal NOW + /// drives transitions via the cascade evaluator. Thermal Critical + /// is an emergency signal — jumps cascade_step to MAX (5) + /// regardless of time-in-step. transition_count increments by 1 + /// (one publish from step 0 → step 5). #[test] - fn pressure_signal_does_not_transition_in_pr3b() { + fn pressure_signal_thermal_critical_emergency_advances() { let g = LocalSubstrateGovernor::new(initial_policy()); g.on_pressure_signal(PressureSignal::Thermal { severity: ThermalSeverity::Critical, }); + let snap = g.snapshot(); + assert_eq!(snap.cascade_transition_count, 1); + assert_eq!(snap.current_policy.cascade_step, 5, "thermal Critical → EmergencyAdvanceToMax (step 5)"); + assert_eq!(g.current_cascade_step(), 5); + } + + /// What this catches: from step 0, a single signal exceeding the + /// step-0 → step-1 threshold advances to step 1 immediately. No + /// time-in-step gate for step 0 → step 1 (per spec — brief spikes + /// CAN enter step 1, gate applies to step 1 → 2 and beyond). + #[test] + fn pressure_signal_first_advance_no_gate() { + let g = LocalSubstrateGovernor::new(initial_policy()); + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1, "step 0 → 1 advance fires immediately"); + } + + /// What this catches: from step 1, a second-stage-triggering + /// signal arriving in < MIN_TIME_IN_STEP_MS is HELD (downgraded + /// from Advance to Hold). Brief spikes don't escalate. + #[test] + fn pressure_signal_step_1_to_2_gated_by_time_in_step() { + let g = LocalSubstrateGovernor::new(initial_policy()); + // Advance to step 1 + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1); + // Immediately try to advance to step 2 — should be HELD + g.on_pressure_signal(PressureSignal::SystemMemHigh { used_pct: 95 }); assert_eq!( - g.snapshot().cascade_transition_count, - 0, - "PR-3b: signal recording only; PR-3c adds threshold-driven transitions" + g.current_cascade_step(), + 1, + "step 1 → 2 advance within MIN_TIME_IN_STEP_MS should be Held" ); } + /// What this catches: EmergencyAdvanceToMax bypasses the time-in-step + /// gate. Even if step 1 was entered 1ms ago, thermal Critical jumps + /// to step 5 immediately. Protects hardware. + #[test] + fn emergency_bypasses_time_in_step_gate() { + let g = LocalSubstrateGovernor::new(initial_policy()); + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1); + // Emergency immediately after — should jump to 5 not Hold + g.on_pressure_signal(PressureSignal::Thermal { + severity: ThermalSeverity::Critical, + }); + assert_eq!(g.current_cascade_step(), 5, "emergency bypasses time-in-step gate"); + } + + /// What this catches: Retreat is NOT gated by time-in-step. Cascade + /// can retreat as soon as conditions clear (per spec — the hysteresis + /// gap IS the anti-oscillation; doubling-up with a time gate would + /// over-throttle). + #[test] + fn retreat_not_gated_by_time_in_step() { + let g = LocalSubstrateGovernor::new(initial_policy()); + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 1); + // Retreat immediately — should fire even though step 1 was just entered + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 }); + assert_eq!(g.current_cascade_step(), 0, "retreat fires regardless of time-in-step"); + } + + /// What this catches: cascade_step changes on signal-driven + /// transitions DO publish a new policy (policy_version bumps, + /// committed_at_ms updates, cascade_step is the new value). + #[test] + fn signal_driven_transition_publishes_new_policy() { + let g = LocalSubstrateGovernor::new(initial_policy()); + let before = g.current_policy(); + assert_eq!(before.cascade_step, 0); + let before_version = before.policy_version; + + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + + let after = g.current_policy(); + assert_eq!(after.cascade_step, 1); + assert!(after.policy_version > before_version); + assert!(after.committed_at_ms >= before.committed_at_ms); + } + + /// What this catches: signals that don't trigger transitions + /// (e.g. UserActive) do NOT publish a new policy. The + /// recent_signals ring still records, but cascade_transition_count + /// stays. + #[test] + fn non_transitioning_signals_dont_publish() { + let g = LocalSubstrateGovernor::new(initial_policy()); + let before_transitions = g.snapshot().cascade_transition_count; + g.on_pressure_signal(PressureSignal::UserActive { foreground: true }); + let after_transitions = g.snapshot().cascade_transition_count; + assert_eq!(after_transitions, before_transitions, "UserActive doesn't transition"); + assert_eq!(g.snapshot().recent_signals.len(), 1, "but signal IS recorded"); + } + + /// What this catches: set_thresholds replaces the cascade + /// threshold values used by on_pressure_signal. PR-3d's file + /// watcher uses this to hot-reload policy. + #[test] + fn set_thresholds_changes_evaluation_behavior() { + use crate::governor::cascade::CascadeThresholds; + let g = LocalSubstrateGovernor::new(initial_policy()); + // Raise the speculation-advance threshold to 0.9 so 0.7 (which + // would advance with default 0.5) now Holds. + let custom = CascadeThresholds { + spec_miss_rate_advance: 0.9, + ..CascadeThresholds::default() + }; + g.set_thresholds(custom); + g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 }); + assert_eq!(g.current_cascade_step(), 0, "raised threshold means 0.7 no longer advances"); + } + // ===== concurrency ===== /// What this catches: many concurrent reads return the current