diff --git a/src/shared/generated/runtime/CadenceHint.ts b/src/shared/generated/runtime/CadenceHint.ts new file mode 100644 index 000000000..399eaac96 --- /dev/null +++ b/src/shared/generated/runtime/CadenceHint.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * A hint a region can pass back to the governor about preferred next + * tick cadence. The governor may honor or override; it owns the + * final policy. + */ +export type CadenceHint = "faster" | "hold" | "slower" | "sleep"; diff --git a/src/shared/generated/runtime/ComputeClass.ts b/src/shared/generated/runtime/ComputeClass.ts new file mode 100644 index 000000000..056eaf3eb --- /dev/null +++ b/src/shared/generated/runtime/ComputeClass.ts @@ -0,0 +1,7 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Compute footprint class. Drives governor decisions about which + * regions to throttle first under compute/thermal pressure. + */ +export type ComputeClass = "bookkeeping" | "cpu" | "cpu-vectorized" | "inference-light" | "inference-heavy"; diff --git a/src/shared/generated/runtime/MemoryClass.ts b/src/shared/generated/runtime/MemoryClass.ts new file mode 100644 index 000000000..8de62f074 --- /dev/null +++ b/src/shared/generated/runtime/MemoryClass.ts @@ -0,0 +1,7 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Memory footprint class. Drives governor decisions about which + * regions to throttle first under memory pressure. + */ +export type MemoryClass = "light" | "moderate" | "heavy" | "vram-sensitive"; diff --git a/src/shared/generated/runtime/PersonaLifecycle.ts b/src/shared/generated/runtime/PersonaLifecycle.ts new file mode 100644 index 000000000..578ba7747 --- /dev/null +++ b/src/shared/generated/runtime/PersonaLifecycle.ts @@ -0,0 +1,7 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Persona lifecycle events relevant to regions (allow regions to + * allocate / deallocate per-persona state). + */ +export type PersonaLifecycle = { "kind": "created", persona_id: string, } | { "kind": "destroyed", persona_id: string, }; diff --git a/src/shared/generated/runtime/PressureLevel.ts b/src/shared/generated/runtime/PressureLevel.ts new file mode 100644 index 000000000..948634b6e --- /dev/null +++ b/src/shared/generated/runtime/PressureLevel.ts @@ -0,0 +1,7 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Coarse system pressure level surfaced to regions so they can adjust + * internally without parsing every PressureSignal variant. + */ +export type PressureLevel = "nominal" | "moderate" | "high" | "critical"; diff --git a/src/shared/generated/runtime/PressureProfile.ts b/src/shared/generated/runtime/PressureProfile.ts new file mode 100644 index 000000000..d0c35e43a --- /dev/null +++ b/src/shared/generated/runtime/PressureProfile.ts @@ -0,0 +1,18 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ComputeClass } from "./ComputeClass"; +import type { MemoryClass } from "./MemoryClass"; +import type { PressureSignalKind } from "./PressureSignalKind"; + +/** + * What a region declares about its resource footprint at registration + * time. The governor reads this once at register, then re-queries it + * when pressure shifts (regions may report different profiles after + * adapting under load — e.g., hippocampus drops from `Heavy` to + * `Moderate` when working memory is pruned). + */ +export type PressureProfile = { memory_class: MemoryClass, compute_class: ComputeClass, +/** + * Pressure kinds this region wants `on_signal` calls for. Other + * kinds are filtered out by the governor. + */ +responds_to: Array, }; diff --git a/src/shared/generated/runtime/PressureSignalKind.ts b/src/shared/generated/runtime/PressureSignalKind.ts new file mode 100644 index 000000000..6aa7ae326 --- /dev/null +++ b/src/shared/generated/runtime/PressureSignalKind.ts @@ -0,0 +1,11 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Which kinds of pressure signals a region wants to receive via + * `on_signal`. The governor filters and routes signals based on this. + * + * Mirrors the variants of [`PressureSignal`] but is a kind-only enum + * (no payload) so it can be declared statically by a region at + * registration time. + */ +export type PressureSignalKind = "thermal" | "battery-low" | "system-mem-high" | "vram-high" | "user-active" | "inference-queue-depth" | "speculation-miss-rate"; diff --git a/src/shared/generated/runtime/RegionId.ts b/src/shared/generated/runtime/RegionId.ts new file mode 100644 index 000000000..7f102b639 --- /dev/null +++ b/src/shared/generated/runtime/RegionId.ts @@ -0,0 +1,11 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Stable identifier for a brain region. Used by SubstrateGovernor for + * policy lookup and by telemetry/log streams for tagging events. + * + * Carries `Cow<'static, str>` so static IDs ("hippocampus") cost + * nothing and dynamic IDs (custom regions registered at runtime) are + * still supported. + */ +export type RegionId = string; diff --git a/src/shared/generated/runtime/RegionSignal.ts b/src/shared/generated/runtime/RegionSignal.ts new file mode 100644 index 000000000..907644534 --- /dev/null +++ b/src/shared/generated/runtime/RegionSignal.ts @@ -0,0 +1,11 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { PersonaLifecycle } from "./PersonaLifecycle"; +import type { PressureLevel } from "./PressureLevel"; +import type { SleepPhase } from "./SleepPhase"; + +/** + * Signals the substrate sends to regions out-of-band (not on the + * regular tick). Regions that don't care about a signal default to a + * no-op. + */ +export type RegionSignal = { "kind": "persona-lifecycle" } & PersonaLifecycle | { "kind": "sleep-transition", persona_id: string, phase: SleepPhase, } | { "kind": "system-pressure-changed", level: PressureLevel, }; diff --git a/src/shared/generated/runtime/RegionTelemetry.ts b/src/shared/generated/runtime/RegionTelemetry.ts new file mode 100644 index 000000000..70b4b5faa --- /dev/null +++ b/src/shared/generated/runtime/RegionTelemetry.ts @@ -0,0 +1,54 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { PressureSignal } from "../governor/PressureSignal"; +import type { RegionId } from "./RegionId"; + +/** + * Per-tick telemetry shape every brain region emits. + * + * Emitted on every tick. The substrate routes it to: + * + * - **The governor** — reads `consumed_since_last` / `published` to + * tune region budget (yield-learning loop, algorithm 7). + * - **The operator surface** — `./jtag region/stats` / `region/yield` + * read aggregate telemetry across personas. + * - **The substrate event stream** — `RegionTickCompleted` and + * `ReadyBufferUpdated` events for cross-region awareness. + */ +export type RegionTelemetry = { +/** + * Which region this came from. Stable string id. + */ +region_id: RegionId, +/** + * Persona scope. `None` means the tick was global (background + * work not tied to a specific persona). + */ +persona_id: string | null, +/** + * When this tick started (wall clock). + */ +tick_started_at: string, +/** + * How long the tick body ran. + */ +tick_duration: string, +/** + * Items the region published to ready-buffers this tick. + */ +published: number, +/** + * Items in the region's ready-buffers consumed by handlers since + * the last tick. + */ +consumed_since_last: number, +/** + * Handler `peek` calls that returned `None` since the last tick. + * Signals to the governor that the region should be upweighted + * (handlers are asking for stuff that's not staged yet). + */ +buffer_misses_since_last: number, +/** + * Pressure the region observed (DB slow, embedding queue full, + * etc.). Surfaced to the governor for cascade evaluation. + */ +pressure_observed?: PressureSignal, }; diff --git a/src/shared/generated/runtime/SleepPhase.ts b/src/shared/generated/runtime/SleepPhase.ts new file mode 100644 index 000000000..2ee8d837b --- /dev/null +++ b/src/shared/generated/runtime/SleepPhase.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Sleep/wake phases for the persona-level cognitive cycle. The sleep + * policy region (L0-4d) emits these; other regions react by changing + * their tick body (active vs idle vs sleep consolidation). + */ +export type SleepPhase = "active" | "idle" | "sleep"; diff --git a/src/shared/generated/runtime/TickOutcome.ts b/src/shared/generated/runtime/TickOutcome.ts new file mode 100644 index 000000000..138c76919 --- /dev/null +++ b/src/shared/generated/runtime/TickOutcome.ts @@ -0,0 +1,34 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { PressureSignal } from "../governor/PressureSignal"; +import type { CadenceHint } from "./CadenceHint"; + +/** + * Yield telemetry returned by every region tick. Feeds the substrate + * governor's yield-learning loop (algorithm 7 in + * COGNITION-ALGORITHMS.md, lands in L0-4c). + * + * Regions emit this from every tick. The governor reads aggregate + * (`consumed_since_last` vs `published`) to upweight regions whose + * output is being consumed by handlers and downweight regions whose + * output is ignored. + */ +export type TickOutcome = { +/** + * Items the region pre-staged this tick (publishes to ready-buffers). + */ +published: number, +/** + * Items in the region's ready-buffer that have been consumed by + * handlers since the last tick. The denominator for yield. + */ +consumed_since_last: number, +/** + * Pressure observation. If the region detected backpressure (DB + * slow, embedding queue full, etc.), reports it here for the + * governor. + */ +pressure_observed?: PressureSignal, +/** + * Optional next-tick hint (region requests faster/slower cadence). + */ +cadence_hint?: CadenceHint, }; diff --git a/src/workers/continuum-core/src/runtime/brain_region.rs b/src/workers/continuum-core/src/runtime/brain_region.rs new file mode 100644 index 000000000..ddcf7586d --- /dev/null +++ b/src/workers/continuum-core/src/runtime/brain_region.rs @@ -0,0 +1,476 @@ +//! BrainRegion — the cognitive-cycle trait every brain region implements. +//! +//! Companion to ServiceModule. Where ServiceModule handles command/event +//! routing (the existing dispatch surface), BrainRegion handles the +//! cognitive tick: continuous parallel computation, yield telemetry, +//! pressure registration, ready-buffer publishing. +//! +//! A real region (hippocampus, motor cortex, attention, sensory, sleep +//! policy) implements BOTH ServiceModule (for cmd/event surface) and +//! BrainRegion (for cognitive cycle). The runtime continues to dispatch +//! via ServiceModule. The substrate governor (lands L0-4c) dispatches +//! the cognitive tick via BrainRegion. +//! +//! Doctrine (from docs/architecture/BRAIN-REGIONS-SUBSTRATE.md): +//! +//! > No region of cognition runs on the hot path. Each region is its +//! > own RTOS task with its own tick. The handler dispatches and reads +//! > pre-staged results. The handler never blocks on recall, embedding, +//! > planning, or admission — those are continuously produced by their +//! > owning regions, in parallel, governed by SubstrateGovernor. +//! +//! ## L0-3a.0 scope (this slice) +//! +//! Pure typed surface. No region implementations. No governor +//! integration. No derive macro, no scaffold generator (those land +//! when ≥3 regions exist to motivate the abstraction — per the +//! outlier-validation strategy in CLAUDE.md). +//! +//! Later slices ship: L0-3a.1 HippocampusModule skeleton, L0-3a.2+ +//! per-algorithm bodies, L0-4a motor cortex, L0-4b attention, L0-4c +//! governor yield-learning integration. + +use crate::governor::types::PressureSignal; +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::borrow::Cow; +use ts_rs::TS; +use uuid::Uuid; + +// ─── Region identity ──────────────────────────────────────────────── + +/// Stable identifier for a brain region. Used by SubstrateGovernor for +/// policy lookup and by telemetry/log streams for tagging events. +/// +/// Carries `Cow<'static, str>` so static IDs ("hippocampus") cost +/// nothing and dynamic IDs (custom regions registered at runtime) are +/// still supported. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, TS)] +#[ts(export, export_to = "../../../shared/generated/runtime/RegionId.ts")] +pub struct RegionId(pub Cow<'static, str>); + +impl RegionId { + pub const fn from_static(id: &'static str) -> Self { + Self(Cow::Borrowed(id)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl From<&'static str> for RegionId { + fn from(s: &'static str) -> Self { + Self::from_static(s) + } +} + +impl From for RegionId { + fn from(s: String) -> Self { + Self(Cow::Owned(s)) + } +} + +impl std::fmt::Display for RegionId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +// ─── Pressure profile ─────────────────────────────────────────────── + +/// Memory footprint class. Drives governor decisions about which +/// regions to throttle first under memory pressure. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts(export, export_to = "../../../shared/generated/runtime/MemoryClass.ts")] +pub enum MemoryClass { + /// Lightweight — small in-memory structures, no large caches. + Light, + /// Moderate — recall caches, salience maps, telemetry windows. + Moderate, + /// Heavy — engram graph, working memory ring, multiple ready-buffers. + Heavy, + /// VRAM-sensitive — touches GPU residency (genome region, inference-adjacent). + VramSensitive, +} + +/// Compute footprint class. Drives governor decisions about which +/// regions to throttle first under compute/thermal pressure. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts( + export, + export_to = "../../../shared/generated/runtime/ComputeClass.ts" +)] +pub enum ComputeClass { + /// Tick body is bookkeeping only — cheap. + Bookkeeping, + /// Tick body does scoring / graph traversal — CPU-bound but bounded. + Cpu, + /// Tick body invokes embedding / similarity / vectorized work. + CpuVectorized, + /// Tick body invokes inference (sub-token generation or scoring). + InferenceLight, + /// Tick body could invoke full inference. The governor MUST budget this carefully. + InferenceHeavy, +} + +/// Which kinds of pressure signals a region wants to receive via +/// `on_signal`. The governor filters and routes signals based on this. +/// +/// Mirrors the variants of [`PressureSignal`] but is a kind-only enum +/// (no payload) so it can be declared statically by a region at +/// registration time. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts( + export, + export_to = "../../../shared/generated/runtime/PressureSignalKind.ts" +)] +pub enum PressureSignalKind { + Thermal, + BatteryLow, + SystemMemHigh, + VramHigh, + UserActive, + InferenceQueueDepth, + SpeculationMissRate, +} + +/// What a region declares about its resource footprint at registration +/// time. The governor reads this once at register, then re-queries it +/// when pressure shifts (regions may report different profiles after +/// adapting under load — e.g., hippocampus drops from `Heavy` to +/// `Moderate` when working memory is pruned). +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts( + export, + export_to = "../../../shared/generated/runtime/PressureProfile.ts" +)] +pub struct PressureProfile { + pub memory_class: MemoryClass, + pub compute_class: ComputeClass, + /// Pressure kinds this region wants `on_signal` calls for. Other + /// kinds are filtered out by the governor. + pub responds_to: Vec, +} + +// ─── Tick outcome (yield telemetry) ───────────────────────────────── + +/// A hint a region can pass back to the governor about preferred next +/// tick cadence. The governor may honor or override; it owns the +/// final policy. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts(export, export_to = "../../../shared/generated/runtime/CadenceHint.ts")] +pub enum CadenceHint { + /// Tick faster than current cadence (region has urgent work). + Faster, + /// Hold current cadence. + Hold, + /// Tick slower than current cadence (region is idle / over-tasked relative to consumed yield). + Slower, + /// Sleep — region has nothing useful to do until a signal fires. + Sleep, +} + +/// Yield telemetry returned by every region tick. Feeds the substrate +/// governor's yield-learning loop (algorithm 7 in +/// COGNITION-ALGORITHMS.md, lands in L0-4c). +/// +/// Regions emit this from every tick. The governor reads aggregate +/// (`consumed_since_last` vs `published`) to upweight regions whose +/// output is being consumed by handlers and downweight regions whose +/// output is ignored. +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts(export, export_to = "../../../shared/generated/runtime/TickOutcome.ts")] +pub struct TickOutcome { + /// Items the region pre-staged this tick (publishes to ready-buffers). + #[ts(type = "number")] + pub published: usize, + + /// Items in the region's ready-buffer that have been consumed by + /// handlers since the last tick. The denominator for yield. + #[ts(type = "number")] + pub consumed_since_last: usize, + + /// Pressure observation. If the region detected backpressure (DB + /// slow, embedding queue full, etc.), reports it here for the + /// governor. + #[ts(optional)] + pub pressure_observed: Option, + + /// Optional next-tick hint (region requests faster/slower cadence). + #[ts(optional)] + pub cadence_hint: Option, +} + +impl TickOutcome { + /// Idle outcome — region had no work this tick. Convenience for + /// no-op ticks and tests. + pub fn idle() -> Self { + Self { + published: 0, + consumed_since_last: 0, + pressure_observed: None, + cadence_hint: None, + } + } +} + +// ─── Region signals ───────────────────────────────────────────────── + +/// Persona lifecycle events relevant to regions (allow regions to +/// allocate / deallocate per-persona state). +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case", tag = "kind")] +#[ts( + export, + export_to = "../../../shared/generated/runtime/PersonaLifecycle.ts" +)] +pub enum PersonaLifecycle { + Created { + #[ts(type = "string")] + persona_id: Uuid, + }, + Destroyed { + #[ts(type = "string")] + persona_id: Uuid, + }, +} + +/// Sleep/wake phases for the persona-level cognitive cycle. The sleep +/// policy region (L0-4d) emits these; other regions react by changing +/// their tick body (active vs idle vs sleep consolidation). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts(export, export_to = "../../../shared/generated/runtime/SleepPhase.ts")] +pub enum SleepPhase { + /// Persona is actively servicing — tick at high cadence, shallow consolidation. + Active, + /// Persona is idle but recently active — tick at moderate cadence, normal consolidation. + Idle, + /// Persona is in deep idle — tick at low cadence, deep consolidation + pruning. + Sleep, +} + +/// Coarse system pressure level surfaced to regions so they can adjust +/// internally without parsing every PressureSignal variant. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case")] +#[ts( + export, + export_to = "../../../shared/generated/runtime/PressureLevel.ts" +)] +pub enum PressureLevel { + Nominal, + Moderate, + High, + Critical, +} + +/// Signals the substrate sends to regions out-of-band (not on the +/// regular tick). Regions that don't care about a signal default to a +/// no-op. +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "kebab-case", tag = "kind")] +#[ts( + export, + export_to = "../../../shared/generated/runtime/RegionSignal.ts" +)] +pub enum RegionSignal { + PersonaLifecycle(PersonaLifecycle), + SleepTransition { + #[ts(type = "string")] + persona_id: Uuid, + phase: SleepPhase, + }, + SystemPressureChanged { + level: PressureLevel, + }, +} + +// ─── Region context ───────────────────────────────────────────────── + +/// What the substrate passes to a region's `tick` body. Carries the +/// substrate handles a region needs to do its work without reaching +/// for globals. +/// +/// L0-3a.0 ships the type; L0-3a.1+ adds real handles (ModuleContext +/// reference, governor handle, persona state map, etc.). For now it's +/// a placeholder so the trait signature compiles. +#[derive(Debug, Clone)] +pub struct RegionContext { + /// Tick number since region started. Useful for cadence-modulated + /// logic ("every 10th tick, do deeper work"). + pub tick_number: u64, + /// Optional persona scope — if the substrate is ticking the region + /// for one specific persona's slot, this is set. If `None`, the + /// region is ticking globally (background work). + pub persona_scope: Option, +} + +impl RegionContext { + pub fn global(tick_number: u64) -> Self { + Self { + tick_number, + persona_scope: None, + } + } + + pub fn for_persona(tick_number: u64, persona_id: Uuid) -> Self { + Self { + tick_number, + persona_scope: Some(persona_id), + } + } +} + +// ─── Region errors ────────────────────────────────────────────────── + +/// Errors a region can surface from `on_signal`. Tick failures use +/// `TickOutcome.pressure_observed` to signal degradation; signal +/// failures are explicit because the substrate may need to retry. +#[derive(Debug, thiserror::Error)] +pub enum RegionError { + #[error("region {0} rejected signal: {1}")] + SignalRejected(RegionId, String), + #[error("region {0} not ready: {1}")] + NotReady(RegionId, String), + #[error("region {0} internal error: {1}")] + Internal(RegionId, String), +} + +// ─── The trait ────────────────────────────────────────────────────── + +/// A cognitive subsystem (hippocampus, motor cortex, attention, +/// sensory, sleep policy). Each region runs its own tick on its own +/// tokio task, governed by SubstrateGovernor. +/// +/// A region typically also implements [`ServiceModule`](super::ServiceModule) +/// for command/event routing, but doesn't have to — pure cognitive +/// regions with no external command surface are valid. +/// +/// See `docs/architecture/BRAIN-REGIONS-SUBSTRATE.md` for the full +/// contract and `docs/architecture/COGNITION-ALGORITHMS.md` for what +/// runs inside the tick. +#[async_trait] +pub trait BrainRegion: Send + Sync + 'static { + /// Stable identifier. Used by SubstrateGovernor for policy lookup + /// and by telemetry/log streams for event tagging. + fn id(&self) -> RegionId; + + /// Pressure footprint declaration. Returned at registration time + /// and re-queried by the governor when pressure shifts. + fn pressure_profile(&self) -> PressureProfile; + + /// Run one tick. The substrate calls this on the region's own task + /// at the cadence governed by SubstrateGovernor. + /// + /// The body is responsible for: reading inputs (from shared state, + /// channels, or its own DB), producing pre-staged results, and + /// publishing them to the ready-buffer. + /// + /// Implementations MUST be idempotent on early return and MUST NOT + /// block indefinitely — the governor cancels long-running ticks + /// under pressure. + async fn tick(&self, ctx: &RegionContext) -> TickOutcome; + + /// React to a substrate-level signal. Defaults to a no-op so + /// regions that don't care about any signals can ignore the + /// surface entirely. + async fn on_signal(&self, _signal: RegionSignal) -> Result<(), RegionError> { + Ok(()) + } +} + +// ─── Tests ────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + /// A minimal region for trait validation. Verifies the trait is + /// object-safe, the default `on_signal` works, and an idle tick + /// outcome round-trips through the type system. + struct TestRegion { + id: RegionId, + } + + #[async_trait] + impl BrainRegion for TestRegion { + fn id(&self) -> RegionId { + self.id.clone() + } + + fn pressure_profile(&self) -> PressureProfile { + PressureProfile { + memory_class: MemoryClass::Light, + compute_class: ComputeClass::Bookkeeping, + responds_to: vec![], + } + } + + async fn tick(&self, _ctx: &RegionContext) -> TickOutcome { + TickOutcome::idle() + } + } + + #[tokio::test] + async fn test_region_implements_trait() { + let region: Box = Box::new(TestRegion { + id: RegionId::from_static("test"), + }); + let ctx = RegionContext::global(0); + let outcome = region.tick(&ctx).await; + assert_eq!(outcome.published, 0); + assert_eq!(outcome.consumed_since_last, 0); + assert!(outcome.pressure_observed.is_none()); + assert!(outcome.cadence_hint.is_none()); + } + + #[tokio::test] + async fn test_default_on_signal_is_noop() { + let region = TestRegion { + id: RegionId::from_static("test"), + }; + let signal = RegionSignal::SystemPressureChanged { + level: PressureLevel::Nominal, + }; + assert!(region.on_signal(signal).await.is_ok()); + } + + #[test] + fn test_region_id_static_construction() { + const ID: RegionId = RegionId::from_static("hippocampus"); + assert_eq!(ID.as_str(), "hippocampus"); + } + + #[test] + fn test_region_id_display() { + let id = RegionId::from_static("motor_cortex"); + assert_eq!(format!("{id}"), "motor_cortex"); + } + + #[test] + fn test_region_context_global_and_per_persona() { + let global = RegionContext::global(5); + assert_eq!(global.tick_number, 5); + assert!(global.persona_scope.is_none()); + + let persona_id = Uuid::new_v4(); + let scoped = RegionContext::for_persona(7, persona_id); + assert_eq!(scoped.tick_number, 7); + assert_eq!(scoped.persona_scope, Some(persona_id)); + } + + #[test] + fn test_tick_outcome_idle_constructor() { + let outcome = TickOutcome::idle(); + assert_eq!(outcome.published, 0); + assert_eq!(outcome.consumed_since_last, 0); + assert!(outcome.pressure_observed.is_none()); + assert!(outcome.cadence_hint.is_none()); + } +} diff --git a/src/workers/continuum-core/src/runtime/mod.rs b/src/workers/continuum-core/src/runtime/mod.rs index b3c07e4d3..a188226c6 100644 --- a/src/workers/continuum-core/src/runtime/mod.rs +++ b/src/workers/continuum-core/src/runtime/mod.rs @@ -25,12 +25,15 @@ use std::sync::Arc; use std::sync::OnceLock; pub mod artifact_handle; +pub mod brain_region; pub mod command_executor; pub mod control; pub mod message_bus; pub mod module_context; pub mod module_logger; pub mod module_metrics; +pub mod ready_buffer; +pub mod region_telemetry; pub mod registry; #[allow(clippy::module_inception)] pub mod runtime; @@ -38,6 +41,11 @@ pub mod service_module; pub mod shared_compute; pub use artifact_handle::{ArtifactKey, ArtifactSelector, Cadence}; +pub use brain_region::{ + BrainRegion, CadenceHint, ComputeClass, MemoryClass, PersonaLifecycle, PressureLevel, + PressureProfile, PressureSignalKind, RegionContext, RegionError, RegionId, RegionSignal, + SleepPhase, TickOutcome, +}; pub use command_executor::{ execute as execute_command, execute_json as execute_command_json, executor, init_executor, CommandExecutor, @@ -47,6 +55,8 @@ pub use message_bus::MessageBus; pub use module_context::ModuleContext; pub use module_logger::ModuleLogger; pub use module_metrics::{CommandTiming, ModuleMetrics, ModuleStats}; +pub use ready_buffer::{DashMapReadyBuffer, ReadyBuffer}; +pub use region_telemetry::RegionTelemetry; pub use registry::ModuleRegistry; pub use runtime::Runtime; pub use service_module::{ diff --git a/src/workers/continuum-core/src/runtime/ready_buffer.rs b/src/workers/continuum-core/src/runtime/ready_buffer.rs new file mode 100644 index 000000000..270a8fb6e --- /dev/null +++ b/src/workers/continuum-core/src/runtime/ready_buffer.rs @@ -0,0 +1,278 @@ +//! ReadyBuffer — the publish/peek surface that every brain region +//! uses to hand off pre-staged results to handlers without blocking. +//! +//! Doctrine (from docs/architecture/BRAIN-REGIONS-SUBSTRATE.md): +//! +//! > Empty buffer is a signal, not a block. If a handler reads and +//! > gets None, it proceeds with whatever degraded path the algorithm +//! > specifies. Slightly-stale context > stalled persona. +//! +//! ## Semantic rules +//! +//! - **Reads MUST NOT block** — handlers call `peek` on the hot path; +//! it MUST complete in microseconds and MUST NOT `await`. The +//! [`DashMapReadyBuffer`] default impl honors this via DashMap's +//! sharded locks. +//! - **Staleness is acceptable** — a ready value might be 100ms old; +//! that's better than blocking the handler 500ms to recompute. +//! - **Per-region buffers, not a global one** — hippocampus owns its +//! engram-prefetch buffer; motor cortex owns its candidate-utterance +//! buffer. They share the same trait shape but live in their own +//! region structs. +//! - **TTL eviction** is region-owned — regions decide what "stale" +//! means for their value type. +//! +//! ## L0-3a.0 scope (this slice) +//! +//! Trait definition + a single default `DashMap`-backed implementation. +//! No region-specific buffers yet (those land with their owning regions +//! in L0-3a.1+, L0-4a, L0-4b, etc.). + +use dashmap::DashMap; +use std::hash::Hash; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +// ─── The trait ────────────────────────────────────────────────────── + +/// Pre-staged result publishing for brain regions. Regions write +/// (`publish`), handlers read (`peek`). The buffer holds the freshest +/// value per key; older values are dropped on overwrite. +pub trait ReadyBuffer: Send + Sync { + /// The key type. Typically `(persona_id, channel_id)` or similar + /// composite identifying what the staged value is for. + type Key: Hash + Eq + Clone; + + /// The value type. Region-specific (engram set, candidate-utterance + /// list, salience snapshot, ...). + type Value: Clone; + + /// Synchronous read. Returns the freshest staged value for the + /// key, or `None`. + /// + /// Handlers call this on the hot path — it MUST NOT block, MUST + /// NOT await, and MUST complete in microseconds. + fn peek(&self, key: &Self::Key) -> Option; + + /// Region-side write. Atomically replaces the value for the key. + /// Older value (if any) is dropped. + fn publish(&self, key: Self::Key, value: Self::Value); + + /// TTL-style eviction sweep. Removes entries whose published-at + /// timestamp is older than `max_age`. Called by the substrate + /// under memory pressure or by the region itself on a sweep tick. + /// + /// Returns the number of entries evicted. + fn evict_stale(&self, max_age: Duration) -> usize; + + /// Current entry count. Used for telemetry and pressure reporting. + fn len(&self) -> usize; + + /// Convenience — most call sites care whether the buffer is empty + /// before deciding to sweep / report pressure. + fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +// ─── Default implementation ───────────────────────────────────────── + +/// Each entry stores its value plus the instant it was published, so +/// `evict_stale` can compute age without walking external state. +#[derive(Clone)] +struct TimestampedEntry { + value: V, + published_at: Instant, +} + +/// DashMap-backed [`ReadyBuffer`]. The default implementation for +/// regions that need a key→value mapping with sharded concurrent +/// access. +/// +/// Reads are sharded by key hash, so peek is wait-free in the common +/// case. Writes acquire the per-shard lock briefly to replace the +/// entry — well within the "microseconds" budget the peek contract +/// asks for. +pub struct DashMapReadyBuffer +where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + inner: Arc>>, +} + +impl DashMapReadyBuffer +where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + pub fn new() -> Self { + Self { + inner: Arc::new(DashMap::new()), + } + } + + /// Create with an initial shard capacity hint. Useful when the + /// region knows the working set size up front (e.g., one entry per + /// active persona). + pub fn with_capacity(capacity: usize) -> Self { + Self { + inner: Arc::new(DashMap::with_capacity(capacity)), + } + } +} + +impl Default for DashMapReadyBuffer +where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +impl Clone for DashMapReadyBuffer +where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Cheap clone — shares the underlying DashMap via `Arc`. Multiple + /// handles to the same buffer is the expected pattern (region + /// publishes, handlers read). + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +impl ReadyBuffer for DashMapReadyBuffer +where + K: Hash + Eq + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + type Key = K; + type Value = V; + + fn peek(&self, key: &Self::Key) -> Option { + self.inner.get(key).map(|entry| entry.value.clone()) + } + + fn publish(&self, key: Self::Key, value: Self::Value) { + self.inner.insert( + key, + TimestampedEntry { + value, + published_at: Instant::now(), + }, + ); + } + + fn evict_stale(&self, max_age: Duration) -> usize { + let now = Instant::now(); + let stale_keys: Vec = self + .inner + .iter() + .filter(|entry| now.duration_since(entry.value().published_at) > max_age) + .map(|entry| entry.key().clone()) + .collect(); + let evicted = stale_keys.len(); + for key in stale_keys { + self.inner.remove(&key); + } + evicted + } + + fn len(&self) -> usize { + self.inner.len() + } +} + +// ─── Tests ────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_publish_then_peek_returns_value() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + buf.publish(1, "engram-set-1".to_string()); + assert_eq!(buf.peek(&1), Some("engram-set-1".to_string())); + } + + #[test] + fn test_peek_missing_key_returns_none() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + assert_eq!(buf.peek(&42), None); + } + + #[test] + fn test_publish_overwrites_previous_value() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + buf.publish(1, "old".to_string()); + buf.publish(1, "new".to_string()); + assert_eq!(buf.peek(&1), Some("new".to_string())); + } + + #[test] + fn test_evict_stale_removes_old_entries_keeps_fresh() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + buf.publish(1, "old".to_string()); + std::thread::sleep(Duration::from_millis(20)); + buf.publish(2, "fresh".to_string()); + + // Anything older than 10ms is evicted — key 1 goes, key 2 stays. + let evicted = buf.evict_stale(Duration::from_millis(10)); + assert_eq!(evicted, 1); + assert_eq!(buf.peek(&1), None); + assert_eq!(buf.peek(&2), Some("fresh".to_string())); + } + + #[test] + fn test_evict_stale_zero_max_age_clears_everything() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + buf.publish(1, "a".to_string()); + buf.publish(2, "b".to_string()); + let evicted = buf.evict_stale(Duration::ZERO); + assert_eq!(evicted, 2); + assert!(buf.is_empty()); + } + + #[test] + fn test_len_and_is_empty_reflect_state() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::new(); + assert!(buf.is_empty()); + assert_eq!(buf.len(), 0); + buf.publish(1, "x".to_string()); + assert!(!buf.is_empty()); + assert_eq!(buf.len(), 1); + } + + #[test] + fn test_clone_shares_underlying_storage() { + let buf_a: DashMapReadyBuffer = DashMapReadyBuffer::new(); + let buf_b = buf_a.clone(); + buf_a.publish(1, "from-a".to_string()); + // Both handles see the same value — Arc-shared inner DashMap. + assert_eq!(buf_b.peek(&1), Some("from-a".to_string())); + } + + #[test] + fn test_trait_object_usage() { + // Trait is dyn-compatible for handlers that don't care about + // the concrete type. + let buf: Box> = + Box::new(DashMapReadyBuffer::::new()); + buf.publish(1, "via-trait".to_string()); + assert_eq!(buf.peek(&1), Some("via-trait".to_string())); + } + + #[test] + fn test_with_capacity_constructor() { + let buf: DashMapReadyBuffer = DashMapReadyBuffer::with_capacity(64); + buf.publish(1, 100); + assert_eq!(buf.peek(&1), Some(100)); + } +} diff --git a/src/workers/continuum-core/src/runtime/region_telemetry.rs b/src/workers/continuum-core/src/runtime/region_telemetry.rs new file mode 100644 index 000000000..7b36de9a7 --- /dev/null +++ b/src/workers/continuum-core/src/runtime/region_telemetry.rs @@ -0,0 +1,145 @@ +//! RegionTelemetry — the structured event shape every brain region +//! emits per tick. +//! +//! Mandatory for every region. It's the only path the substrate +//! governor's yield-learning loop (algorithm 7) has into the regions +//! and the only operator surface for debugging cognitive cycles. +//! +//! Doctrine (from docs/architecture/BRAIN-REGIONS-SUBSTRATE.md): +//! +//! > Telemetry is mandatory for every region; it's the only way the +//! > yield-learning loop and the operator debugging path work. The +//! > derive macro generates the telemetry emission automatically. +//! +//! The derive macro lands later (once ≥3 regions exist to motivate +//! it); this slice ships the typed struct so regions can emit +//! manually. + +use super::brain_region::RegionId; +use crate::governor::types::PressureSignal; +use serde::{Deserialize, Serialize}; +use std::time::{Duration, SystemTime}; +use ts_rs::TS; +use uuid::Uuid; + +/// Per-tick telemetry shape every brain region emits. +/// +/// Emitted on every tick. The substrate routes it to: +/// +/// - **The governor** — reads `consumed_since_last` / `published` to +/// tune region budget (yield-learning loop, algorithm 7). +/// - **The operator surface** — `./jtag region/stats` / `region/yield` +/// read aggregate telemetry across personas. +/// - **The substrate event stream** — `RegionTickCompleted` and +/// `ReadyBufferUpdated` events for cross-region awareness. +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[ts( + export, + export_to = "../../../shared/generated/runtime/RegionTelemetry.ts" +)] +pub struct RegionTelemetry { + /// Which region this came from. Stable string id. + pub region_id: RegionId, + + /// Persona scope. `None` means the tick was global (background + /// work not tied to a specific persona). + #[ts(type = "string | null")] + pub persona_id: Option, + + /// When this tick started (wall clock). + #[ts(type = "string")] + pub tick_started_at: SystemTime, + + /// How long the tick body ran. + #[ts(type = "string")] + pub tick_duration: Duration, + + /// Items the region published to ready-buffers this tick. + #[ts(type = "number")] + pub published: usize, + + /// Items in the region's ready-buffers consumed by handlers since + /// the last tick. + #[ts(type = "number")] + pub consumed_since_last: usize, + + /// Handler `peek` calls that returned `None` since the last tick. + /// Signals to the governor that the region should be upweighted + /// (handlers are asking for stuff that's not staged yet). + #[ts(type = "number")] + pub buffer_misses_since_last: usize, + + /// Pressure the region observed (DB slow, embedding queue full, + /// etc.). Surfaced to the governor for cascade evaluation. + #[ts(optional)] + pub pressure_observed: Option, +} + +impl RegionTelemetry { + /// Compute the consumption fraction. Used by the governor to + /// upweight or downweight a region's budget. Returns `None` when + /// `published` is zero (no signal this tick — preserve prior + /// estimate rather than introducing a zero). + pub fn consumption_fraction(&self) -> Option { + if self.published == 0 { + None + } else { + Some(self.consumed_since_last as f32 / self.published as f32) + } + } + + /// Whether handlers were asking for data the region hadn't staged. + /// A positive value here is the governor's signal to give the + /// region more budget. + pub fn had_buffer_misses(&self) -> bool { + self.buffer_misses_since_last > 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample(published: usize, consumed: usize, misses: usize) -> RegionTelemetry { + RegionTelemetry { + region_id: RegionId::from_static("test"), + persona_id: Some(Uuid::nil()), + tick_started_at: SystemTime::UNIX_EPOCH, + tick_duration: Duration::from_millis(1), + published, + consumed_since_last: consumed, + buffer_misses_since_last: misses, + pressure_observed: None, + } + } + + #[test] + fn test_consumption_fraction_with_publishes() { + let t = sample(10, 7, 0); + assert_eq!(t.consumption_fraction(), Some(0.7)); + } + + #[test] + fn test_consumption_fraction_zero_published_returns_none() { + let t = sample(0, 0, 3); + assert_eq!(t.consumption_fraction(), None); + } + + #[test] + fn test_consumption_fraction_full_consumption() { + let t = sample(5, 5, 0); + assert_eq!(t.consumption_fraction(), Some(1.0)); + } + + #[test] + fn test_had_buffer_misses_true_when_positive() { + let t = sample(10, 5, 1); + assert!(t.had_buffer_misses()); + } + + #[test] + fn test_had_buffer_misses_false_when_zero() { + let t = sample(10, 5, 0); + assert!(!t.had_buffer_misses()); + } +}