Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 236 additions & 21 deletions src/workers/continuum-core/src/governor/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! ## Scope of PR-3b
//!
//! - `LocalSubstrateGovernor` struct holding `Arc<ArcSwap<GovernorPolicy>>`
//! + `Mutex<GovernorSnapshot>` (snapshot history is mutex-protected;
//! plus `Mutex<GovernorSnapshot>` (snapshot history is mutex-protected;
//! policy reads are arc_swap'd lock-free)
//! - Impl `SubstrateGovernor` trait: `current_policy + on_hardware_detected
//! + on_pressure_signal + snapshot`
Expand Down Expand Up @@ -45,13 +45,30 @@
//! - Policy directory discovery (PR-3d); callers must provide explicit
//! candidates via `set_candidates`

use crate::governor::cascade::{apply_action, evaluate_next_step, CascadeAction, CascadeThresholds};
use crate::governor::policy_selector::{select_policy, PolicySelectionError};
use crate::governor::types::{GovernorPolicy, GovernorSnapshot, HardwareClass, PressureSignal};
use crate::governor::PolicyFile;
use crate::governor::SubstrateGovernor;
use arc_swap::ArcSwap;
use std::sync::{Arc, Mutex};

/// Minimum time the cascade must stay in a step before advancing
/// further. Per spec §"Adjustment Cascade": step 1 must be active
/// for more than 30 seconds before advancing to step 2; same shape
/// for step 2 to 3 (30s), step 3 to 4 (60s). PR-3c2 uses a single
/// conservative value for all transitions; PR-3c3 can per-step-tune
/// if the spec's 30s/30s/60s ladder matters.
///
/// EmergencyAdvanceToMax bypasses this gate entirely — thermal
/// Critical + battery < emergency_pct skip straight to max regardless
/// of time-in-step.
///
/// Retreat is not gated by time-in-step — the cascade may retreat as
/// soon as conditions clear (the all-clear exit threshold IS the
/// hysteresis; doubling-up with a time gate would over-throttle).
pub const MIN_TIME_IN_STEP_MS: u64 = 30_000;

