Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 79 additions & 1 deletion src/workers/continuum-core/src/runtime/message_bus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//!
//! Modules subscribe via their config().event_subscriptions.

use super::artifact_handle::{ArtifactKey, ArtifactSelector};
use dashmap::DashMap;
use std::collections::VecDeque;
use std::sync::Mutex;
Expand All @@ -23,6 +24,25 @@ struct Subscription {
synchronous: bool,
}

/// An artifact subscription record. Sibling to `Subscription` but uses
/// `ArtifactSelector::matches` (Exact / Prefix on the full
/// slash-convention key) instead of the colon-segmented `glob_matches`.
///
/// Why a separate path: `glob_matches` is built for the event-bus
/// convention `<a>:<b>:<c>` with `*` matching one segment. ArtifactKey
/// uses `<module>/<surface>.<event>` (slash + dot) and has its own
/// matcher already (`ArtifactSelector::matches`) that the producer +
/// consumer sides both agree on. Routing artifact events through
/// glob_matches forces a separator translation that doesn't exist
/// cleanly; routing them through their own matcher keeps both paths
/// honest. Event subscriptions and artifact subscriptions coexist on
/// the same MessageBus, share publish(), share record_recent — they
/// just walk different subscriber lists with different matchers.
struct ArtifactSubscription {
selector: ArtifactSelector,
module_name: &'static str,
}

