Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 388 additions & 0 deletions src/workers/continuum-core/src/governor/cascade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,112 @@ pub fn apply_action(current_step: u8, action: CascadeAction) -> u8 {
}
}

// ─── apply_cascade_step_to_policy (PR-3c3) ──────────────────────────

/// Maximum federation pull cadence in seconds (3600 s = one hour).
/// `apply_cascade_step_to_policy` writes this value into
/// `federation_pull_cadence.pull_cadence_seconds` at cascade step 4 and
/// above, i.e. it lengthens the interval between federation pulls to
/// once-per-hour when the substrate is under sustained pressure. Per spec.
pub const MAX_FEDERATION_PULL_CADENCE_SECONDS: u32 = 3600;

/// Apply the per-step throttling transformations to a `GovernorPolicy`
/// to produce the next policy. Pure function — same `(base, step)`
/// always produces the same result.
///
/// Per spec §"Adjustment Cascade" table:
///
/// - Step 0: unchanged (normal operation)
/// - Step 1: drop `speculation_aggressiveness` by one notch (toward Off)
/// - Step 2: also `concurrency_caps.personas_concurrent -= 1` (min 1)
///   AND defer non-realtime (sets `cadence_multipliers.delayed` and
///   `.background` to max(current, 2.0))
/// - Step 3: also shrink `tier_sizes.l1_lora_layers` and
///   `tier_sizes.l1_kv_tokens` by 25% (exact integer floor of 3/4 of
///   the base value; min 1)
/// - Step 4: also `federation_pull_cadence.pull_cadence_seconds =
///   MAX_FEDERATION_PULL_CADENCE_SECONDS`
/// - Step 5: also `consolidation_schedule = Manual` (operator must
///   explicitly trigger consolidation; the substrate won't run it on
///   its own under maximum pressure)
///
/// Transformations are CUMULATIVE — step 3 includes step 2's
/// transformations plus step 1's. Apply-at-step-N = apply [step 1, ...
/// step N] to base. Caller passes the BASE policy (the policy as
/// loaded from the file, with cascade_step = 0) so the transformations
/// always start from the same canonical state.
///
/// `policy.cascade_step` is set to the supplied `step` parameter,
/// clamped to `CASCADE_STEP_MAX`. Other fields (policy_version,
/// hardware_class, committed_at_ms) are passed through unchanged —
/// caller is responsible for bumping version + updating timestamp at
/// publish time.
///
/// # Parameters
///
/// - `base`: the policy every transformation is computed from. It is
///   only read; the result is a modified clone.
/// - `step`: cascade step to apply. A value of 5 or more applies every
///   transformation listed above.
///
/// # Returns
///
/// A new `GovernorPolicy` carrying the cumulative throttling for `step`.
///
/// ## Anti-oscillation: restore-speculation-one-step-later
///
/// Spec rule per §"Adjustment Cascade": when retreating, restore
/// speculation ONE STEP LATER than the rest of the policy. This
/// function is symmetric (applying step 0 == base policy), so the
/// "one step later" is the CALLER's responsibility: when retreating
/// from N → N-1, call this with `step = N-1` for everything EXCEPT
/// speculation, which uses `step = N` for one more cycle. That logic
/// lives in the wiring layer (PR-3c4 or `LocalSubstrateGovernor`
/// follow-up), not this pure transformation.
pub fn apply_cascade_step_to_policy(
    base: &crate::governor::types::GovernorPolicy,
    step: u8,
) -> crate::governor::types::GovernorPolicy {
    // floor(value * 3 / 4), floored at 1, computed purely in integers.
    // Splitting into quotient and remainder of 4 avoids any overflow:
    // value = 4q + r  =>  floor(3 * value / 4) = 3q + floor(3r / 4).
    // A round-trip through f32 is NOT equivalent: f32 carries a 24-bit
    // significand, so once the product reaches 2^23 it is rounded to
    // nearest (possibly upward) instead of being rounded down.
    let shrink_by_quarter = |value: u32| -> u32 {
        let kept = value / 4 * 3 + value % 4 * 3 / 4;
        kept.max(1)
    };

    let mut policy = base.clone();
    policy.cascade_step = step.min(CASCADE_STEP_MAX);

    // Step 1+: speculation drop
    if step >= 1 {
        policy.speculation_aggressiveness =
            drop_speculation_level(base.speculation_aggressiveness);
    }

    // Step 2+: personas_concurrent -= 1, defer non-realtime
    if step >= 2 {
        // saturating_sub guards the 0 case; max(1) keeps at least one
        // persona running.
        policy.concurrency_caps.personas_concurrent =
            base.concurrency_caps.personas_concurrent.saturating_sub(1).max(1);
        // delayed + background cadence stretched (max with 2.0 so
        // already-stretched values aren't shrunk)
        policy.cadence_multipliers.delayed = base.cadence_multipliers.delayed.max(2.0);
        policy.cadence_multipliers.background = base.cadence_multipliers.background.max(2.0);
    }

    // Step 3+: shrink l1 by 25% (rounded down, never below 1)
    if step >= 3 {
        policy.tier_sizes.l1_lora_layers = shrink_by_quarter(base.tier_sizes.l1_lora_layers);
        policy.tier_sizes.l1_kv_tokens = shrink_by_quarter(base.tier_sizes.l1_kv_tokens);
    }

    // Step 4+: federation cadence to max
    if step >= 4 {
        policy.federation_pull_cadence.pull_cadence_seconds = MAX_FEDERATION_PULL_CADENCE_SECONDS;
    }

    // Step 5: consolidation Manual
    if step >= 5 {
        policy.consolidation_schedule =
            crate::governor::types::ConsolidationSchedule::Manual;
    }

    policy
}