/// Maximum number of recent pressure signals retained in the snapshot.
/// The ring evicts oldest-first. Diagnostic — operators look at the
/// last N events to understand "why did the governor cascade just now."
Expand Down Expand Up @@ -81,23 +98,60 @@ pub struct LocalSubstrateGovernor {
struct SnapshotState {
cascade_transition_count: u64,
recent_signals: Vec<PressureSignal>,
/// Current cascade step. Mirrors `policy.cascade_step` but tracked
/// here separately so the time-in-step gate doesn't have to
/// arc_swap-load the full policy on every signal.
current_step: u8,
/// Unix-ms timestamp the cascade last transitioned (advance or
/// retreat). Used by the time-in-step gate to enforce the spec's
/// "step N must be active > 30s before advancing to step N+1"
/// rule. PR-3c2 uses a single value (`MIN_TIME_IN_STEP_MS`); PR-3c3
/// may per-step-tune if the spec's ladder matters.
last_step_change_ms: u64,
/// Cascade thresholds — used by `evaluate_next_step`. Carried in
/// the state so PR-3c3 can hot-reload them when the policy file
/// changes (PR-3d's file watcher).
thresholds: CascadeThresholds,
}

impl LocalSubstrateGovernor {
/// Construct with an initial policy. The governor starts ready to
/// serve `current_policy()` immediately. `set_candidates` +
/// `on_hardware_detected` can rewrite later.
pub fn new(initial_policy: GovernorPolicy) -> Self {
let initial_step = initial_policy.cascade_step;
Self {
policy: Arc::new(ArcSwap::from(Arc::new(initial_policy))),
candidates: Mutex::new(Vec::new()),
snapshot_state: Mutex::new(SnapshotState {
cascade_transition_count: 0,
recent_signals: Vec::with_capacity(RECENT_SIGNALS_CAPACITY),
current_step: initial_step,
last_step_change_ms: now_unix_ms(),
thresholds: CascadeThresholds::default(),
}),
}
}

/// Override the cascade thresholds (PR-3d wires the policy-file
/// hot-reload path; for PR-3c2 callers can set manually for tests).
pub fn set_thresholds(&self, thresholds: CascadeThresholds) {
let mut state = self
.snapshot_state
.lock()
.expect("LocalSubstrateGovernor snapshot mutex poisoned");
state.thresholds = thresholds;
}

/// Current cascade step. Diagnostic — tests + telemetry consumers
/// can introspect without going through snapshot().
pub fn current_cascade_step(&self) -> u8 {
self.snapshot_state
.lock()
.expect("LocalSubstrateGovernor snapshot mutex poisoned")
.current_step
}

/// Set the pool of candidate policy files used by
/// `on_hardware_detected`. Replaces any prior candidates atomically.
/// PR-3d (file watcher) calls this on file-system change events.
Expand Down Expand Up @@ -155,19 +209,72 @@ impl SubstrateGovernor for LocalSubstrateGovernor {
}

fn on_pressure_signal(&self, signal: PressureSignal) {
let mut state = self
.snapshot_state
.lock()
.expect("LocalSubstrateGovernor snapshot mutex poisoned");
if state.recent_signals.len() >= RECENT_SIGNALS_CAPACITY {
// Drop oldest (front). With a Vec this is O(N) but N=32
// so cost is trivial; using VecDeque would shave a few
// ns but adds an enum-discriminant cost to every read.
state.recent_signals.remove(0);
// PR-3c2 wiring: record signal + evaluate cascade action +
// (conditionally) apply via cascade_step rewrite. The
// time-in-step gate prevents brief spikes from advancing past
// step 1; emergency signals (thermal Critical, battery <
// emergency_pct) bypass the gate per spec.
let now = now_unix_ms();
let mut new_policy_to_publish: Option<GovernorPolicy> = None;

{
let mut state = self
.snapshot_state
.lock()
.expect("LocalSubstrateGovernor snapshot mutex poisoned");

// Record the signal in the ring (existing PR-3b behavior).
if state.recent_signals.len() >= RECENT_SIGNALS_CAPACITY {
state.recent_signals.remove(0);
}
state.recent_signals.push(signal);

// Evaluate cascade action.
let action = evaluate_next_step(state.current_step, &signal, &state.thresholds);

// Time-in-step gate: Advance from a non-zero step requires
// sustained pressure (current step active > MIN_TIME_IN_STEP_MS).
// EmergencyAdvanceToMax bypasses the gate. Retreat is never
// gated by time (hysteresis IS the anti-oscillation).
let gated_action = match action {
CascadeAction::Advance => {
let time_in_step = now.saturating_sub(state.last_step_change_ms);
if state.current_step > 0 && time_in_step < MIN_TIME_IN_STEP_MS {
// Brief spike — hold rather than advance.
CascadeAction::Hold
} else {
action
}
}
_ => action,
};

// Apply the action to the step counter. If it changed,
// build the new policy to publish + update step-change ts.
let new_step = apply_action(state.current_step, gated_action);
if new_step != state.current_step {
state.current_step = new_step;
state.last_step_change_ms = now;
// Snapshot the current policy + bump cascade_step to
// the new value. PR-3c3 will extend this with
// apply_cascade_step_to_policy that rewrites
// tier_sizes / cadence / concurrency / speculation per
// the spec's per-step transformations. For PR-3c2 only
// cascade_step changes; downstream consumers can read
// it + react.
let current = self.policy.load_full();
let mut next_policy: GovernorPolicy = (*current).clone();
next_policy.cascade_step = new_step;
next_policy.policy_version = next_policy.policy_version.saturating_add(1);
next_policy.committed_at_ms = now;
new_policy_to_publish = Some(next_policy);
}
}
// Release the snapshot_state mutex before publishing to keep
// hold time tiny + avoid lock ordering with the policy ArcSwap.
if let Some(policy) = new_policy_to_publish {
self.publish(policy);
}
state.recent_signals.push(signal);
// PR-3c will conditionally bump cascade_transition_count here
// when a signal crosses a threshold. PR-3b just records.
}

fn snapshot(&self) -> GovernorSnapshot {
Expand Down Expand Up @@ -561,23 +668,131 @@ mod tests {
assert_eq!(g.snapshot().cascade_transition_count, 0);
}

/// What this catches: on_pressure_signal does NOT increment
/// cascade_transition_count in PR-3b (signal-recording only; PR-3c
/// adds the threshold-crossing → transition logic). Pinned so PR-3c
/// has to land + update this test together.
/// What this catches (UPDATED in PR-3c2): on_pressure_signal NOW
/// drives transitions via the cascade evaluator. Thermal Critical
/// is an emergency signal — jumps cascade_step to MAX (5)
/// regardless of time-in-step. transition_count increments by 1
/// (one publish from step 0 → step 5).
#[test]
fn pressure_signal_does_not_transition_in_pr3b() {
fn pressure_signal_thermal_critical_emergency_advances() {
let g = LocalSubstrateGovernor::new(initial_policy());
g.on_pressure_signal(PressureSignal::Thermal {
severity: ThermalSeverity::Critical,
});
let snap = g.snapshot();
assert_eq!(snap.cascade_transition_count, 1);
assert_eq!(snap.current_policy.cascade_step, 5, "thermal Critical → EmergencyAdvanceToMax (step 5)");
assert_eq!(g.current_cascade_step(), 5);
}

/// What this catches: from step 0, a single signal exceeding the
/// step-0 → step-1 threshold advances to step 1 immediately. No
/// time-in-step gate for step 0 → step 1 (per spec — brief spikes
/// CAN enter step 1, gate applies to step 1 → 2 and beyond).
#[test]
fn pressure_signal_first_advance_no_gate() {
let g = LocalSubstrateGovernor::new(initial_policy());
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });
assert_eq!(g.current_cascade_step(), 1, "step 0 → 1 advance fires immediately");
}

/// What this catches: from step 1, a second-stage-triggering
/// signal arriving in < MIN_TIME_IN_STEP_MS is HELD (downgraded
/// from Advance to Hold). Brief spikes don't escalate.
#[test]
fn pressure_signal_step_1_to_2_gated_by_time_in_step() {
let g = LocalSubstrateGovernor::new(initial_policy());
// Advance to step 1
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });
assert_eq!(g.current_cascade_step(), 1);
// Immediately try to advance to step 2 — should be HELD
g.on_pressure_signal(PressureSignal::SystemMemHigh { used_pct: 95 });
assert_eq!(
g.snapshot().cascade_transition_count,
0,
"PR-3b: signal recording only; PR-3c adds threshold-driven transitions"
g.current_cascade_step(),
1,
"step 1 → 2 advance within MIN_TIME_IN_STEP_MS should be Held"
);
}

/// What this catches: EmergencyAdvanceToMax bypasses the time-in-step
/// gate. Even if step 1 was entered 1ms ago, thermal Critical jumps
/// to step 5 immediately. Protects hardware.
#[test]
fn emergency_bypasses_time_in_step_gate() {
let g = LocalSubstrateGovernor::new(initial_policy());
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });
assert_eq!(g.current_cascade_step(), 1);
// Emergency immediately after — should jump to 5 not Hold
g.on_pressure_signal(PressureSignal::Thermal {
severity: ThermalSeverity::Critical,
});
assert_eq!(g.current_cascade_step(), 5, "emergency bypasses time-in-step gate");
}

