From c2724f6d1f8564ec923a4af58843877afb02163e Mon Sep 17 00:00:00 2001 From: Test Date: Sat, 16 May 2026 17:07:15 -0500 Subject: [PATCH] feat(runtime): real Prefix dispatch via dedicated artifact path on MessageBus MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #1339 (CBAR-PIECE-2 PR-3 — artifact dispatch via bus). What this fixes - PR-3 routed ArtifactSelector::Exact through the bus's standard glob_matches path, which works for Exact but fails for Prefix: glob_matches splits on `:` not `/`, so Prefix("cognition/") matches nothing through the existing matcher. PR-3 emitted warn! and pinned the no-op with a regression test. What this changes - Add MessageBus::subscribe_artifact(selector, module_name) — sibling to MessageBus::subscribe but routes via ArtifactSelector::matches (Exact / Prefix on the full slash-convention key) instead of the colon-segmented glob_matches. - MessageBus::publish now walks the artifact subscriber list in addition to the event subscriber list. Two coexisting matchers on the same publish path: event_subscriptions → glob_matches (colon-segmented) artifact_subscriptions → ArtifactSelector::matches (full key) - Runtime::register routes all ArtifactSelector variants (Exact AND Prefix) through subscribe_artifact. No more warn!, no separator translation, no PR-3-shaped gap. - Delivery is synchronous through the dedicated path because on_artifact_available is contract-bound to cheap-and-return. Tests - runtime/runtime.rs piece_2_pr3_dispatch_tests prefix_selector_currently_no_ops_pending_separator_unification renamed and flipped to prefix_selector_delivers_matching_keys_and_skips_others — verifies BOTH that the selector delivers matching keys AND that non-matching keys (different prefix) are correctly excluded. - All 42 runtime:: tests pass (no regressions on the Exact, empty- subscriptions, or multi-module isolation tests). Why a dedicated path instead of unifying the separator - ArtifactKey convention is `/.` (slash + dot); the event bus convention is `::` (colon-segmented). They're semantically different — events are colon-segmented for per-segment globbing (`data:*:created`), artifacts are slash/dot-structured for module/surface namespacing without glob semantics. ArtifactSelector::matches is the right matcher for the latter; glob_matches is the right matcher for the former. Forcing one to fit the other would muddy both. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../continuum-core/src/runtime/message_bus.rs | 80 ++++++++++- .../continuum-core/src/runtime/runtime.rs | 130 +++++++++--------- 2 files changed, 143 insertions(+), 67 deletions(-) diff --git a/src/workers/continuum-core/src/runtime/message_bus.rs b/src/workers/continuum-core/src/runtime/message_bus.rs index ac5735bc3..a2926a9bd 100644 --- a/src/workers/continuum-core/src/runtime/message_bus.rs +++ b/src/workers/continuum-core/src/runtime/message_bus.rs @@ -6,6 +6,7 @@ //! //! Modules subscribe via their config().event_subscriptions. +use super::artifact_handle::{ArtifactKey, ArtifactSelector}; use dashmap::DashMap; use std::collections::VecDeque; use std::sync::Mutex; @@ -23,6 +24,25 @@ struct Subscription { synchronous: bool, } +/// An artifact subscription record. Sibling to `Subscription` but uses +/// `ArtifactSelector::matches` (Exact / Prefix on the full +/// slash-convention key) instead of the colon-segmented `glob_matches`. +/// +/// Why a separate path: `glob_matches` is built for the event-bus +/// convention `::` with `*` matching one segment. ArtifactKey +/// uses `/.` (slash + dot) and has its own +/// matcher already (`ArtifactSelector::matches`) that the producer + +/// consumer sides both agree on. Routing artifact events through +/// glob_matches forces a separator translation that doesn't exist +/// cleanly; routing them through their own matcher keeps both paths +/// honest. Event subscriptions and artifact subscriptions coexist on +/// the same MessageBus, share publish(), share record_recent — they +/// just walk different subscriber lists with different matchers. +struct ArtifactSubscription { + selector: ArtifactSelector, + module_name: &'static str, +} + /// Event payload sent through the bus. #[derive(Debug, Clone)] pub struct BusEvent { @@ -49,6 +69,14 @@ pub struct MessageBus { /// Subscriptions grouped by module name subscriptions: DashMap<&'static str, Vec>, + /// Artifact subscriptions grouped by module name. Walked alongside + /// `subscriptions` on every publish, but matched via + /// `ArtifactSelector::matches` instead of `glob_matches`. PR-3 of + /// CBAR-PIECE-2 introduces this path so Prefix selectors actually + /// deliver — the prior approach of cramming ArtifactKeys through + /// the colon-segmented glob matcher only worked for Exact. + artifact_subscriptions: DashMap<&'static str, Vec>, + /// Broadcast channel for async (deferred) event delivery sender: broadcast::Sender, @@ -79,6 +107,7 @@ impl MessageBus { let (sender, _) = broadcast::channel(1024); Self { subscriptions: DashMap::new(), + artifact_subscriptions: DashMap::new(), sender, recent_events: Mutex::new(VecDeque::with_capacity(RECENT_EVENT_BUFFER_SIZE)), coalesce_tracker: DashMap::new(), @@ -148,6 +177,31 @@ impl MessageBus { self.subscriptions.entry(module_name).or_default().push(sub); } + /// Subscribe to artifact events matching an ArtifactSelector. + /// + /// Sibling to `subscribe`, but routes via `ArtifactSelector::matches` + /// (Exact / Prefix on the full slash-convention key) instead of + /// colon-segmented glob_matches. Delivery is always synchronous — + /// `on_artifact_available` is contract-bound to cheap-and-return, + /// so inline dispatch from the publisher's task is safe and avoids + /// the broadcast-channel detour that would force the runtime to + /// route back to handle_event. + /// + /// Used by `Runtime::register` to wire `ServiceModule:: + /// artifact_subscriptions()`. The default `handle_event` impl on + /// ServiceModule auto-forwards to `on_artifact_available` when + /// the incoming event_name matches one of this module's selectors. + pub fn subscribe_artifact(&self, selector: ArtifactSelector, module_name: &'static str) { + let sub = ArtifactSubscription { + selector, + module_name, + }; + self.artifact_subscriptions + .entry(module_name) + .or_default() + .push(sub); + } + /// Get a receiver for async event delivery. /// Modules that need async events call this during initialize(). pub fn receiver(&self) -> broadcast::Receiver { @@ -164,7 +218,7 @@ impl MessageBus { payload: serde_json::Value, registry: &super::ModuleRegistry, ) { - // Synchronous tier: call matching handlers inline + // Synchronous tier (glob-matched event_subscriptions): call inline. for entry in self.subscriptions.iter() { for sub in entry.value().iter() { if sub.synchronous && glob_matches(&sub.pattern, event_name) { @@ -180,6 +234,30 @@ impl MessageBus { } } + // Artifact tier (ArtifactSelector-matched artifact_subscriptions): + // walk the dedicated artifact subscriber list using the selector's + // own matcher. Delivers via handle_event so the default impl on + // ServiceModule (which forwards to on_artifact_available when + // the key matches one of artifact_subscriptions()) closes the + // loop. A module that overrides handle_event keeps full control; + // it can call self.on_artifact_available(...).await from inside + // its override. + let key = ArtifactKey::from(event_name); + for entry in self.artifact_subscriptions.iter() { + for sub in entry.value().iter() { + if sub.selector.matches(&key) { + if let Some(module) = registry.get_by_name(sub.module_name) { + if let Err(e) = module.handle_event(event_name, payload.clone()).await { + warn!( + "Artifact handler error: module={}, key={}, error={}", + sub.module_name, event_name, e + ); + } + } + } + } + } + // Deferred tier: broadcast for async consumers let event = BusEvent { name: event_name.to_string(), diff --git a/src/workers/continuum-core/src/runtime/runtime.rs b/src/workers/continuum-core/src/runtime/runtime.rs index 0d0107229..b7b471c3a 100644 --- a/src/workers/continuum-core/src/runtime/runtime.rs +++ b/src/workers/continuum-core/src/runtime/runtime.rs @@ -82,60 +82,39 @@ impl Runtime { self.bus.subscribe(pattern, config.name, false); } - // PIECE-2 PR-3: wire artifact_subscriptions onto the same bus. - // Each ArtifactSelector translates to a bus subscription: - // Exact(k) → bus.subscribe(k, name, true) - // Prefix(p) → KNOWN GAP, no-op + warn (see below) + // PIECE-2 PR-3 follow-up: wire artifact_subscriptions through + // MessageBus::subscribe_artifact (Exact AND Prefix supported). // - // Subscribed `synchronous: true` so MessageBus::publish dispatches - // inline through handle_event. The async tier (synchronous=false) - // sends to a broadcast channel that nothing in the runtime - // currently auto-routes back to handle_event — synchronous=false - // would silently drop. Sync is safe because on_artifact_available - // is contract-bound to cheap-and-return (see its docstring); if - // a subscriber needs heavy work, it can `tokio::spawn` inside - // the handler. + // Original PR-3 (#1339) routed only Exact through bus.subscribe + // and emitted warn! for Prefix because the bus's glob_matches + // uses colon-segmented patterns incompatible with the + // slash-convention ArtifactKey. This follow-up adds a dedicated + // artifact subscriber path on MessageBus that uses + // ArtifactSelector::matches directly, so Prefix("cognition/") + // matches any key starting with that string without forcing a + // separator translation that doesn't exist cleanly. Event + // subscriptions (event_subscriptions on the bus) keep their + // colon-segmented glob path unchanged — the two subscriber + // lists coexist on the same MessageBus. // - // Delivery: bus calls handle_event with event_name = key; the - // default handle_event impl in service_module.rs auto-dispatches - // to on_artifact_available when the incoming key matches one of + // Delivery is synchronous through the dedicated path because + // on_artifact_available is contract-bound to cheap-and-return. + // The bus calls handle_event with event_name = key; the default + // handle_event impl in service_module.rs auto-dispatches to + // on_artifact_available when the incoming key matches one of // this module's artifact_subscriptions. Modules that override - // handle_event keep full control and can call - // on_artifact_available themselves if they want. + // handle_event keep full control. // // Cadence routing split (per airc design check w/ vhsm-scope // airc-8a5e, 2026-05-16 19:58Z): // Cadence::EventDriven | OnArtifact → this bus path // Cadence::Periodic → existing tick_interval path // Cadence::Mixed → both - // We always wire bus subscriptions when artifact_subscriptions - // is non-empty; the tick_interval path is wired separately by - // start_tick_loops. + // We always wire artifact subscriptions when + // artifact_subscriptions is non-empty; the tick_interval path + // is wired separately by start_tick_loops. for selector in module.artifact_subscriptions() { - match selector { - super::ArtifactSelector::Exact(key) => { - self.bus.subscribe(key.as_str(), config.name, true); - } - super::ArtifactSelector::Prefix(p) => { - // KNOWN GAP: bus glob_matches (message_bus.rs:245) - // splits on `:` not `/` — Prefix("cognition/") → - // bus pattern matches nothing because the matcher - // only sees one colon-segment on either side. - // Resolving requires choosing one separator - // convention for ArtifactKey + aligning bus events - // to match. PR-3 ships Exact-only support; Prefix - // is silently no-op'd until convention is unified - // (separate slice). Pinned by a test that asserts - // the no-op so the follow-up has a regression check - // to flip. - warn!( - "Module '{}' uses ArtifactSelector::Prefix({:?}) but bus glob_matches \ - uses colon-segmented patterns — prefix delivery is not wired in PR-3. \ - Use Exact selectors until separator convention is unified.", - config.name, p - ); - } - } + self.bus.subscribe_artifact(selector, config.name); } if config.max_concurrency > 0 { @@ -436,11 +415,11 @@ mod piece_2_pr3_dispatch_tests { //! PIECE-2 PR-3 dispatch tests. //! //! Proves the registration → bus.subscribe → handle_event → - //! on_artifact_available chain wires correctly for - //! ArtifactSelector::Exact, that ArtifactSelector::Prefix is a - //! pinned no-op pending separator unification, and that modules - //! NOT opted-in see no artifact dispatch (backwards-compat - //! guarantee). + //! on_artifact_available chain wires correctly for both + //! ArtifactSelector::Exact and ArtifactSelector::Prefix (via the + //! dedicated artifact-subscriber path on MessageBus added in the + //! follow-up to PR-3), and that modules NOT opted-in see no + //! artifact dispatch (backwards-compat guarantee). //! //! Test fixture: a tracking module that records every //! on_artifact_available call into a shared Vec the test asserts @@ -574,16 +553,18 @@ mod piece_2_pr3_dispatch_tests { assert_eq!(calls[0].1["pressure"], 0.42); } - /// What this catches (PR-3 known gap): ArtifactSelector::Prefix - /// is wired but silently no-ops because bus glob_matches uses - /// colon-segmented patterns and ArtifactKey convention isn't - /// unified. This test pins the gap so a future PR that unifies - /// the separator must update this test to "Prefix actually - /// delivers." Don't delete the test — flipping it from - /// expect-zero to expect-N is the exact regression check the - /// follow-up needs. + /// What this catches (PR-3 follow-up): ArtifactSelector::Prefix + /// now actually delivers. Original PR-3 (#1339) pinned this as + /// no-op because the routing crammed ArtifactKeys through the + /// bus's colon-segmented glob_matches. This follow-up adds a + /// dedicated artifact-subscriber path on MessageBus that uses + /// ArtifactSelector::matches directly, so Prefix("cognition/") + /// matches anything starting with that string. + /// + /// Also asserts that a non-matching key is NOT delivered — the + /// bound on the prefix matters, it's not a wildcard. #[tokio::test] - async fn prefix_selector_currently_no_ops_pending_separator_unification() { + async fn prefix_selector_delivers_matching_keys_and_skips_others() { let runtime = Runtime::new(); let (module, received) = RecordingModule::new( "prefix-recorder", @@ -595,7 +576,7 @@ mod piece_2_pr3_dispatch_tests { .bus() .publish( "cognition/rate_proposals.result", - serde_json::json!({}), + serde_json::json!({"score": 0.7}), runtime.registry(), ) .await; @@ -603,18 +584,35 @@ mod piece_2_pr3_dispatch_tests { .bus() .publish( "cognition/generate_recipe.result", - serde_json::json!({}), + serde_json::json!({"recipe_id": "abc"}), runtime.registry(), ) .await; + // Non-matching key — must NOT deliver. + runtime + .bus() + .publish( + "paging/broker.snapshot", + serde_json::json!({"pressure": 0.1}), + runtime.registry(), + ) + .await; + + let calls = received.lock().clone(); + let delivered_keys: Vec = + calls.iter().map(|(k, _)| k.as_str().to_string()).collect(); assert_eq!( - received.lock().len(), - 0, - "PR-3 known gap: Prefix selectors silently no-op until \ - separator convention is unified across ArtifactKey + bus \ - matcher. When unified, this assertion should become \ - assert_eq!(calls.len(), 2) and the test name updated." + calls.len(), + 2, + "Prefix selector should deliver both cognition/* keys; got {:?}", + delivered_keys + ); + assert!(delivered_keys.contains(&"cognition/rate_proposals.result".to_string())); + assert!(delivered_keys.contains(&"cognition/generate_recipe.result".to_string())); + assert!( + !delivered_keys.contains(&"paging/broker.snapshot".to_string()), + "Prefix is a bound, not a wildcard — keys outside the prefix must not deliver" ); }