From a7edd535172d3be46a6aa072b9bb36a7b6c8ccf2 Mon Sep 17 00:00:00 2001 From: Test Date: Sat, 16 May 2026 16:51:17 -0500 Subject: [PATCH] =?UTF-8?q?feat(runtime):=20CBAR-PIECE-2=20PR-3=20?= =?UTF-8?q?=E2=80=94=20artifact=20dispatch=20via=20bus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-3 of CBAR-SUBSTRATE PIECE-2 (artifact subscription / cadence / dispatch). PR-1 (#1321) shipped the ArtifactKey + ArtifactSelector + Cadence data types. PR-2 (#1323) added the three default-impl methods on ServiceModule (artifact_subscriptions / cadence / on_artifact_available) — pure trait surface, no dispatch yet. This PR wires the dispatcher. What it does - Runtime::register translates each opted-in module's ArtifactSelector::Exact into a synchronous bus.subscribe(key, module_name, true). Bus delivers via handle_event. - ServiceModule's default handle_event impl auto-routes when the incoming event_name matches one of the module's artifact_subscriptions, calling on_artifact_available. Existing modules with no artifact_subscriptions keep their current no-op default behavior — full backwards compat. What it does NOT do - ArtifactSelector::Prefix delivery. The bus's glob_matches splits on `:` not `/`, and the ArtifactKey separator convention isn't unified across producers yet. PR-3 emits warn! at registration time and silently no-ops the dispatch. Test pins the no-op so the follow-up that unifies the separator has a regression check to flip from expect-zero to expect-N. Design notes (per airc design pass with vhsm-scope airc-8a5e 2026-05-16 19:58Z) - Sync subscription (synchronous=true): bus's async tier sends to a broadcast channel that nothing in the runtime currently routes back to handle_event — synchronous=false would silently drop. The on_artifact_available docstring already mandates "cheap-and-return," so sync is safe; subscribers can tokio::spawn for heavy work. - Cadence routing split: Periodic uses the existing tick_interval path; EventDriven/OnArtifact use this new bus path; Mixed uses both. Wiring the bus path is unconditional when artifact_subscriptions is non-empty. - Modules that already override handle_event keep full control; they can call self.on_artifact_available(key, payload).await from inside their override to opt into the same auto-route behavior. Tests - runtime/runtime.rs piece_2_pr3_dispatch_tests (4 tests): - exact_selector_delivers_only_matching_key - prefix_selector_currently_no_ops_pending_separator_unification (pins the known gap) - module_without_artifact_subscriptions_receives_nothing (backwards compat guard for HealthModule / PressureBrokerModule / etc.) - multi_module_isolation_each_gets_only_matching_artifacts All 42 runtime:: tests pass (4 new + 38 existing including the PR-1/ PR-2 artifact_handle + service_module tests). Also pulls in the ts-rs generated bindings for ArtifactKey, ArtifactSelector, and Cadence that were missed in #1321/#1323 — these are required outputs of the Rust↔TS boundary contract (per CLAUDE.md "NEVER hand-write types that cross the Rust↔TS boundary"). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/shared/generated/runtime/ArtifactKey.ts | 14 + .../generated/runtime/ArtifactSelector.ts | 17 + src/shared/generated/runtime/Cadence.ts | 36 ++ src/shared/generated/runtime/index.ts | 3 + .../continuum-core/src/runtime/runtime.rs | 330 ++++++++++++++++++ .../src/runtime/service_module.rs | 28 +- 6 files changed, 426 insertions(+), 2 deletions(-) create mode 100644 src/shared/generated/runtime/ArtifactKey.ts create mode 100644 src/shared/generated/runtime/ArtifactSelector.ts create mode 100644 src/shared/generated/runtime/Cadence.ts diff --git a/src/shared/generated/runtime/ArtifactKey.ts b/src/shared/generated/runtime/ArtifactKey.ts new file mode 100644 index 000000000..5e1865429 --- /dev/null +++ b/src/shared/generated/runtime/ArtifactKey.ts @@ -0,0 +1,14 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * Stable identifier for an artifact stream. Producer-side modules + * declare a key when they publish; consumer-side modules name a key + * when they subscribe. + * + * Format convention (not enforced): `/.`. E.g. + * `paging/broker.snapshot`, `cognition/rate_proposals.result`, + * `inference_capability/registry.peer_announced`. The runtime does + * not parse the structure — it's a string match. Convention is for + * humans reading subscription lists, not the dispatcher. + */ +export type ArtifactKey = string; diff --git a/src/shared/generated/runtime/ArtifactSelector.ts b/src/shared/generated/runtime/ArtifactSelector.ts new file mode 100644 index 000000000..15b5bcca2 --- /dev/null +++ b/src/shared/generated/runtime/ArtifactSelector.ts @@ -0,0 +1,17 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ArtifactKey } from "./ArtifactKey"; + +/** + * What a subscriber wants to be notified about. + * + * `Exact` — match one specific `ArtifactKey` (the common case). + * `Prefix` — match every key starting with a string (e.g. a persona + * module wanting every `cognition/*` artifact). + * + * Glob/regex deliberately omitted: the matcher is the hot path the + * runtime walks every publish, and string-prefix is cheap + covers + * the cases we have. If a future module needs glob, it can compose + * `Prefix` + filter in its own handler — keeps the matcher fast for + * the 99% case. + */ +export type ArtifactSelector = { "kind": "exact", "value": ArtifactKey } | { "kind": "prefix", "value": string }; diff --git a/src/shared/generated/runtime/Cadence.ts b/src/shared/generated/runtime/Cadence.ts new file mode 100644 index 000000000..375baef19 --- /dev/null +++ b/src/shared/generated/runtime/Cadence.ts @@ -0,0 +1,36 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +/** + * How the runtime should drive a module's work surface. PR-2 adds + * this as an Optional field on `ModuleConfig`; modules that don't + * declare a cadence keep their current behavior (purely reactive to + * commands and events). + * + * `Periodic(Duration)` — broker-paced tick at the given interval. The + * runtime calls `tick()` at this cadence. Duration is the requested + * floor — broker can stretch under pressure (no hardcoded ceiling + * anywhere; broker decides per pressure state). + * + * `EventDriven` — woken only when one of the module's + * `event_subscriptions` fires. No periodic call. Lowest overhead + * for modules that genuinely have nothing to do until something + * external happens. + * + * `OnArtifact` — woken when an artifact this module subscribes to is + * published. Composes with subscriptions: subscriber list lives in + * `ModuleConfig.artifact_subscriptions` (PR-2); cadence says "wake + * me on those subscriptions, otherwise rest." + * + * `Mixed` — periodic tick AND artifact wakes. For modules that + * need a heartbeat (e.g. cache TTL eviction) plus reactive bursts. + * + * Deliberately no `OnDemand` / `Manual` variant. Every supervised + * task has a cadence policy the supervisor knows; a module that + * truly never wakes shouldn't exist as a registered module. + */ +export type Cadence = { "kind": "periodic", +/** + * Requested floor on tick interval. ms over the wire so the + * TS side doesn't have to handle bigint Duration shape. + */ +intervalMs: number, } | { "kind": "eventDriven" } | { "kind": "onArtifact" } | { "kind": "mixed", intervalMs: number, }; diff --git a/src/shared/generated/runtime/index.ts b/src/shared/generated/runtime/index.ts index bdfb47501..1cfe40435 100644 --- a/src/shared/generated/runtime/index.ts +++ b/src/shared/generated/runtime/index.ts @@ -2,6 +2,9 @@ // Source: generator/generate-rust-bindings.ts // Re-generate: npx tsx generator/generate-rust-bindings.ts +export type { ArtifactKey } from './ArtifactKey'; +export type { ArtifactSelector } from './ArtifactSelector'; +export type { Cadence } from './Cadence'; export type { ChannelTickConfig } from './ChannelTickConfig'; export type { CommandTiming } from './CommandTiming'; export type { ModuleInfo } from './ModuleInfo'; diff --git a/src/workers/continuum-core/src/runtime/runtime.rs b/src/workers/continuum-core/src/runtime/runtime.rs index e6de9527c..0d0107229 100644 --- a/src/workers/continuum-core/src/runtime/runtime.rs +++ b/src/workers/continuum-core/src/runtime/runtime.rs @@ -82,6 +82,62 @@ impl Runtime { self.bus.subscribe(pattern, config.name, false); } + // PIECE-2 PR-3: wire artifact_subscriptions onto the same bus. + // Each ArtifactSelector translates to a bus subscription: + // Exact(k) → bus.subscribe(k, name, true) + // Prefix(p) → KNOWN GAP, no-op + warn (see below) + // + // Subscribed `synchronous: true` so MessageBus::publish dispatches + // inline through handle_event. The async tier (synchronous=false) + // sends to a broadcast channel that nothing in the runtime + // currently auto-routes back to handle_event — synchronous=false + // would silently drop. Sync is safe because on_artifact_available + // is contract-bound to cheap-and-return (see its docstring); if + // a subscriber needs heavy work, it can `tokio::spawn` inside + // the handler. + // + // Delivery: bus calls handle_event with event_name = key; the + // default handle_event impl in service_module.rs auto-dispatches + // to on_artifact_available when the incoming key matches one of + // this module's artifact_subscriptions. Modules that override + // handle_event keep full control and can call + // on_artifact_available themselves if they want. + // + // Cadence routing split (per airc design check w/ vhsm-scope + // airc-8a5e, 2026-05-16 19:58Z): + // Cadence::EventDriven | OnArtifact → this bus path + // Cadence::Periodic → existing tick_interval path + // Cadence::Mixed → both + // We always wire bus subscriptions when artifact_subscriptions + // is non-empty; the tick_interval path is wired separately by + // start_tick_loops. + for selector in module.artifact_subscriptions() { + match selector { + super::ArtifactSelector::Exact(key) => { + self.bus.subscribe(key.as_str(), config.name, true); + } + super::ArtifactSelector::Prefix(p) => { + // KNOWN GAP: bus glob_matches (message_bus.rs:245) + // splits on `:` not `/` — Prefix("cognition/") → + // bus pattern matches nothing because the matcher + // only sees one colon-segment on either side. + // Resolving requires choosing one separator + // convention for ArtifactKey + aligning bus events + // to match. PR-3 ships Exact-only support; Prefix + // is silently no-op'd until convention is unified + // (separate slice). Pinned by a test that asserts + // the no-op so the follow-up has a regression check + // to flip. + warn!( + "Module '{}' uses ArtifactSelector::Prefix({:?}) but bus glob_matches \ + uses colon-segmented patterns — prefix delivery is not wired in PR-3. \ + Use Exact selectors until separator convention is unified.", + config.name, p + ); + } + } + } + if config.max_concurrency > 0 { self.concurrency_limits.insert( config.name, @@ -374,3 +430,277 @@ impl Runtime { Ok(()) } } + +#[cfg(test)] +mod piece_2_pr3_dispatch_tests { + //! PIECE-2 PR-3 dispatch tests. + //! + //! Proves the registration → bus.subscribe → handle_event → + //! on_artifact_available chain wires correctly for + //! ArtifactSelector::Exact, that ArtifactSelector::Prefix is a + //! pinned no-op pending separator unification, and that modules + //! NOT opted-in see no artifact dispatch (backwards-compat + //! guarantee). + //! + //! Test fixture: a tracking module that records every + //! on_artifact_available call into a shared Vec the test asserts + //! against after publishing. + use super::*; + use crate::runtime::artifact_handle::{ArtifactKey, ArtifactSelector}; + use crate::runtime::service_module::{ + CommandResult, ModuleConfig, ModulePriority, ServiceModule, + }; + use async_trait::async_trait; + use parking_lot::Mutex; + use std::any::Any; + use std::sync::Arc; + + struct RecordingModule { + name: &'static str, + subscriptions: Vec, + received: Arc>>, + } + + impl RecordingModule { + fn new( + name: &'static str, + subscriptions: Vec, + ) -> (Arc, Arc>>) { + let received = Arc::new(Mutex::new(Vec::new())); + let module = Arc::new(Self { + name, + subscriptions, + received: received.clone(), + }); + (module, received) + } + } + + #[async_trait] + impl ServiceModule for RecordingModule { + fn config(&self) -> ModuleConfig { + ModuleConfig { + name: self.name, + priority: ModulePriority::Normal, + command_prefixes: &[], + event_subscriptions: &[], + needs_dedicated_thread: false, + max_concurrency: 0, + tick_interval: None, + } + } + async fn initialize(&self, _ctx: &ModuleContext) -> Result<(), String> { + Ok(()) + } + async fn handle_command( + &self, + _command: &str, + _params: serde_json::Value, + ) -> Result { + Err("not handled".to_string()) + } + fn artifact_subscriptions(&self) -> Vec { + self.subscriptions.clone() + } + async fn on_artifact_available( + &self, + key: &ArtifactKey, + value: serde_json::Value, + ) -> Result<(), String> { + self.received.lock().push((key.clone(), value)); + Ok(()) + } + fn as_any(&self) -> &dyn Any { + self + } + } + + /// What this catches: ArtifactSelector::Exact translates to a + /// literal bus pattern. Publishing the matching key delivers via + /// the default handle_event → on_artifact_available chain; + /// publishing a non-matching key does not. + #[tokio::test] + async fn exact_selector_delivers_only_matching_key() { + let runtime = Runtime::new(); + let (module, received) = RecordingModule::new( + "exact-recorder", + vec![ArtifactSelector::Exact(ArtifactKey::from( + "paging/broker.snapshot", + ))], + ); + runtime.register(module); + + runtime + .bus() + .publish( + "paging/broker.snapshot", + serde_json::json!({"pressure": 0.42}), + runtime.registry(), + ) + .await; + + // Different key — not delivered. + runtime + .bus() + .publish( + "cognition/rate_proposals.result", + serde_json::json!({"foo": "bar"}), + runtime.registry(), + ) + .await; + + // Prefix-shaped collision — not delivered (Exact must be + // string-equality, not prefix-equality). + runtime + .bus() + .publish( + "paging/broker.snapshot.delta", + serde_json::json!({"foo": "bar"}), + runtime.registry(), + ) + .await; + + let calls = received.lock().clone(); + assert_eq!( + calls.len(), + 1, + "exact selector should deliver only the literal match; got {:?}", + calls + .iter() + .map(|(k, _)| k.as_str().to_string()) + .collect::>() + ); + assert_eq!(calls[0].0.as_str(), "paging/broker.snapshot"); + assert_eq!(calls[0].1["pressure"], 0.42); + } + + /// What this catches (PR-3 known gap): ArtifactSelector::Prefix + /// is wired but silently no-ops because bus glob_matches uses + /// colon-segmented patterns and ArtifactKey convention isn't + /// unified. This test pins the gap so a future PR that unifies + /// the separator must update this test to "Prefix actually + /// delivers." Don't delete the test — flipping it from + /// expect-zero to expect-N is the exact regression check the + /// follow-up needs. + #[tokio::test] + async fn prefix_selector_currently_no_ops_pending_separator_unification() { + let runtime = Runtime::new(); + let (module, received) = RecordingModule::new( + "prefix-recorder", + vec![ArtifactSelector::Prefix("cognition/".to_string())], + ); + runtime.register(module); + + runtime + .bus() + .publish( + "cognition/rate_proposals.result", + serde_json::json!({}), + runtime.registry(), + ) + .await; + runtime + .bus() + .publish( + "cognition/generate_recipe.result", + serde_json::json!({}), + runtime.registry(), + ) + .await; + + assert_eq!( + received.lock().len(), + 0, + "PR-3 known gap: Prefix selectors silently no-op until \ + separator convention is unified across ArtifactKey + bus \ + matcher. When unified, this assertion should become \ + assert_eq!(calls.len(), 2) and the test name updated." + ); + } + + /// What this catches: a module that declares NO artifact_subscriptions + /// receives NOTHING. Backwards-compat: every existing module + /// (HealthModule, PressureBrokerModule, …) keeps its current + /// behavior — the new default handle_event is a no-op for + /// non-opted-in modules. + #[tokio::test] + async fn module_without_artifact_subscriptions_receives_nothing() { + let runtime = Runtime::new(); + let (module, received) = RecordingModule::new("non-opted-in", vec![]); + runtime.register(module); + + runtime + .bus() + .publish( + "paging/broker.snapshot", + serde_json::json!({}), + runtime.registry(), + ) + .await; + runtime + .bus() + .publish( + "anything/at/all", + serde_json::json!({}), + runtime.registry(), + ) + .await; + + assert!( + received.lock().is_empty(), + "module with empty subscriptions must receive nothing" + ); + } + + /// What this catches: two modules with different subscription + /// sets each receive ONLY their matching events. Multi-subscriber + /// isolation. + #[tokio::test] + async fn multi_module_isolation_each_gets_only_matching_artifacts() { + let runtime = Runtime::new(); + let (a, received_a) = RecordingModule::new( + "module-a", + vec![ArtifactSelector::Exact(ArtifactKey::from( + "persona/inbox.frame_ready", + ))], + ); + let (b, received_b) = RecordingModule::new( + "module-b", + vec![ArtifactSelector::Exact(ArtifactKey::from( + "paging/broker.snapshot", + ))], + ); + runtime.register(a); + runtime.register(b); + + runtime + .bus() + .publish( + "persona/inbox.frame_ready", + serde_json::json!({"id": "frame-1"}), + runtime.registry(), + ) + .await; + runtime + .bus() + .publish( + "paging/broker.snapshot", + serde_json::json!({"pressure": 0.5}), + runtime.registry(), + ) + .await; + + let a_keys: Vec = received_a + .lock() + .iter() + .map(|(k, _)| k.as_str().to_string()) + .collect(); + let b_keys: Vec = received_b + .lock() + .iter() + .map(|(k, _)| k.as_str().to_string()) + .collect(); + assert_eq!(a_keys, vec!["persona/inbox.frame_ready".to_string()]); + assert_eq!(b_keys, vec!["paging/broker.snapshot".to_string()]); + } +} diff --git a/src/workers/continuum-core/src/runtime/service_module.rs b/src/workers/continuum-core/src/runtime/service_module.rs index 770e6cb13..b9be560a1 100644 --- a/src/workers/continuum-core/src/runtime/service_module.rs +++ b/src/workers/continuum-core/src/runtime/service_module.rs @@ -153,8 +153,32 @@ pub trait ServiceModule: Send + Sync + Any { /// Handle an event published on the message bus. /// Only called for events matching event_subscriptions globs. - /// Default: no-op (most modules only handle commands). - async fn handle_event(&self, _event_name: &str, _payload: Value) -> Result<(), String> { + /// + /// Default behavior (PIECE-2 PR-3): auto-route to + /// `on_artifact_available` when `event_name` matches one of this + /// module's `artifact_subscriptions`. This is what makes the + /// artifact dispatch path work without every module overriding + /// `handle_event` manually — the runtime subscribes the module's + /// artifact keys to the bus, the bus delivers via `handle_event`, + /// and the default impl forwards to `on_artifact_available`. + /// + /// Modules with `event_subscriptions` (glob patterns on the bus + /// that are NOT artifact keys) MUST override `handle_event` — + /// otherwise a bus event matching their glob will be silently + /// checked against `artifact_subscriptions` and dropped if it + /// doesn't match. Overriding restores explicit control; from an + /// override the module can still call + /// `self.on_artifact_available(key, payload).await` to opt into + /// the same auto-route behavior. + async fn handle_event(&self, event_name: &str, payload: Value) -> Result<(), String> { + let subs = self.artifact_subscriptions(); + if subs.is_empty() { + return Ok(()); + } + let key = ArtifactKey::from(event_name); + if subs.iter().any(|sel| sel.matches(&key)) { + return self.on_artifact_available(&key, payload).await; + } Ok(()) }