From 627864ac820b5b750b5b7fa708466c2f28268b56 Mon Sep 17 00:00:00 2001 From: Test Date: Sat, 16 May 2026 19:32:06 -0500 Subject: [PATCH] =?UTF-8?q?feat(governor):=20Lane=20H=20PR-3c3=20=E2=80=94?= =?UTF-8?q?=20apply=5Fcascade=5Fstep=5Fto=5Fpolicy=20field=20rewrites=20pe?= =?UTF-8?q?r=20spec=20table?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacks on canary post-#1360 merge. PR-3c2 wired cascade evaluator into on_pressure_signal to update cascade_step. This PR-3c3 ships apply_cascade_step_to_policy — the pure function that ACTUALLY transforms tier_sizes/cadence/concurrency/ speculation/federation/consolidation per the cascade step. Per spec §'Adjustment Cascade' table: - Step 0: unchanged (normal operation) - Step 1: speculation_aggressiveness drops one notch toward Off (Aggressive → Balanced → Conservative → Off → Off) - Step 2: cumulative + personas_concurrent -= 1 (floor 1) + defer non-realtime (cadence_multipliers.delayed/.background = max(current, 2.0)) - Step 3: cumulative + tier_sizes.l1_lora_layers + l1_kv_tokens shrunk to 75% (floor 1) - Step 4: cumulative + federation_pull_cadence.pull_cadence_seconds = MAX_FEDERATION_PULL_CADENCE_SECONDS (3600s = once-per-hour) - Step 5: cumulative + consolidation_schedule = Manual (operator must explicitly trigger; substrate stops on its own under max pressure) Transformations are CUMULATIVE — step N includes all transformations from steps 1..N. Caller passes BASE policy (cascade_step=0) and step; function returns a NEW policy with cascade_step + transformations applied. Caller is responsible for bumping policy_version + updating committed_at_ms at publish time. Pure function — no I/O, no state, no globals. Deterministic. Anti-oscillation note (caller responsibility, documented in fn docstring): the spec's 'restore-speculation-one-step-later' rule lives in the WIRING layer (LocalSubstrateGovernor follow-up), not this pure transformation. 
When retreating N → N-1, caller applies step N-1 for everything EXCEPT speculation, which uses step N for one more cycle. This separation keeps apply_cascade_step_to_policy a clean deterministic mapping. Also documented (test pins this): apply_cascade_step_to_policy is NOT reversible from a transformed policy. apply(transformed, 0) does NOT restore base — the caller must hold the original base separately and re-apply step 0 from it. LocalSubstrateGovernor will need to evolve to store base + active separately (PR-3c4). Constants: - MAX_FEDERATION_PULL_CADENCE_SECONDS = 3600 (once-per-hour ceiling) Pinned by test to catch silent tuning. Tests: 46 passing on cargo test --lib --features metal,accelerate governor::cascade:: (30 from PR-3c1 + 16 new) NEW (16) for apply_cascade_step_to_policy: - step 0 == base except cascade_step (identity) - step 1 drops Aggressive → Balanced - step 1 covers full speculation ladder (4 variants) - step 2 personas-1 + cumulative speculation drop - step 2 personas floor at 1 (defensive) - step 2 stretches non-realtime cadence (delayed + background → 2.0) - step 2 doesn't shrink already-stretched cadence (max-not-set semantics) - step 3 shrinks l1 by 25% (8→6, 16384→12288) - step 3 l1 floors at 1 (1*0.75=0.75→0→max(0,1)=1) - step 4 federation_pull_cadence_seconds = MAX (60→3600) - step 5 consolidation = Manual - step 5 cumulative — all prior transformations applied - step > MAX clamps to MAX (defensive against caller bugs) - determinism - not reversible from transformed (documented limitation, test pinned) - MAX_FEDERATION_PULL_CADENCE_SECONDS const pinned Stack: - #1345 PR-1 governor-types (MERGED) - #1350 PR-2 TOML loader (MERGED) - #1352 PR-3a policy_selector (MERGED) - #1354 PR-3b LocalSubstrateGovernor (MERGED) - #1356 PR-3c1 cascade evaluator (MERGED) - #1360 PR-3c2 cascade wiring + time-in-step gate (MERGED) - This PR (PR-3c3): apply_cascade_step_to_policy field rewrites - Future PR-3c4: wire apply_cascade_step_to_policy into 
LocalSubstrateGovernor + restore-speculation-one-step-later semantics + base-vs-active policy split - Future PR-3d: file watcher (notify crate) - Future PR-4: PressureBroker → governor wiring VDD evidence N/A — pure transformation. Evidence with PR-3c4 wiring + PR-4 + downstream consumers reading the throttled policy. Coordination: explicit claim posted to airc 00:25Z; codex on orthogonal VDD work per their 00:25:13Z broadcast. No collision. --- .../continuum-core/src/governor/cascade.rs | 388 ++++++++++++++++++ 1 file changed, 388 insertions(+) diff --git a/src/workers/continuum-core/src/governor/cascade.rs b/src/workers/continuum-core/src/governor/cascade.rs index fda3ca4ea..618a1fca5 100644 --- a/src/workers/continuum-core/src/governor/cascade.rs +++ b/src/workers/continuum-core/src/governor/cascade.rs @@ -355,6 +355,112 @@ pub fn apply_action(current_step: u8, action: CascadeAction) -> u8 { } } +// ─── apply_cascade_step_to_policy (PR-3c3) ────────────────────────── + +/// Maximum federation pull cadence in seconds. Step 4 advance drops +/// the cadence to this value, slowing federation pulls to once-per-hour +/// when the substrate is under sustained pressure. Per spec. +pub const MAX_FEDERATION_PULL_CADENCE_SECONDS: u32 = 3600; + +/// Apply the per-step throttling transformations to a `GovernorPolicy` +/// to produce the next policy. Pure function — same `(base, step)` +/// always produces the same result. 
+/// +/// Per spec §"Adjustment Cascade" table: +/// +/// - Step 0: unchanged (normal operation) +/// - Step 1: drop `speculation_aggressiveness` by one notch (toward Off) +/// - Step 2: also `concurrency_caps.personas_concurrent -= 1` (min 1) +/// AND defer non-realtime (sets `cadence_multipliers.delayed` and +/// `.background` to max(current, 2.0)) +/// - Step 3: also shrink `tier_sizes.l1_lora_layers` and +/// `tier_sizes.l1_kv_tokens` by 25% (rounded down; min 1) +/// - Step 4: also `federation_pull_cadence.pull_cadence_seconds = +/// MAX_FEDERATION_PULL_CADENCE_SECONDS` +/// - Step 5: also `consolidation_schedule = Manual` (operator must +/// explicitly trigger consolidation; the substrate won't run it on +/// its own under maximum pressure) +/// +/// Transformations are CUMULATIVE — step 3 includes step 2's +/// transformations plus step 1's. Apply-at-step-N = apply [step 1, ... +/// step N] to base. Caller passes the BASE policy (the policy as +/// loaded from the file, with cascade_step = 0) so the transformations +/// always start from the same canonical state. +/// +/// `policy.cascade_step` is set to the supplied `step` parameter. +/// Other fields (policy_version, hardware_class, committed_at_ms) +/// are passed through unchanged — caller is responsible for bumping +/// version + updating timestamp at publish time. +/// +/// ## Anti-oscillation: restore-speculation-one-step-later +/// +/// Spec rule per §"Adjustment Cascade": when retreating, restore +/// speculation ONE STEP LATER than the rest of the policy. This +/// function is symmetric (applying step 0 == base policy), so the +/// "one step later" is the CALLER's responsibility: when retreating +/// from N → N-1, call this with `step = N-1` for everything EXCEPT +/// speculation, which uses `step = N` for one more cycle. That logic +/// lives in the wiring layer (PR-3c4 or `LocalSubstrateGovernor` +/// follow-up), not this pure transformation. 
+///
+/// ## Implementation notes
+///
+/// - The step 3 shrink uses integer arithmetic, so "rounded down" holds
+///   exactly for every `u32`. An `f32` round-trip only carries a 24-bit
+///   significand and rounds some large values up instead of down.
+/// - Step 4 never shortens the pull interval: a base
+///   `pull_cadence_seconds` that is already above
+///   `MAX_FEDERATION_PULL_CADENCE_SECONDS` is kept as-is, mirroring the
+///   max-not-set handling of the step 2 cadence multipliers.
+/// - The step gates compare the unclamped `step`; only the recorded
+///   `cascade_step` is clamped to `CASCADE_STEP_MAX`.
+pub fn apply_cascade_step_to_policy(
+    base: &crate::governor::types::GovernorPolicy,
+    step: u8,
+) -> crate::governor::types::GovernorPolicy {
+    let mut policy = base.clone();
+    // Record a clamped step so an out-of-range caller value never leaks
+    // into the published policy.
+    policy.cascade_step = step.min(CASCADE_STEP_MAX);
+
+    // Step 1+: speculation drops one notch toward Off.
+    if step >= 1 {
+        policy.speculation_aggressiveness =
+            drop_speculation_level(base.speculation_aggressiveness);
+    }
+
+    // Step 2+: one fewer concurrent persona (never below 1) and
+    // non-realtime work deferred.
+    if step >= 2 {
+        policy.concurrency_caps.personas_concurrent = base
+            .concurrency_caps
+            .personas_concurrent
+            .saturating_sub(1)
+            .max(1);
+        // `max` rather than plain assignment: a base that is already
+        // stretched beyond 2.0 must not be tightened by throttling.
+        policy.cadence_multipliers.delayed = base.cadence_multipliers.delayed.max(2.0);
+        policy.cadence_multipliers.background = base.cadence_multipliers.background.max(2.0);
+    }
+
+    // Step 3+: L1 tier sizes shrink to 75% (rounded down, floor 1).
+    if step >= 3 {
+        policy.tier_sizes.l1_lora_layers = shrink_l1_by_quarter(base.tier_sizes.l1_lora_layers);
+        policy.tier_sizes.l1_kv_tokens = shrink_l1_by_quarter(base.tier_sizes.l1_kv_tokens);
+    }
+
+    // Step 4+: federation pulls slowed to at most once per
+    // MAX_FEDERATION_PULL_CADENCE_SECONDS.
+    // NOTE(review): `max` keeps a base interval that is already longer
+    // than the constant, so throttling can never speed pulls up. This
+    // assumes such base values are legal — confirm against the loader.
+    if step >= 4 {
+        policy.federation_pull_cadence.pull_cadence_seconds = base
+            .federation_pull_cadence
+            .pull_cadence_seconds
+            .max(MAX_FEDERATION_PULL_CADENCE_SECONDS);
+    }
+
+    // Step 5: consolidation only runs when explicitly triggered.
+    if step >= 5 {
+        policy.consolidation_schedule =
+            crate::governor::types::ConsolidationSchedule::Manual;
+    }
+
+    policy
+}
+
+/// Scale `value` to 75%, rounding down, with a floor of 1. Pure helper
+/// for the step 3 tier shrink.
+fn shrink_l1_by_quarter(value: u32) -> u32 {
+    // floor(3 * value / 4), split into quotient and remainder parts so
+    // the multiplication cannot overflow `u32`.
+    let three_quarters = (value / 4) * 3 + (value % 4) * 3 / 4;
+    three_quarters.max(1)
+}
+
+/// Drop the speculation level by one notch toward Off. Pure helper.
+/// Off → Off (already minimum), Conservative → Off, Balanced →
+/// Conservative, Aggressive → Balanced.
+fn drop_speculation_level( + level: crate::governor::types::SpeculationLevel, +) -> crate::governor::types::SpeculationLevel { + use crate::governor::types::SpeculationLevel::*; + match level { + Off => Off, + Conservative => Off, + Balanced => Conservative, + Aggressive => Balanced, + } +} + #[cfg(test)] mod tests { use super::*; @@ -758,4 +864,286 @@ mod tests { ); assert_eq!(action, CascadeAction::EmergencyAdvanceToMax); } + + // ===== apply_cascade_step_to_policy (PR-3c3) ===== + + use crate::governor::types::{ + CadenceMultipliers, ConcurrencyCaps, ConsolidationSchedule, FederationCadence, + GovernorPolicy, HardwareClass, PowerSource, RecallScoreWeights, SpeculationLevel, + TargetSilicon, ThermalClass, TierSizes, + }; + + fn base_policy_5090() -> GovernorPolicy { + // Approximation of the spec's 5090 anchor policy. Used as the + // canonical base for cascade-step tests. + GovernorPolicy { + policy_version: 1, + hardware_class: HardwareClass { + silicon: TargetSilicon::NvidiaCuda, + silicon_model: "RTX 5090".into(), + vram_mb: 32 * 1024, + system_ram_mb: 64 * 1024, + power_source: PowerSource::Plugged, + thermal_class: ThermalClass::Workstation, + battery_pct: None, + thermal_headroom_pct: None, + }, + tier_sizes: TierSizes { + l1_lora_layers: 8, + l1_kv_tokens: 16384, + l2_lora_layers: 16, + l3_lora_layers: 40, + l3_engrams: 10240, + }, + cadence_multipliers: CadenceMultipliers { + realtime: 1.0, + delayed: 1.0, + background: 1.5, + }, + concurrency_caps: ConcurrencyCaps { + personas_concurrent: 8, + inference_lanes: 4, + foundry_lanes: 1, + sentinel_lanes: 2, + }, + speculation_aggressiveness: SpeculationLevel::Aggressive, + consolidation_schedule: ConsolidationSchedule::Idle, + federation_pull_cadence: FederationCadence { + pull_cadence_seconds: 60, + }, + recall_score_weights: RecallScoreWeights { + semantic: 0.4, + outcome_history: 0.3, + recency: 0.1, + tier_proximity: 0.1, + provenance_trust: 0.1, + }, + cascade_step: 0, + committed_at_ms: 1000, + } + } 
+ + /// What this catches: step 0 == base (cascade unchanged, no + /// throttling applied). Identity case — pinning that the function + /// doesn't accidentally modify the base policy when step=0. + #[test] + fn apply_step_0_equals_base_except_cascade_step() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 0); + assert_eq!(after.cascade_step, 0); + assert_eq!(after.speculation_aggressiveness, base.speculation_aggressiveness); + assert_eq!( + after.concurrency_caps.personas_concurrent, + base.concurrency_caps.personas_concurrent + ); + assert_eq!(after.tier_sizes.l1_lora_layers, base.tier_sizes.l1_lora_layers); + assert_eq!(after.consolidation_schedule, base.consolidation_schedule); + } + + /// What this catches: step 1 drops speculation by one notch. + /// Aggressive → Balanced (then Balanced → Conservative, Conservative + /// → Off via separate base policies in the next test). + #[test] + fn apply_step_1_drops_speculation_aggressive_to_balanced() { + let base = base_policy_5090(); + assert_eq!(base.speculation_aggressiveness, SpeculationLevel::Aggressive); + let after = apply_cascade_step_to_policy(&base, 1); + assert_eq!(after.cascade_step, 1); + assert_eq!(after.speculation_aggressiveness, SpeculationLevel::Balanced); + } + + /// What this catches: speculation drop ladder covers every variant. + /// Aggressive→Balanced, Balanced→Conservative, Conservative→Off, + /// Off→Off (already minimum). 
+ #[test] + fn apply_step_1_speculation_drops_one_notch_per_variant() { + for (input, expected) in &[ + (SpeculationLevel::Aggressive, SpeculationLevel::Balanced), + (SpeculationLevel::Balanced, SpeculationLevel::Conservative), + (SpeculationLevel::Conservative, SpeculationLevel::Off), + (SpeculationLevel::Off, SpeculationLevel::Off), + ] { + let mut base = base_policy_5090(); + base.speculation_aggressiveness = *input; + let after = apply_cascade_step_to_policy(&base, 1); + assert_eq!( + after.speculation_aggressiveness, *expected, + "from {input:?} should drop to {expected:?}" + ); + } + } + + /// What this catches: step 2 personas_concurrent decreases by 1 + /// (5090 has 8 → step 2 = 7). Cumulative with step 1's speculation + /// drop. + #[test] + fn apply_step_2_drops_personas_concurrent_and_keeps_speculation_drop() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 2); + assert_eq!(after.cascade_step, 2); + assert_eq!(after.concurrency_caps.personas_concurrent, 7); // 8 - 1 + // Cumulative: step 1's speculation drop still applies + assert_eq!(after.speculation_aggressiveness, SpeculationLevel::Balanced); + } + + /// What this catches: step 2 personas_concurrent floor at 1. + /// Defensive — a base with 1 persona shouldn't go to 0 (kills the + /// inference pool entirely). + #[test] + fn apply_step_2_personas_concurrent_floor_at_one() { + let mut base = base_policy_5090(); + base.concurrency_caps.personas_concurrent = 1; + let after = apply_cascade_step_to_policy(&base, 2); + assert_eq!(after.concurrency_caps.personas_concurrent, 1); + } + + /// What this catches: step 2 stretches non-realtime cadence + /// multipliers to at least 2.0. Realtime stays unchanged. 
+ #[test] + fn apply_step_2_stretches_non_realtime_cadence() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 2); + assert_eq!(after.cadence_multipliers.realtime, base.cadence_multipliers.realtime); + assert!(after.cadence_multipliers.delayed >= 2.0); + assert!(after.cadence_multipliers.background >= 2.0); + } + + /// What this catches: step 2 doesn't SHRINK already-stretched + /// cadence multipliers. If base already has background = 3.0, step + /// 2 keeps 3.0 (uses max). + #[test] + fn apply_step_2_doesnt_shrink_already_stretched_cadence() { + let mut base = base_policy_5090(); + base.cadence_multipliers.background = 3.0; + let after = apply_cascade_step_to_policy(&base, 2); + assert_eq!(after.cadence_multipliers.background, 3.0); + } + + /// What this catches: step 3 shrinks l1_lora_layers + l1_kv_tokens + /// by ~25%. 8 * 0.75 = 6. 16384 * 0.75 = 12288. + #[test] + fn apply_step_3_shrinks_l1_25_percent() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 3); + assert_eq!(after.cascade_step, 3); + assert_eq!(after.tier_sizes.l1_lora_layers, 6); // 8 * 0.75 + assert_eq!(after.tier_sizes.l1_kv_tokens, 12288); // 16384 * 0.75 + // L2/L3 untouched at step 3 + assert_eq!(after.tier_sizes.l2_lora_layers, base.tier_sizes.l2_lora_layers); + } + + /// What this catches: l1 floor at 1 when base is already small. + /// 1 * 0.75 = 0.75 → floor 0 → max(0, 1) = 1. + #[test] + fn apply_step_3_l1_floors_at_one() { + let mut base = base_policy_5090(); + base.tier_sizes.l1_lora_layers = 1; + base.tier_sizes.l1_kv_tokens = 1; + let after = apply_cascade_step_to_policy(&base, 3); + assert_eq!(after.tier_sizes.l1_lora_layers, 1); + assert_eq!(after.tier_sizes.l1_kv_tokens, 1); + } + + /// What this catches: step 4 federation cadence = max + /// (MAX_FEDERATION_PULL_CADENCE_SECONDS). Slows pulls to once- + /// per-hour under sustained pressure. 
+ #[test] + fn apply_step_4_maxes_federation_cadence() { + let base = base_policy_5090(); + assert_eq!(base.federation_pull_cadence.pull_cadence_seconds, 60); + let after = apply_cascade_step_to_policy(&base, 4); + assert_eq!(after.cascade_step, 4); + assert_eq!( + after.federation_pull_cadence.pull_cadence_seconds, + MAX_FEDERATION_PULL_CADENCE_SECONDS + ); + } + + /// What this catches: step 5 consolidation = Manual. Suspends + /// automatic consolidation under maximum pressure (operator must + /// explicitly trigger; substrate stops doing it on its own). + #[test] + fn apply_step_5_consolidation_manual() { + let base = base_policy_5090(); + assert_eq!(base.consolidation_schedule, ConsolidationSchedule::Idle); + let after = apply_cascade_step_to_policy(&base, 5); + assert_eq!(after.cascade_step, 5); + assert_eq!(after.consolidation_schedule, ConsolidationSchedule::Manual); + } + + /// What this catches: step 5 is CUMULATIVE — all prior step + /// transformations also applied. Speculation dropped + personas + /// reduced + tier_sizes shrunk + federation maxed + consolidation + /// Manual. The full-throttle state. + #[test] + fn apply_step_5_cumulative_all_transformations() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 5); + // Step 1 + assert_eq!(after.speculation_aggressiveness, SpeculationLevel::Balanced); + // Step 2 + assert_eq!(after.concurrency_caps.personas_concurrent, 7); + assert!(after.cadence_multipliers.delayed >= 2.0); + // Step 3 + assert_eq!(after.tier_sizes.l1_lora_layers, 6); + // Step 4 + assert_eq!( + after.federation_pull_cadence.pull_cadence_seconds, + MAX_FEDERATION_PULL_CADENCE_SECONDS + ); + // Step 5 + assert_eq!(after.consolidation_schedule, ConsolidationSchedule::Manual); + } + + /// What this catches: step value > MAX is clamped to MAX. Defensive + /// against caller bugs (passes 7 instead of 5). 
+ #[test] + fn apply_step_above_max_clamps_to_max() { + let base = base_policy_5090(); + let after = apply_cascade_step_to_policy(&base, 99); + assert_eq!(after.cascade_step, CASCADE_STEP_MAX); + // Should have all step-5 transformations + assert_eq!(after.consolidation_schedule, ConsolidationSchedule::Manual); + } + + /// What this catches: pure-function determinism. Same inputs → + /// same output. Tests pin this so the caller can cache the + /// transformation result if the (base_policy, step) tuple is stable. + #[test] + fn apply_cascade_step_is_deterministic() { + let base = base_policy_5090(); + let a = apply_cascade_step_to_policy(&base, 3); + let b = apply_cascade_step_to_policy(&base, 3); + assert_eq!(a, b); + } + + /// What this catches: applying step N then step 0 to the result + /// does NOT restore base — the step transformations are NOT + /// reversible from a transformed policy. Caller MUST keep the + /// original base + re-apply step 0 from it (LocalSubstrateGovernor + /// does not do this yet — the base-vs-active split is planned for + /// PR-3c4). + #[test] + fn apply_cascade_step_not_reversible_via_step_0_on_transformed() { + let base = base_policy_5090(); + let throttled = apply_cascade_step_to_policy(&base, 3); + let reset_attempt = apply_cascade_step_to_policy(&throttled, 0); + // step is 0 again + assert_eq!(reset_attempt.cascade_step, 0); + // But tier_sizes is STILL shrunk (step 0 doesn't undo step 3's + // shrink — it just doesn't re-apply it from a now-shrunk base). + assert_eq!( + reset_attempt.tier_sizes.l1_lora_layers, + throttled.tier_sizes.l1_lora_layers, + "step 0 from transformed policy ≠ base; caller MUST hold base separately" + ); + } + + /// What this catches: MAX_FEDERATION_PULL_CADENCE_SECONDS const + /// is the spec's max-cadence value. Drift catcher — if someone + /// tunes this without updating the spec, test fails. + #[test] + fn max_federation_cadence_const_pinned() { + assert_eq!(MAX_FEDERATION_PULL_CADENCE_SECONDS, 3600); + } }