/// Event payload sent through the bus.
#[derive(Debug, Clone)]
pub struct BusEvent {
Expand All @@ -49,6 +69,14 @@ pub struct MessageBus {
/// Subscriptions grouped by module name
subscriptions: DashMap<&'static str, Vec<Subscription>>,

/// Artifact subscriptions grouped by module name. Walked alongside
/// `subscriptions` on every publish, but matched via
/// `ArtifactSelector::matches` instead of `glob_matches`. PR-3 of
/// CBAR-PIECE-2 introduces this path so Prefix selectors actually
/// deliver — the prior approach of cramming ArtifactKeys through
/// the colon-segmented glob matcher only worked for Exact.
artifact_subscriptions: DashMap<&'static str, Vec<ArtifactSubscription>>,

/// Broadcast channel for async (deferred) event delivery
sender: broadcast::Sender<BusEvent>,

Expand Down Expand Up @@ -79,6 +107,7 @@ impl MessageBus {
let (sender, _) = broadcast::channel(1024);
Self {
subscriptions: DashMap::new(),
artifact_subscriptions: DashMap::new(),
sender,
recent_events: Mutex::new(VecDeque::with_capacity(RECENT_EVENT_BUFFER_SIZE)),
coalesce_tracker: DashMap::new(),
Expand Down Expand Up @@ -148,6 +177,31 @@ impl MessageBus {
self.subscriptions.entry(module_name).or_default().push(sub);
}

/// Subscribe to artifact events matching an ArtifactSelector.
///
/// Sibling to `subscribe`, but routes via `ArtifactSelector::matches`
/// (Exact / Prefix on the full slash-convention key) instead of
/// colon-segmented glob_matches. Delivery is always synchronous —
/// `on_artifact_available` is contract-bound to cheap-and-return,
/// so inline dispatch from the publisher's task is safe and avoids
/// the broadcast-channel detour that would force the runtime to
/// route back to handle_event.
///
/// Used by `Runtime::register` to wire `ServiceModule::
/// artifact_subscriptions()`. The default `handle_event` impl on
/// ServiceModule auto-forwards to `on_artifact_available` when
/// the incoming event_name matches one of this module's selectors.
pub fn subscribe_artifact(&self, selector: ArtifactSelector, module_name: &'static str) {
let sub = ArtifactSubscription {
selector,
module_name,
};
self.artifact_subscriptions
.entry(module_name)
.or_default()
.push(sub);
}

/// Get a receiver for async event delivery.
/// Modules that need async events call this during initialize().
pub fn receiver(&self) -> broadcast::Receiver<BusEvent> {
Expand All @@ -164,7 +218,7 @@ impl MessageBus {
payload: serde_json::Value,
registry: &super::ModuleRegistry,
) {
// Synchronous tier: call matching handlers inline
// Synchronous tier (glob-matched event_subscriptions): call inline.
for entry in self.subscriptions.iter() {
for sub in entry.value().iter() {
if sub.synchronous && glob_matches(&sub.pattern, event_name) {
Expand All @@ -180,6 +234,30 @@ impl MessageBus {
}
}

// Artifact tier (ArtifactSelector-matched artifact_subscriptions):
// walk the dedicated artifact subscriber list using the selector's
// own matcher. Delivers via handle_event so the default impl on
// ServiceModule (which forwards to on_artifact_available when
// the key matches one of artifact_subscriptions()) closes the
// loop. A module that overrides handle_event keeps full control;
// it can call self.on_artifact_available(...).await from inside
// its override.
let key = ArtifactKey::from(event_name);
for entry in self.artifact_subscriptions.iter() {
for sub in entry.value().iter() {
if sub.selector.matches(&key) {
if let Some(module) = registry.get_by_name(sub.module_name) {
if let Err(e) = module.handle_event(event_name, payload.clone()).await {
warn!(
"Artifact handler error: module={}, key={}, error={}",
sub.module_name, event_name, e
);
}
}
}
}
}

// Deferred tier: broadcast for async consumers
let event = BusEvent {
name: event_name.to_string(),
Expand Down
130 changes: 64 additions & 66 deletions src/workers/continuum-core/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,60 +82,39 @@ impl Runtime {
self.bus.subscribe(pattern, config.name, false);
}

// PIECE-2 PR-3: wire artifact_subscriptions onto the same bus.
// Each ArtifactSelector translates to a bus subscription:
// Exact(k) → bus.subscribe(k, name, true)
// Prefix(p) → KNOWN GAP, no-op + warn (see below)
// PIECE-2 PR-3 follow-up: wire artifact_subscriptions through
// MessageBus::subscribe_artifact (Exact AND Prefix supported).
//
// Subscribed `synchronous: true` so MessageBus::publish dispatches
// inline through handle_event. The async tier (synchronous=false)
// sends to a broadcast channel that nothing in the runtime
// currently auto-routes back to handle_event — synchronous=false
// would silently drop. Sync is safe because on_artifact_available
// is contract-bound to cheap-and-return (see its docstring); if
// a subscriber needs heavy work, it can `tokio::spawn` inside
// the handler.
// Original PR-3 (#1339) routed only Exact through bus.subscribe
// and emitted warn! for Prefix because the bus's glob_matches
// uses colon-segmented patterns incompatible with the
// slash-convention ArtifactKey. This follow-up adds a dedicated
// artifact subscriber path on MessageBus that uses
// ArtifactSelector::matches directly, so Prefix("cognition/")
// matches any key starting with that string without forcing a
// separator translation that doesn't exist cleanly. Event
// subscriptions (event_subscriptions on the bus) keep their
// colon-segmented glob path unchanged — the two subscriber
// lists coexist on the same MessageBus.
//
// Delivery: bus calls handle_event with event_name = key; the
// default handle_event impl in service_module.rs auto-dispatches
// to on_artifact_available when the incoming key matches one of
// Delivery is synchronous through the dedicated path because
// on_artifact_available is contract-bound to cheap-and-return.
// The bus calls handle_event with event_name = key; the default
// handle_event impl in service_module.rs auto-dispatches to
// on_artifact_available when the incoming key matches one of
// this module's artifact_subscriptions. Modules that override
// handle_event keep full control and can call
// on_artifact_available themselves if they want.
// handle_event keep full control.
//
// Cadence routing split (per airc design check w/ vhsm-scope
// airc-8a5e, 2026-05-16 19:58Z):
// Cadence::EventDriven | OnArtifact → this bus path
// Cadence::Periodic → existing tick_interval path
// Cadence::Mixed → both
// We always wire bus subscriptions when artifact_subscriptions
// is non-empty; the tick_interval path is wired separately by
// start_tick_loops.
// We always wire artifact subscriptions when
// artifact_subscriptions is non-empty; the tick_interval path
// is wired separately by start_tick_loops.
for selector in module.artifact_subscriptions() {
match selector {
super::ArtifactSelector::Exact(key) => {
self.bus.subscribe(key.as_str(), config.name, true);
}
super::ArtifactSelector::Prefix(p) => {
// KNOWN GAP: bus glob_matches (message_bus.rs:245)
// splits on `:` not `/` — Prefix("cognition/") →
// bus pattern matches nothing because the matcher
// only sees one colon-segment on either side.
// Resolving requires choosing one separator
// convention for ArtifactKey + aligning bus events
// to match. PR-3 ships Exact-only support; Prefix
// is silently no-op'd until convention is unified
// (separate slice). Pinned by a test that asserts
// the no-op so the follow-up has a regression check
// to flip.
warn!(
"Module '{}' uses ArtifactSelector::Prefix({:?}) but bus glob_matches \
uses colon-segmented patterns — prefix delivery is not wired in PR-3. \
Use Exact selectors until separator convention is unified.",
config.name, p
);
}
}
self.bus.subscribe_artifact(selector, config.name);
}

if config.max_concurrency > 0 {
Expand Down Expand Up @@ -436,11 +415,11 @@ mod piece_2_pr3_dispatch_tests {
//! PIECE-2 PR-3 dispatch tests.
//!
//! Proves the registration → bus.subscribe → handle_event →
//! on_artifact_available chain wires correctly for
//! ArtifactSelector::Exact, that ArtifactSelector::Prefix is a
//! pinned no-op pending separator unification, and that modules
//! NOT opted-in see no artifact dispatch (backwards-compat
//! guarantee).
//! on_artifact_available chain wires correctly for both
//! ArtifactSelector::Exact and ArtifactSelector::Prefix (via the
//! dedicated artifact-subscriber path on MessageBus added in the
//! follow-up to PR-3), and that modules NOT opted-in see no
//! artifact dispatch (backwards-compat guarantee).
//!
//! Test fixture: a tracking module that records every
//! on_artifact_available call into a shared Vec the test asserts
Expand Down Expand Up @@ -574,16 +553,18 @@ mod piece_2_pr3_dispatch_tests {
assert_eq!(calls[0].1["pressure"], 0.42);
}

/// What this catches (PR-3 known gap): ArtifactSelector::Prefix
/// is wired but silently no-ops because bus glob_matches uses
/// colon-segmented patterns and ArtifactKey convention isn't
/// unified. This test pins the gap so a future PR that unifies
/// the separator must update this test to "Prefix actually
/// delivers." Don't delete the test — flipping it from
/// expect-zero to expect-N is the exact regression check the
/// follow-up needs.
/// What this catches (PR-3 follow-up): ArtifactSelector::Prefix
/// now actually delivers. Original PR-3 (#1339) pinned this as
/// no-op because the routing crammed ArtifactKeys through the
/// bus's colon-segmented glob_matches. This follow-up adds a
/// dedicated artifact-subscriber path on MessageBus that uses
/// ArtifactSelector::matches directly, so Prefix("cognition/")
/// matches anything starting with that string.
///
/// Also asserts that a non-matching key is NOT delivered — the
/// bound on the prefix matters, it's not a wildcard.
#[tokio::test]
async fn prefix_selector_currently_no_ops_pending_separator_unification() {
async fn prefix_selector_delivers_matching_keys_and_skips_others() {
let runtime = Runtime::new();
let (module, received) = RecordingModule::new(
"prefix-recorder",
Expand All @@ -595,26 +576,43 @@ mod piece_2_pr3_dispatch_tests {
.bus()
.publish(
"cognition/rate_proposals.result",
serde_json::json!({}),
serde_json::json!({"score": 0.7}),
runtime.registry(),
)
.await;
runtime
.bus()
.publish(
"cognition/generate_recipe.result",
serde_json::json!({}),
serde_json::json!({"recipe_id": "abc"}),
runtime.registry(),
)
.await;

// Non-matching key — must NOT deliver.
runtime
.bus()
.publish(
"paging/broker.snapshot",
serde_json::json!({"pressure": 0.1}),
runtime.registry(),
)
.await;

let calls = received.lock().clone();
let delivered_keys: Vec<String> =
calls.iter().map(|(k, _)| k.as_str().to_string()).collect();
assert_eq!(
received.lock().len(),
0,
"PR-3 known gap: Prefix selectors silently no-op until \
separator convention is unified across ArtifactKey + bus \
matcher. When unified, this assertion should become \
assert_eq!(calls.len(), 2) and the test name updated."
calls.len(),
2,
"Prefix selector should deliver both cognition/* keys; got {:?}",
delivered_keys
);
assert!(delivered_keys.contains(&"cognition/rate_proposals.result".to_string()));
assert!(delivered_keys.contains(&"cognition/generate_recipe.result".to_string()));
assert!(
!delivered_keys.contains(&"paging/broker.snapshot".to_string()),
"Prefix is a bound, not a wildcard — keys outside the prefix must not deliver"
);
}

Expand Down
Loading