From 137f01b7412679b2085fc9953a78c39616933db8 Mon Sep 17 00:00:00 2001 From: Test Date: Sat, 16 May 2026 19:21:53 -0500 Subject: [PATCH] =?UTF-8?q?feat(genome):=20working-set-manager=20PR-5=20?= =?UTF-8?q?=E2=80=94=20LocalWorkingSetManager=20auto-publishes=20via=20bus?= =?UTF-8?q?=20hook?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the genome stack end-to-end. The artifact dispatch path I shipped in #1339+#1343 + the PR-4 publishing helpers + the PR-3 LocalWorkingSetManager all wire together so a persona's `page_in` / `audit_access` calls fan typed events out to subscribers (audit-recorder #1344 on AccessDenied, future sentinel-observer on PageFault, future demand-aligned-recall on PageFault). What lands - `LocalWorkingSetManager::with_bus(tiers, bus, registry)` — optional bus hook stored as `Option<BusHook>` on the manager. Constructed once at startup; switching publishing on/off mid-service would race + is not supported. - Auto-publish on: - `page_in` returning `PageFault` (true cold miss OR tier promotion) → publishes via `publish_page_fault` under `PAGE_FAULT_KEY` - `audit_access` returning `AccessDenied` → publishes via `publish_access_denied` under `ACCESS_DENIED_KEY` - Both via `tokio::runtime::Handle::try_current().spawn(...)` — see "Why spawn instead of await" below. - `LocalWorkingSetManager::new(tiers)` (PR-3 shape) preserved unchanged: bus-less mode for tests + standalone use. - `Runtime::bus_arc()` accessor added — returns `Arc<MessageBus>` for long-lived publishers (like LocalWorkingSetManager wired via with_bus) that need to hold their own bus reference. Why spawn instead of await `bus.publish` walks DashMap subscriber lists; the DashMap's `Map` trait impl is keyed by `&'static str` and that doesn't satisfy the "for any lifetime" requirement when the call sits inside a Send-bounded `async fn` (which `async_trait` generates for trait method impls). 
Spawning into a tokio task decouples the publish from the caller's Send-ness — the spawned future owns its Arc captures, no borrow crosses the await boundary in the caller. Sub-fix in MessageBus::publish While debugging the lifetime issue, found that `MessageBus::publish` held the DashMap borrow across the `await module.handle_event(...)` call inside both its glob_matched + artifact_matched walks. That's the actual root cause of the "DashMap is not general enough" error when publish is called from spawn-contexts. Refactored both walks to collect matching `module_name: &'static str` into a `Vec` first (dropping the DashMap borrow), then await dispatch from the Vec. Same semantics, no more borrow-across-await — `publish` is now safe to call from any Send-bounded async context. Tests 6 new tests on genome::local_manager::pr5 sub-section: - page_in_true_cold_miss_with_bus_publishes_page_fault — end-to-end Runtime + RecorderModule + with_bus + page_in → spawn → publish → subscriber. Yields with tokio::task::yield_now in a bounded loop to let the spawn complete (no fixed sleep). - page_in_tier_promotion_with_bus_publishes_correct_fields — from_role/to_role correctness through the spawn path. - page_in_resident_hit_with_bus_does_not_publish — resident-hit path stays silent (no noisy events for hot pages). - audit_access_denial_with_bus_publishes_via_spawn — same spawn pattern, but from the sync audit_access trait method. - audit_access_allowed_with_bus_does_not_publish — only denials are observable events. - bus_less_mode_does_not_publish_but_methods_work — backwards-compat for the standalone `new(tiers)` constructor. 69 genome:: tests total (PR-1's 35 + PR-2's 13 + PR-3's 8 + PR-4's 7 + PR-5's 6). All pass, no regressions across other 2615 lib tests. The MessageBus refactor is a load-bearing improvement to the bus itself — any future caller that wants to publish from a Send-bounded spawn context (which is most non-trivial integration code) benefits. 
Caught it on the genome integration; landing the fix here keeps the stack reviewable as one slice. Stack #1339 / #1343 — CBAR-PIECE-2 PR-3 artifact dispatch + Prefix follow-up (mine; the dispatch path PR-5 publishes through) #1344 — audit-recorder (codex's, now wired-in via AccessDenied) #1346 — working-set-manager PR-1: data types #1353 — working-set-manager PR-2: traits #1355 — working-set-manager PR-3: LocalWorkingSetManager #1358 — working-set-manager PR-4: bus keys + publishing helpers THIS PR — working-set-manager PR-5: auto-publish wiring (this is the architectural payoff of the whole genome stack) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/genome/local_manager.rs | 449 +++++++++++++++++- .../continuum-core/src/runtime/message_bus.rs | 74 ++- .../continuum-core/src/runtime/runtime.rs | 9 + 3 files changed, 496 insertions(+), 36 deletions(-) diff --git a/src/workers/continuum-core/src/genome/local_manager.rs b/src/workers/continuum-core/src/genome/local_manager.rs index 05a40f3eb..2cdd85698 100644 --- a/src/workers/continuum-core/src/genome/local_manager.rs +++ b/src/workers/continuum-core/src/genome/local_manager.rs @@ -44,6 +44,7 @@ use parking_lot::RwLock; use std::collections::HashMap; use std::sync::Arc; +use super::bus::{publish_access_denied, publish_page_fault}; use super::manager::WorkingSetManager; use super::store::TierStore; use super::tier::{TierError, TierRole}; @@ -51,10 +52,33 @@ use super::working_set::{ AccessDenied, PageFault, PageHandle, PageRef, PersonaId, ResidentPage, WorkingSet, WorkingSetCapacity, }; +use crate::runtime::message_bus::MessageBus; +use crate::runtime::registry::ModuleRegistry; + +/// Optional bus + registry handle for auto-publishing genome events. +/// When set on a `LocalWorkingSetManager`, every `page_in`/ +/// `audit_access` call that produces a typed event also publishes the +/// event via the artifact dispatch path (#1339+#1343) using the +/// canonical keys from `genome::bus` (PR-4 / #1358). 
+/// +/// Kept as one struct (not two Arcs on the manager) so the absence-of- +/// bus case is a single `Option` field — easier to reason +/// about than two correlated Options. +struct BusHook { + bus: Arc, + registry: Arc, +} /// Per-process working-set manager. Holds the tier chain + per-persona /// state. Thread-safe through `parking_lot::RwLock` — the hot-path /// `audit_access` and `working_set` calls only need a read lock. +/// +/// PR-5 adds optional bus publishing: when constructed via +/// `with_bus(tiers, bus, registry)`, every page_in / audit_access +/// call publishes the typed event to the trace bus through the +/// canonical genome keys. Constructed via `new(tiers)` (the PR-3 +/// shape), the manager stays bus-less and behaves exactly as before +/// — useful for tests + standalone use where no runtime is around. pub struct LocalWorkingSetManager { /// The tier chain, ordered highest (Fast) to lowest (Frozen). /// Each tier is a `Box` from PR-2. The order is @@ -69,17 +93,53 @@ pub struct LocalWorkingSetManager { /// this via `register_page_owner`; PR-4 may move to a typed /// genome-region-keyed table per GENOME-FOUNDRY-SENTINEL Part 4. page_owners: RwLock>, + /// Optional bus hook for auto-publishing events. `None` = bus-less + /// mode (PR-3 behavior, no publishing). `Some` = wire every typed + /// event to the artifact dispatch path via the genome::bus + /// helpers shipped in PR-4. + bus_hook: Option, } impl LocalWorkingSetManager { - /// Construct with the tier chain. The vec is in walk order: - /// `tiers[0]` is the highest tier (Fast — checked first by - /// `page_in`); `tiers[N-1]` is the lowest (typically Frozen). + /// Construct with the tier chain — bus-less mode (PR-3 shape). + /// Page events are returned through the trait's `Result` arms but + /// NOT published to any bus. Useful for tests and standalone use + /// where no runtime is around. 
pub fn new(tiers: Vec>) -> Self { Self { tiers, working_sets: RwLock::new(HashMap::new()), page_owners: RwLock::new(HashMap::new()), + bus_hook: None, + } + } + + /// Construct with the tier chain + auto-publishing bus hook. + /// Every `page_in` that returns a `PageFault` AND every + /// `audit_access` denial publishes the typed event via the + /// `genome::bus` helpers (PR-4 / #1358) under the canonical + /// genome keys. + /// + /// `bus` + `registry` must be from the same Runtime — publishing + /// uses `bus.publish` which looks up modules via the registry. + /// Subscribers register through `bus.subscribe_artifact` for the + /// genome keys (typically via `subscribe_to_genome_events(bus, + /// module_name)` from PR-4). + /// + /// Why a separate constructor instead of a setter: prevents the + /// "bus added partway through service" race where some events + /// are published and some aren't. The manager either publishes + /// from construction onward, or never — no in-between state. + pub fn with_bus( + tiers: Vec>, + bus: Arc, + registry: Arc, + ) -> Self { + Self { + tiers, + working_sets: RwLock::new(HashMap::new()), + page_owners: RwLock::new(HashMap::new()), + bus_hook: Some(BusHook { bus, registry }), } } @@ -154,24 +214,27 @@ impl WorkingSetManager for LocalWorkingSetManager { ); } - // Return PageFault to signal the caller "this was a - // tier promotion" — they'll publish to the trace bus. - // The handle is in the Err arm; the spec uses this - // typed signal to capture sentinel observability - // without confusing it with a failure. - return Err(PageFault { + // Tier-promotion PageFault. Publish to bus if hook + // present (PR-5 wiring; PR-3 contract — Err arm is + // the typed sentinel observability signal, not a + // failure), then return. 
+ let fault = PageFault { page, from_role: Some(from_role), to_role, persona, elapsed_us: 0, eviction_cost: None, - }); + }; + if let Some(hook) = &self.bus_hook { + spawn_publish_page_fault(hook, fault.clone()); + } + return Err(fault); } } // True cold miss — page doesn't exist in any tier yet. - Err(PageFault { + let fault = PageFault { page, from_role: None, to_role: self @@ -182,7 +245,11 @@ impl WorkingSetManager for LocalWorkingSetManager { persona, elapsed_us: 0, eviction_cost: None, - }) + }; + if let Some(hook) = &self.bus_hook { + spawn_publish_page_fault(hook, fault.clone()); + } + Err(fault) } async fn page_out( @@ -249,7 +316,7 @@ impl WorkingSetManager for LocalWorkingSetManager { persona: PersonaId, page: PageRef, ) -> Result<(), AccessDenied> { - match self.page_owners.read().get(&page).copied() { + let result: Result<(), AccessDenied> = match self.page_owners.read().get(&page).copied() { Some(owner) if owner != persona => Err(AccessDenied { actor: persona, page, @@ -257,7 +324,16 @@ impl WorkingSetManager for LocalWorkingSetManager { reason: "cross-persona read blocked by working-set MMU".to_string(), }), _ => Ok(()), + }; + + // Auto-publish on denial via the spawn helper (same lifetime- + // workaround pattern as page_in — see spawn_publish_page_fault + // for the rationale). + if let (Err(ref denied), Some(hook)) = (&result, &self.bus_hook) { + spawn_publish_access_denied(hook, denied.clone()); } + + result } } @@ -270,6 +346,48 @@ impl LocalWorkingSetManager { } } +/// Spawn a `publish_page_fault` into the current tokio runtime. +/// Standalone fn (not a method) so the `&BusHook` borrow doesn't +/// outlive the spawn — Arcs get cloned out first, then the spawned +/// future owns its captures. 
+/// +/// Why spawn instead of await: `bus.publish` walks the DashMap of +/// subscribers; the DashMap's `Map` trait impl has a specific +/// lifetime that doesn't satisfy the for-any-lifetime requirement +/// generated by `async_trait`'s `Send`-bounded future. Awaiting +/// `publish` inside the trait method's body trips a +/// "DashMap is not general enough" error. Spawning decouples the +/// publish from the caller's Send-ness — no borrow crosses the await +/// boundary in the caller's future. +/// +/// If no tokio runtime is current (rare — only sync-only test paths +/// without `#[tokio::test]`), the spawn is skipped silently because +/// `Handle::try_current` returns Err. The typed event in the +/// returned `Result` is still authoritative; observability is +/// best-effort. +fn spawn_publish_page_fault(hook: &BusHook, fault: PageFault) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let bus = hook.bus.clone(); + let registry = hook.registry.clone(); + handle.spawn(async move { + publish_page_fault(&bus, &registry, &fault).await; + }); + } +} + +/// Spawn a `publish_access_denied` into the current tokio runtime. +/// Same pattern as `spawn_publish_page_fault`; used by the sync +/// `audit_access` trait method. +fn spawn_publish_access_denied(hook: &BusHook, denied: AccessDenied) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let bus = hook.bus.clone(); + let registry = hook.registry.clone(); + handle.spawn(async move { + publish_access_denied(&bus, &registry, &denied).await; + }); + } +} + /// Unix-ms timestamp. Used by `ResidentPage.last_access_ms` to record /// the wall-clock of a page promotion. Tests pass a fixed value to a /// stub clock; production reads `SystemTime::now()`. 
@@ -621,4 +739,309 @@ mod tests { ]); assert_eq!(mgr.tier_count(), 4); } + + // ─── PR-5 bus-publishing tests ────────────────────────────── + + use crate::genome::bus::{ + all_genome_artifact_selectors, ACCESS_DENIED_KEY, PAGE_FAULT_KEY, + }; + use crate::runtime::artifact_handle::{ArtifactKey, ArtifactSelector}; + use crate::runtime::runtime::Runtime; + use crate::runtime::service_module::{ + CommandResult, ModuleConfig, ModulePriority, ServiceModule, + }; + use std::any::Any; + + /// Recording subscriber for the PR-5 bus tests. Captures every + /// (artifact_key, payload) so the test can assert which fired. + struct RecorderModule { + captured: Arc>>, + } + + impl RecorderModule { + fn new() -> (Arc, Arc>>) { + let captured = Arc::new(Mutex::new(Vec::new())); + let module = Arc::new(Self { + captured: captured.clone(), + }); + (module, captured) + } + } + + #[async_trait] + impl ServiceModule for RecorderModule { + fn config(&self) -> ModuleConfig { + ModuleConfig { + name: "pr5-recorder", + priority: ModulePriority::Normal, + command_prefixes: &[], + event_subscriptions: &[], + needs_dedicated_thread: false, + max_concurrency: 0, + tick_interval: None, + } + } + async fn initialize( + &self, + _ctx: &crate::runtime::ModuleContext, + ) -> Result<(), String> { + Ok(()) + } + async fn handle_command( + &self, + _: &str, + _: serde_json::Value, + ) -> Result { + Err("not handled".to_string()) + } + fn artifact_subscriptions(&self) -> Vec { + all_genome_artifact_selectors() + } + async fn on_artifact_available( + &self, + key: &ArtifactKey, + payload: serde_json::Value, + ) -> Result<(), String> { + self.captured.lock().push((key.as_str().to_string(), payload)); + Ok(()) + } + fn as_any(&self) -> &dyn Any { + self + } + } + + /// Helper: construct a Runtime + LocalWorkingSetManager wired + /// through it. Returns the manager + the recorder's captured + /// events. Used by the next several tests. 
+ async fn wire_manager_to_runtime( + tiers: Vec>, + ) -> ( + LocalWorkingSetManager, + Arc, + Arc>>, + ) { + // Build runtime, register recorder. + let runtime = Arc::new(Runtime::new()); + let (recorder, captured) = RecorderModule::new(); + runtime.register(recorder); + + // Pull bus + registry as Arcs via the helper accessors. + // Runtime exposes `bus_arc()` and `registry_arc()` for this. + let bus = runtime.bus_arc(); + let registry = runtime.registry_arc(); + + let mgr = LocalWorkingSetManager::with_bus(tiers, bus, registry); + (mgr, runtime, captured) + } + + /// What this catches: with the bus hook wired, `page_in` for a + /// true cold miss (no tier has the page) publishes a PageFault + /// with `from_role: None`. The whole chain — manager → + /// publish_page_fault → bus.subscribe_artifact → recorder + /// on_artifact_available — fires end-to-end. + #[tokio::test] + async fn page_in_true_cold_miss_with_bus_publishes_page_fault() { + let cold = StubTier::new(TierRole::Cold, vec![]); + let fast = StubTier::new(TierRole::Fast, vec![]); + let (mgr, _runtime, captured) = + wire_manager_to_runtime(vec![fast, cold]).await; + + let persona = make_persona(30); + mgr.register_persona(persona, capacity_uma()); + + let page = make_page(31); + let result = mgr.page_in(persona, page).await; + assert!(result.is_err(), "true cold miss returns Err(PageFault)"); + + // Yield to let the spawned publish task run. 
+ for _ in 0..50 { + tokio::task::yield_now().await; + if !captured.lock().is_empty() { + break; + } + } + + let events = captured.lock().clone(); + let faults: Vec<_> = events + .iter() + .filter(|(k, _)| k == PAGE_FAULT_KEY) + .collect(); + assert_eq!(faults.len(), 1, "exactly one PageFault published"); + let fault: PageFault = serde_json::from_value(faults[0].1.clone()).unwrap(); + assert_eq!(fault.from_role, None, "true cold miss has no from_role"); + assert_eq!(fault.persona, persona); + assert_eq!(fault.page, page); + } + + /// What this catches: page_in tier-promotion (page exists in Cold, + /// promoted to Fast) publishes a PageFault with from_role=Some(Cold) + /// and to_role=Fast. Sentinel uses this to learn the persona's + /// promotion pattern. + #[tokio::test] + async fn page_in_tier_promotion_with_bus_publishes_correct_fields() { + let page = make_page(40); + let cold = StubTier::new(TierRole::Cold, vec![page]); + let fast = StubTier::new(TierRole::Fast, vec![]); + let (mgr, _runtime, captured) = + wire_manager_to_runtime(vec![fast, cold]).await; + + let persona = make_persona(41); + mgr.register_persona(persona, capacity_uma()); + + let _ = mgr.page_in(persona, page).await; + + for _ in 0..50 { + tokio::task::yield_now().await; + if !captured.lock().is_empty() { + break; + } + } + + let events = captured.lock().clone(); + let faults: Vec<_> = events + .iter() + .filter(|(k, _)| k == PAGE_FAULT_KEY) + .collect(); + assert_eq!(faults.len(), 1); + let fault: PageFault = serde_json::from_value(faults[0].1.clone()).unwrap(); + assert_eq!(fault.from_role, Some(TierRole::Cold)); + assert_eq!(fault.to_role, TierRole::Fast); + } + + /// What this catches: page_in resident-hit (page already in the + /// working set) does NOT publish a PageFault. PageFault is only + /// for misses — pinning the resident-hit path's silence prevents + /// noisy events for hot pages. 
+ #[tokio::test] + async fn page_in_resident_hit_with_bus_does_not_publish() { + let page = make_page(50); + let fast = StubTier::new(TierRole::Fast, vec![page]); + let (mgr, _runtime, captured) = wire_manager_to_runtime(vec![fast]).await; + + let persona = make_persona(51); + mgr.register_persona(persona, capacity_uma()); + + // First call: tier promotion → 1 PageFault published. + let _ = mgr.page_in(persona, page).await; + for _ in 0..50 { + tokio::task::yield_now().await; + if !captured.lock().is_empty() { + break; + } + } + assert_eq!( + captured.lock().iter().filter(|(k, _)| k == PAGE_FAULT_KEY).count(), + 1 + ); + + // Second call: resident hit → NO additional PageFault. + let _ = mgr.page_in(persona, page).await; + // Yield a few times to give any incorrectly-spawned publish a + // chance to run — we want to assert no additional event. + for _ in 0..20 { + tokio::task::yield_now().await; + } + assert_eq!( + captured.lock().iter().filter(|(k, _)| k == PAGE_FAULT_KEY).count(), + 1, + "resident-hit path must not publish" + ); + } + + /// What this catches: audit_access denial spawns a publish through + /// the current tokio runtime. The sync trait method returns + /// immediately; the publish completes asynchronously. Test polls + /// briefly because the spawn isn't synchronously joined. + #[tokio::test] + async fn audit_access_denial_with_bus_publishes_via_spawn() { + let fast = StubTier::new(TierRole::Fast, vec![]); + let (mgr, _runtime, captured) = wire_manager_to_runtime(vec![fast]).await; + + let owner = make_persona(60); + let intruder = make_persona(61); + let page = make_page(62); + mgr.register_persona(owner, capacity_uma()); + mgr.register_persona(intruder, capacity_uma()); + mgr.register_page_owner(page, owner); + + // Cross-persona access — Err returned immediately, publish + // spawned. + let result = mgr.audit_access(intruder, page); + assert!(result.is_err()); + + // Yield so the spawned publish task gets a chance to run. 
+ // tokio::yield_now() inside a loop bounded by attempts is the + // safe way to wait without a fixed sleep. + for _ in 0..50 { + tokio::task::yield_now().await; + if !captured.lock().is_empty() { + break; + } + } + + let events = captured.lock().clone(); + let denied_events: Vec<_> = events + .iter() + .filter(|(k, _)| k == ACCESS_DENIED_KEY) + .collect(); + assert_eq!(denied_events.len(), 1, "exactly one AccessDenied published"); + let denied: AccessDenied = + serde_json::from_value(denied_events[0].1.clone()).unwrap(); + assert_eq!(denied.actor, intruder); + assert_eq!(denied.owner, Some(owner)); + } + + /// What this catches: audit_access for same-persona access does + /// NOT publish. Only denials are observable events. + #[tokio::test] + async fn audit_access_allowed_with_bus_does_not_publish() { + let fast = StubTier::new(TierRole::Fast, vec![]); + let (mgr, _runtime, captured) = wire_manager_to_runtime(vec![fast]).await; + + let owner = make_persona(70); + let page = make_page(71); + mgr.register_persona(owner, capacity_uma()); + mgr.register_page_owner(page, owner); + + // Owner accessing own page: Ok. + let result = mgr.audit_access(owner, page); + assert!(result.is_ok()); + + // Yield in case anything was queued. + for _ in 0..10 { + tokio::task::yield_now().await; + } + + let events = captured.lock().clone(); + let denied_events: Vec<_> = events + .iter() + .filter(|(k, _)| k == ACCESS_DENIED_KEY) + .collect(); + assert!(denied_events.is_empty(), "no denial = no event"); + } + + /// What this catches: bus-less mode (via `new` instead of + /// `with_bus`) still works — the trait methods behave identically + /// to PR-3, just without publishing. Backwards-compat for the + /// standalone use case. + #[tokio::test] + async fn bus_less_mode_does_not_publish_but_methods_work() { + let page = make_page(80); + let fast = StubTier::new(TierRole::Fast, vec![page]); + // `new` instead of `with_bus` — no bus hook. 
+ let mgr = LocalWorkingSetManager::new(vec![fast]); + let persona = make_persona(81); + mgr.register_persona(persona, capacity_uma()); + + // page_in still returns Err(PageFault) — caller-side + // observability still works through the Result arm. + let result = mgr.page_in(persona, page).await; + assert!(result.is_err()); + + // audit_access still returns the typed denial — no spawn, + // no publish, no observable side effect (the typed Result + // is THE signal). + let result = mgr.audit_access(persona, page); + assert!(result.is_ok()); + } } diff --git a/src/workers/continuum-core/src/runtime/message_bus.rs b/src/workers/continuum-core/src/runtime/message_bus.rs index a2926a9bd..f7b111a80 100644 --- a/src/workers/continuum-core/src/runtime/message_bus.rs +++ b/src/workers/continuum-core/src/runtime/message_bus.rs @@ -212,24 +212,44 @@ impl MessageBus { /// Async handlers receive via the broadcast channel. /// /// registry is needed to look up module instances for synchronous delivery. + /// + /// Implementation note: both subscriber walks collect a + /// `Vec<&'static str>` of matching module names BEFORE entering + /// the async dispatch loop. This drops the DashMap borrow before + /// any `.await`, which lets the publish future remain `Send` even + /// when called from spawn contexts (e.g. genome PR-5's + /// `tokio::spawn` of `publish_page_fault`). Without this, the + /// DashMap iter borrow lives across the await and trips + /// "implementation of `dashmap::Map` is not general enough" + /// when the future is shipped to a Send-bounded task. pub async fn publish( &self, event_name: &str, payload: serde_json::Value, registry: &super::ModuleRegistry, ) { - // Synchronous tier (glob-matched event_subscriptions): call inline. 
- for entry in self.subscriptions.iter() { - for sub in entry.value().iter() { - if sub.synchronous && glob_matches(&sub.pattern, event_name) { - if let Some(module) = registry.get_by_name(sub.module_name) { - if let Err(e) = module.handle_event(event_name, payload.clone()).await { - warn!( - "Event handler error: module={}, event={}, error={}", - sub.module_name, event_name, e - ); - } - } + // Synchronous tier (glob-matched event_subscriptions): collect + // matching module names, release the DashMap borrow, then + // dispatch. + let glob_matched: Vec<&'static str> = self + .subscriptions + .iter() + .flat_map(|entry| { + entry + .value() + .iter() + .filter(|sub| sub.synchronous && glob_matches(&sub.pattern, event_name)) + .map(|sub| sub.module_name) + .collect::>() + }) + .collect(); + for module_name in glob_matched { + if let Some(module) = registry.get_by_name(module_name) { + if let Err(e) = module.handle_event(event_name, payload.clone()).await { + warn!( + "Event handler error: module={}, event={}, error={}", + module_name, event_name, e + ); } } } @@ -243,17 +263,25 @@ impl MessageBus { // it can call self.on_artifact_available(...).await from inside // its override. 
let key = ArtifactKey::from(event_name); - for entry in self.artifact_subscriptions.iter() { - for sub in entry.value().iter() { - if sub.selector.matches(&key) { - if let Some(module) = registry.get_by_name(sub.module_name) { - if let Err(e) = module.handle_event(event_name, payload.clone()).await { - warn!( - "Artifact handler error: module={}, key={}, error={}", - sub.module_name, event_name, e - ); - } - } + let artifact_matched: Vec<&'static str> = self + .artifact_subscriptions + .iter() + .flat_map(|entry| { + entry + .value() + .iter() + .filter(|sub| sub.selector.matches(&key)) + .map(|sub| sub.module_name) + .collect::>() + }) + .collect(); + for module_name in artifact_matched { + if let Some(module) = registry.get_by_name(module_name) { + if let Err(e) = module.handle_event(event_name, payload.clone()).await { + warn!( + "Artifact handler error: module={}, key={}, error={}", + module_name, event_name, e + ); } } } diff --git a/src/workers/continuum-core/src/runtime/runtime.rs b/src/workers/continuum-core/src/runtime/runtime.rs index b7b471c3a..aecc63e18 100644 --- a/src/workers/continuum-core/src/runtime/runtime.rs +++ b/src/workers/continuum-core/src/runtime/runtime.rs @@ -334,6 +334,15 @@ impl Runtime { &self.bus } + /// Get the Arc for sharing across threads. + /// Used by long-lived publishers (e.g. LocalWorkingSetManager + /// constructed via `with_bus` per genome PR-5) that hold their + /// own Arc and call `bus.publish` without going through the + /// Runtime each time. + pub fn bus_arc(&self) -> Arc { + self.bus.clone() + } + /// Get a reference to the shared compute cache. pub fn compute(&self) -> &SharedCompute { &self.compute