/// Drop the speculation level by one notch toward Off. Pure helper.
/// Off → Off (already minimum), Conservative → Off, Balanced →
/// Conservative, Aggressive → Balanced.
fn drop_speculation_level(
level: crate::governor::types::SpeculationLevel,
) -> crate::governor::types::SpeculationLevel {
use crate::governor::types::SpeculationLevel::*;
match level {
Off => Off,
Conservative => Off,
Balanced => Conservative,
Aggressive => Balanced,
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -758,4 +864,286 @@ mod tests {
);
assert_eq!(action, CascadeAction::EmergencyAdvanceToMax);
}

// ===== apply_cascade_step_to_policy (PR-3c3) =====

use crate::governor::types::{
CadenceMultipliers, ConcurrencyCaps, ConsolidationSchedule, FederationCadence,
GovernorPolicy, HardwareClass, PowerSource, RecallScoreWeights, SpeculationLevel,
TargetSilicon, ThermalClass, TierSizes,
};

fn base_policy_5090() -> GovernorPolicy {
    // Stand-in for the spec's 5090 anchor policy; every cascade-step
    // test starts from this canonical base. Each section is built as a
    // named local and assembled at the end.
    let hardware_class = HardwareClass {
        silicon: TargetSilicon::NvidiaCuda,
        silicon_model: "RTX 5090".into(),
        vram_mb: 32 * 1024,
        system_ram_mb: 64 * 1024,
        power_source: PowerSource::Plugged,
        thermal_class: ThermalClass::Workstation,
        battery_pct: None,
        thermal_headroom_pct: None,
    };

    let tier_sizes = TierSizes {
        l1_lora_layers: 8,
        l1_kv_tokens: 16384,
        l2_lora_layers: 16,
        l3_lora_layers: 40,
        l3_engrams: 10240,
    };

    let cadence_multipliers = CadenceMultipliers {
        realtime: 1.0,
        delayed: 1.0,
        background: 1.5,
    };

    let concurrency_caps = ConcurrencyCaps {
        personas_concurrent: 8,
        inference_lanes: 4,
        foundry_lanes: 1,
        sentinel_lanes: 2,
    };

    let federation_pull_cadence = FederationCadence {
        pull_cadence_seconds: 60,
    };

    let recall_score_weights = RecallScoreWeights {
        semantic: 0.4,
        outcome_history: 0.3,
        recency: 0.1,
        tier_proximity: 0.1,
        provenance_trust: 0.1,
    };

    GovernorPolicy {
        policy_version: 1,
        hardware_class,
        tier_sizes,
        cadence_multipliers,
        concurrency_caps,
        speculation_aggressiveness: SpeculationLevel::Aggressive,
        consolidation_schedule: ConsolidationSchedule::Idle,
        federation_pull_cadence,
        recall_score_weights,
        cascade_step: 0,
        committed_at_ms: 1000,
    }
}

/// What this catches: step 0 == base (cascade unchanged, no
/// throttling applied). Identity case — pinning that the function
/// doesn't accidentally modify ANY field of the policy when step=0.
///
/// The per-field assertions come first so a failure names the field
/// that drifted; the final whole-struct comparison covers every
/// remaining field (cadence multipliers, federation cadence,
/// `l1_kv_tokens`, ...) that the targeted checks do not list.
#[test]
fn apply_step_0_equals_base_except_cascade_step() {
    let base = base_policy_5090();
    let after = apply_cascade_step_to_policy(&base, 0);
    assert_eq!(after.cascade_step, 0);
    assert_eq!(after.speculation_aggressiveness, base.speculation_aggressiveness);
    assert_eq!(
        after.concurrency_caps.personas_concurrent,
        base.concurrency_caps.personas_concurrent
    );
    assert_eq!(after.tier_sizes.l1_lora_layers, base.tier_sizes.l1_lora_layers);
    assert_eq!(after.consolidation_schedule, base.consolidation_schedule);

    // Full identity: only cascade_step is allowed to be (re)written, and
    // at step 0 it must be 0.
    let expected = GovernorPolicy {
        cascade_step: 0,
        ..base.clone()
    };
    assert_eq!(after, expected);
}

/// What this catches: a single notch of speculation is removed at
/// step 1. The anchor policy starts at Aggressive, so the result must
/// be Balanced (the remaining rungs are covered by the per-variant
/// test that follows).
#[test]
fn apply_step_1_drops_speculation_aggressive_to_balanced() {
    let anchor = base_policy_5090();
    // Precondition: the fixture really starts at the top rung.
    assert_eq!(anchor.speculation_aggressiveness, SpeculationLevel::Aggressive);

    let stepped = apply_cascade_step_to_policy(&anchor, 1);

    assert_eq!(stepped.speculation_aggressiveness, SpeculationLevel::Balanced);
    assert_eq!(stepped.cascade_step, 1);
}

/// What this catches: every rung of the speculation ladder drops by
/// exactly one at step 1 — Aggressive→Balanced, Balanced→Conservative,
/// Conservative→Off — and Off stays Off because it is already the
/// minimum.
#[test]
fn apply_step_1_speculation_drops_one_notch_per_variant() {
    let ladder = [
        (SpeculationLevel::Aggressive, SpeculationLevel::Balanced),
        (SpeculationLevel::Balanced, SpeculationLevel::Conservative),
        (SpeculationLevel::Conservative, SpeculationLevel::Off),
        (SpeculationLevel::Off, SpeculationLevel::Off),
    ];

    for (input, expected) in ladder.iter().copied() {
        // Anchor policy with only the starting speculation level swapped.
        let base = GovernorPolicy {
            speculation_aggressiveness: input,
            ..base_policy_5090()
        };
        let stepped = apply_cascade_step_to_policy(&base, 1);
        assert_eq!(
            stepped.speculation_aggressiveness, expected,
            "from {input:?} should drop to {expected:?}"
        );
    }
}

/// What this catches: step 2 removes one concurrent persona (the 5090
/// anchor has 8, so step 2 yields 7) while step 1's speculation drop
/// remains in effect — the steps are cumulative.
#[test]
fn apply_step_2_drops_personas_concurrent_and_keeps_speculation_drop() {
    let anchor = base_policy_5090();
    let stepped = apply_cascade_step_to_policy(&anchor, 2);

    assert_eq!(stepped.cascade_step, 2);
    // Carried over from step 1.
    assert_eq!(stepped.speculation_aggressiveness, SpeculationLevel::Balanced);
    // New at step 2: 8 - 1.
    assert_eq!(stepped.concurrency_caps.personas_concurrent, 7);
}

/// What this catches: the persona count never goes below 1 at step 2.
/// Defensive — a base that already runs a single persona must not be
/// throttled to 0 (which would kill the inference pool entirely).
#[test]
fn apply_step_2_personas_concurrent_floor_at_one() {
    let mut single_persona = base_policy_5090();
    single_persona.concurrency_caps.personas_concurrent = 1;

    let stepped = apply_cascade_step_to_policy(&single_persona, 2);

    assert_eq!(stepped.concurrency_caps.personas_concurrent, 1);
}

/// What this catches: at step 2 the delayed and background cadence
/// multipliers end up at 2.0 or more, while the realtime multiplier is
/// left exactly as the base had it.
#[test]
fn apply_step_2_stretches_non_realtime_cadence() {
    let anchor = base_policy_5090();
    let stepped = apply_cascade_step_to_policy(&anchor, 2);

    let before = &anchor.cadence_multipliers;
    let cadence = &stepped.cadence_multipliers;

    assert_eq!(cadence.realtime, before.realtime);
    assert!(cadence.delayed >= 2.0);
    assert!(cadence.background >= 2.0);
}

/// What this catches: a cadence multiplier that is already stretched
/// beyond 2.0 is not pulled back down by step 2. With background = 3.0
/// in the base, the result keeps 3.0 (the transformation takes the
/// max).
#[test]
fn apply_step_2_doesnt_shrink_already_stretched_cadence() {
    let mut pre_stretched = base_policy_5090();
    pre_stretched.cadence_multipliers.background = 3.0;

    let stepped = apply_cascade_step_to_policy(&pre_stretched, 2);

    assert_eq!(stepped.cadence_multipliers.background, 3.0);
}

/// What this catches: step 3 cuts both L1 sizes by 25%.
/// 8 layers → 6, 16384 tokens → 12288. The L2 size must not move.
#[test]
fn apply_step_3_shrinks_l1_25_percent() {
    let anchor = base_policy_5090();
    let stepped = apply_cascade_step_to_policy(&anchor, 3);

    assert_eq!(stepped.cascade_step, 3);

    let tiers = &stepped.tier_sizes;
    assert_eq!(tiers.l1_lora_layers, 6); // 8 * 0.75
    assert_eq!(tiers.l1_kv_tokens, 12288); // 16384 * 0.75
    // Step 3 only touches L1; L2 stays at its base value.
    assert_eq!(tiers.l2_lora_layers, anchor.tier_sizes.l2_lora_layers);
}

/// What this catches: the L1 sizes never shrink below 1. Starting from
/// 1, 75% rounds down to 0, and the floor lifts it back to 1.
#[test]
fn apply_step_3_l1_floors_at_one() {
    let mut tiny = base_policy_5090();
    tiny.tier_sizes.l1_kv_tokens = 1;
    tiny.tier_sizes.l1_lora_layers = 1;

    let stepped = apply_cascade_step_to_policy(&tiny, 3);

    let tiers = &stepped.tier_sizes;
    assert_eq!(tiers.l1_lora_layers, 1);
    assert_eq!(tiers.l1_kv_tokens, 1);
}

/// What this catches: step 4 replaces the federation pull cadence with
/// MAX_FEDERATION_PULL_CADENCE_SECONDS, slowing pulls to once per hour
/// under sustained pressure.
#[test]
fn apply_step_4_maxes_federation_cadence() {
    let anchor = base_policy_5090();
    // Precondition: the fixture starts at a 60 s cadence.
    assert_eq!(anchor.federation_pull_cadence.pull_cadence_seconds, 60);

    let stepped = apply_cascade_step_to_policy(&anchor, 4);
    let cadence_seconds = stepped.federation_pull_cadence.pull_cadence_seconds;

    assert_eq!(stepped.cascade_step, 4);
    assert_eq!(cadence_seconds, MAX_FEDERATION_PULL_CADENCE_SECONDS);
}

/// What this catches: step 5 switches the consolidation schedule to
/// Manual — automatic consolidation is suspended under maximum
/// pressure and only an explicit operator trigger runs it.
#[test]
fn apply_step_5_consolidation_manual() {
    let anchor = base_policy_5090();
    // Precondition: the fixture starts on the Idle schedule.
    assert_eq!(anchor.consolidation_schedule, ConsolidationSchedule::Idle);

    let stepped = apply_cascade_step_to_policy(&anchor, 5);

    assert_eq!(stepped.consolidation_schedule, ConsolidationSchedule::Manual);
    assert_eq!(stepped.cascade_step, 5);
}

/// What this catches: step 5 carries every earlier transformation with
/// it. The full-throttle state has speculation dropped, personas
/// reduced, L1 shrunk, federation cadence maxed, and consolidation set
/// to Manual — all at once.
#[test]
fn apply_step_5_cumulative_all_transformations() {
    let anchor = base_policy_5090();
    let full_throttle = apply_cascade_step_to_policy(&anchor, 5);

    // From step 1: one notch of speculation removed.
    assert_eq!(full_throttle.speculation_aggressiveness, SpeculationLevel::Balanced);

    // From step 2: one persona fewer, non-realtime cadence stretched.
    assert_eq!(full_throttle.concurrency_caps.personas_concurrent, 7);
    assert!(full_throttle.cadence_multipliers.delayed >= 2.0);

    // From step 3: L1 layers shrunk.
    assert_eq!(full_throttle.tier_sizes.l1_lora_layers, 6);

    // From step 4: federation cadence at its maximum.
    let cadence_seconds = full_throttle.federation_pull_cadence.pull_cadence_seconds;
    assert_eq!(cadence_seconds, MAX_FEDERATION_PULL_CADENCE_SECONDS);

    // From step 5 itself: consolidation is operator-triggered only.
    assert_eq!(full_throttle.consolidation_schedule, ConsolidationSchedule::Manual);
}

/// What this catches: a step beyond the maximum is stored as
/// CASCADE_STEP_MAX. Defensive against caller bugs (e.g. passing 7
/// where 5 is the top step).
#[test]
fn apply_step_above_max_clamps_to_max() {
    let anchor = base_policy_5090();
    let out_of_range = apply_cascade_step_to_policy(&anchor, 99);

    // The step-5 transformation must be present as well.
    assert_eq!(out_of_range.consolidation_schedule, ConsolidationSchedule::Manual);
    assert_eq!(out_of_range.cascade_step, CASCADE_STEP_MAX);
}

/// What this catches: the transformation is deterministic — two calls
/// with identical inputs yield equal policies. Pinned so a caller may
/// cache the result for a stable (base_policy, step) pair.
#[test]
fn apply_cascade_step_is_deterministic() {
    let anchor = base_policy_5090();

    let first = apply_cascade_step_to_policy(&anchor, 3);
    let second = apply_cascade_step_to_policy(&anchor, 3);

    assert_eq!(first, second);
}

/// What this catches: applying step N then step 0 to the result
/// does NOT restore base — the step transformations are NOT
/// reversible from a transformed policy. Caller MUST keep the
/// original base + re-apply step 0 from it (which is what the
/// LocalSubstrateGovernor does — stores base separately from
/// active).
///
/// The test first proves that step 3 really changed the L1 size
/// relative to base; without that guard the "still shrunk" assertion
/// would pass vacuously if step 3 ever became a no-op.
#[test]
fn apply_cascade_step_not_reversible_via_step_0_on_transformed() {
    let base = base_policy_5090();
    let throttled = apply_cascade_step_to_policy(&base, 3);
    // Guard: the throttled policy must actually differ from base.
    assert_ne!(
        throttled.tier_sizes.l1_lora_layers,
        base.tier_sizes.l1_lora_layers
    );

    let reset_attempt = apply_cascade_step_to_policy(&throttled, 0);
    // step is 0 again
    assert_eq!(reset_attempt.cascade_step, 0);
    // But tier_sizes is STILL shrunk (step 0 doesn't undo step 3's
    // shrink — it just doesn't re-apply it from a now-shrunk base).
    assert_eq!(
        reset_attempt.tier_sizes.l1_lora_layers,
        throttled.tier_sizes.l1_lora_layers,
        "step 0 from transformed policy ≠ base; caller MUST hold base separately"
    );
    // ...and therefore it is not the base value.
    assert_ne!(
        reset_attempt.tier_sizes.l1_lora_layers,
        base.tier_sizes.l1_lora_layers
    );
}

/// What this catches: drift of MAX_FEDERATION_PULL_CADENCE_SECONDS away
/// from the spec's max-cadence value of 3600 seconds. If someone tunes
/// the constant without updating the spec, this test fails.
#[test]
fn max_federation_cadence_const_pinned() {
    let spec_max_cadence_seconds = 3600;
    assert_eq!(MAX_FEDERATION_PULL_CADENCE_SECONDS, spec_max_cadence_seconds);
}
}
Loading