/// What this catches: Retreat is NOT gated by time-in-step. Cascade
/// can retreat as soon as conditions clear (per spec — the hysteresis
/// gap IS the anti-oscillation; doubling-up with a time gate would
/// over-throttle).
#[test]
fn retreat_not_gated_by_time_in_step() {
let g = LocalSubstrateGovernor::new(initial_policy());
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });
assert_eq!(g.current_cascade_step(), 1);
// Retreat immediately — should fire even though step 1 was just entered
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.1 });
assert_eq!(g.current_cascade_step(), 0, "retreat fires regardless of time-in-step");
}

/// What this catches: cascade_step changes on signal-driven
/// transitions DO publish a new policy (policy_version bumps,
/// committed_at_ms updates, cascade_step is the new value).
#[test]
fn signal_driven_transition_publishes_new_policy() {
let g = LocalSubstrateGovernor::new(initial_policy());
let before = g.current_policy();
assert_eq!(before.cascade_step, 0);
let before_version = before.policy_version;

g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });

let after = g.current_policy();
assert_eq!(after.cascade_step, 1);
assert!(after.policy_version > before_version);
assert!(after.committed_at_ms >= before.committed_at_ms);
}

/// What this catches: signals that don't trigger transitions
/// (e.g. UserActive) do NOT publish a new policy. The
/// recent_signals ring still records, but cascade_transition_count
/// stays.
#[test]
fn non_transitioning_signals_dont_publish() {
let g = LocalSubstrateGovernor::new(initial_policy());
let before_transitions = g.snapshot().cascade_transition_count;
g.on_pressure_signal(PressureSignal::UserActive { foreground: true });
let after_transitions = g.snapshot().cascade_transition_count;
assert_eq!(after_transitions, before_transitions, "UserActive doesn't transition");
assert_eq!(g.snapshot().recent_signals.len(), 1, "but signal IS recorded");
}

/// What this catches: set_thresholds replaces the cascade
/// threshold values used by on_pressure_signal. PR-3d's file
/// watcher uses this to hot-reload policy.
#[test]
fn set_thresholds_changes_evaluation_behavior() {
use crate::governor::cascade::CascadeThresholds;
let g = LocalSubstrateGovernor::new(initial_policy());
// Raise the speculation-advance threshold to 0.9 so 0.7 (which
// would advance with default 0.5) now Holds.
let custom = CascadeThresholds {
spec_miss_rate_advance: 0.9,
..CascadeThresholds::default()
};
g.set_thresholds(custom);
g.on_pressure_signal(PressureSignal::SpeculationMissRate { rate: 0.7 });
assert_eq!(g.current_cascade_step(), 0, "raised threshold means 0.7 no longer advances");
}

// ===== concurrency =====

/// What this catches: many concurrent reads return the current
Expand Down
Loading