From 3e2ef107d70974fe47f284bc588781052e7bcfec Mon Sep 17 00:00:00 2001 From: Test Date: Mon, 18 May 2026 11:54:17 -0500 Subject: [PATCH] =?UTF-8?q?feat(inference):=20inference-llm=20PR-3b=20?= =?UTF-8?q?=E2=80=94=20InferenceLlmModule=20auto-publishes=20via=20bus=20h?= =?UTF-8?q?ook?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-3b of inference-llm. Wires the bus helpers from PR-3a (#1392) INTO InferenceLlmModule's handle_command so every successful inference response auto-publishes InferenceComplete + FirstTokenEmitted to the trace bus. Closes the inference-llm bus loop: producer (command) → engine (stub for now) → response (CommandResult) → bus dispatch (complete + first_token) → subscriber (sentinel/VDD/audit). What lands - BusHook private struct: { bus: Arc<MessageBus>, registry: Arc<ModuleRegistry> }. Same shape as genome::local_manager BusHook (#1362). - InferenceLlmModule.bus_hook: Option<BusHook> — None = bus-less PR-2 behavior; Some = auto-publish on every successful handle_command. - with_bus(bus, registry) constructor — wires both Arcs at module construction; no in-flight switching (prevents the "bus added mid-service" race). - handle_request body: on success, spawns publish_inference_complete and publish_first_token_emitted into the current tokio runtime via Handle::try_current. Spawn pattern (not await) avoids the DashMap borrow-across-await lifetime issue inside Send-bounded async_trait — same workaround as my genome LocalWorkingSetManager (#1362). - spawn_publish_inference_complete + spawn_publish_first_token_emitted module-private helpers — Arcs cloned out before spawn so the &BusHook borrow doesn't outlive the spawn. Design choices - Publishing is best-effort observability. The authoritative response goes back through the CommandResult arm regardless of publish success — callers who need to know if a generation happened look at the Result, not the bus. - Error paths (unknown command + invalid payload) do NOT publish. 
Tests pin this — bus events represent successful generations; errors are loud in the Result and silent on the bus. - Two separate spawns (one per event) rather than one bundled publish. Lets subscribers see first_token even if the complete event hasn't dispatched yet (race-tolerant TTFT observability). Tests 4 new bus tests (12 total): - handle_command_with_bus_auto_publishes_complete_and_first_token — end-to-end: register subscriber, run handle_command, yield for spawn, verify both events landed with matching requestId - handle_command_without_bus_does_not_publish — backwards-compat with PR-2 new() constructor - handle_command_unknown_with_bus_does_not_publish — error paths silent on bus - handle_command_invalid_payload_with_bus_does_not_publish — same invariant 12/12 pass on inference::llm_module_service. No regressions across other 2957 lib tests. Stack - #1387 — inference-llm PR-1: typed event surface - #1391 — inference-llm PR-2: ServiceModule impl (stub-backed) - #1392 — inference-llm PR-3a: bus keys + publishing helpers - THIS PR — inference-llm PR-3b: auto-publish wiring - NEXT — PR-4: real LlamaCppAdapter invoke + tokenizer + streaming (the stub stays in place until then; PR-4 swaps under the same external contract) Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/inference/llm_module_service.rs | 309 +++++++++++++++++- 1 file changed, 301 insertions(+), 8 deletions(-) diff --git a/src/workers/continuum-core/src/inference/llm_module_service.rs b/src/workers/continuum-core/src/inference/llm_module_service.rs index 54542b6f0..75e880a4e 100644 --- a/src/workers/continuum-core/src/inference/llm_module_service.rs +++ b/src/workers/continuum-core/src/inference/llm_module_service.rs @@ -33,27 +33,80 @@ use async_trait::async_trait; use serde_json::Value; use std::any::Any; +use std::sync::Arc; + use super::llm_module::{ FinishReason, FirstTokenEmitted, InferenceComplete, InferenceRequest, }; +use super::llm_module_bus::{publish_first_token_emitted, 
publish_inference_complete}; +use crate::runtime::message_bus::MessageBus; use crate::runtime::module_context::ModuleContext; +use crate::runtime::registry::ModuleRegistry; use crate::runtime::service_module::{ CommandResult, ModuleConfig, ModulePriority, ServiceModule, }; +/// Optional bus + registry handle for auto-publishing inference +/// response events. When set on `InferenceLlmModule`, every +/// `handle_command` call that produces an `InferenceResponse` also +/// publishes the complete + first_token events via the artifact +/// dispatch path (#1339+#1343) using the canonical keys from +/// `llm_module_bus` (PR-3a / #1392). +/// +/// Same shape as the genome `BusHook` pattern (#1362) — kept as +/// one struct (not two Arcs on the module) so the absence-of-bus +/// case is a single `Option` field. +struct BusHook { + bus: Arc, + registry: Arc, +} + /// Per-process implementation of `inference-llm`. ServiceModule /// trait impl that handles `inference/llm/request` commands. /// -/// PR-2 is stub-backed (canned tokens); PR-3 replaces the stub -/// with the real `LlamaCppAdapter` invoke. The module's external -/// contract (commands + response shapes) stays identical across -/// the stub-vs-real transition — downstream consumers don't -/// need to know which is running. -pub struct InferenceLlmModule; +/// PR-2 shipped the stub-backed module; PR-3a shipped the bus +/// publishing helpers; PR-3b (this) wires them together. The +/// module's external contract (commands + response shapes) stays +/// identical across the stub-vs-real transition — downstream +/// consumers don't need to know which is running. +/// +/// PR-3b adds optional bus publishing: when constructed via +/// `with_bus(bus, registry)`, every successful handle_command +/// publishes InferenceComplete + FirstTokenEmitted to the trace +/// bus. 
Constructed via `new()` (the PR-2 shape), the module +/// stays bus-less and behaves exactly as before — useful for +/// tests + standalone use where no runtime is around. +pub struct InferenceLlmModule { + bus_hook: Option, +} impl InferenceLlmModule { + /// Construct without bus publishing (PR-2 shape). Inference + /// responses are returned through the CommandResult but NOT + /// published to any bus. pub fn new() -> Self { - Self + Self { bus_hook: None } + } + + /// Construct with auto-publishing bus hook. Every successful + /// `handle_command` publishes the InferenceComplete + + /// FirstTokenEmitted events via the `llm_module_bus` helpers + /// (PR-3a / #1392) under the canonical keys. + /// + /// `bus` + `registry` must be from the same Runtime — publishing + /// uses `bus.publish` which looks up modules via the registry. + /// Subscribers register through `bus.subscribe_artifact` for the + /// inference keys (typically via + /// `subscribe_to_inference_responses(bus, module_name)` from PR-3a). + /// + /// Why a separate constructor instead of a setter: prevents the + /// "bus added partway through service" race where some events + /// are published and some aren't. Same pattern as my genome + /// LocalWorkingSetManager::with_bus (#1362). + pub fn with_bus(bus: Arc, registry: Arc) -> Self { + Self { + bus_hook: Some(BusHook { bus, registry }), + } } } @@ -136,12 +189,24 @@ impl InferenceLlmModule { .map_err(|e| format!("inference-llm: invalid InferenceRequest payload: {e}"))?; // PR-2 stub: pretend we ran a model + emit canned tokens. - // PR-3 replaces this block with the real LlamaCppAdapter + // PR-4 replaces this block with the real LlamaCppAdapter // invoke. The InferenceComplete + FirstTokenEmitted wire // shapes stay identical across the transition. let complete = run_stub_inference(&request); let first_token = first_token_for(&request, &complete); + // PR-3b: auto-publish to the trace bus when configured. 
+ // Spawn pattern (not await) to avoid the DashMap + // borrow-across-await lifetime issue inside the Send-bounded + // async_trait method body — same workaround as my genome + // LocalWorkingSetManager (#1362). The publish is best-effort + // observability; the authoritative response goes back through + // the CommandResult arm regardless of publishing outcome. + if let Some(hook) = &self.bus_hook { + spawn_publish_inference_complete(hook, complete.clone()); + spawn_publish_first_token_emitted(hook, first_token); + } + let response = InferenceResponse { complete, first_token, @@ -150,6 +215,34 @@ impl InferenceLlmModule { } } +/// Spawn a `publish_inference_complete` into the current tokio +/// runtime. Standalone fn (not a method) so the `&BusHook` borrow +/// doesn't outlive the spawn — Arcs get cloned out first, then the +/// spawned future owns its captures. Same lifetime workaround as +/// my genome `spawn_publish_page_fault` (#1362) — see that PR for +/// the full rationale on why spawn vs await. +fn spawn_publish_inference_complete(hook: &BusHook, complete: InferenceComplete) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let bus = hook.bus.clone(); + let registry = hook.registry.clone(); + handle.spawn(async move { + publish_inference_complete(&bus, &registry, &complete).await; + }); + } +} + +/// Spawn a `publish_first_token_emitted` into the current tokio +/// runtime. Same pattern as `spawn_publish_inference_complete`. +fn spawn_publish_first_token_emitted(hook: &BusHook, event: FirstTokenEmitted) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let bus = hook.bus.clone(); + let registry = hook.registry.clone(); + handle.spawn(async move { + publish_first_token_emitted(&bus, &registry, &event).await; + }); + } +} + /// PR-2 stub inference. Returns the canned 3-token response with /// FinishReason::Stop. Useful for testing the request/response /// wire shape end-to-end without loading a real model. 
@@ -347,4 +440,204 @@ mod tests { _ => panic!("expected Json"), } } + + // ─── PR-3b: bus auto-publish tests ───────────────────────── + + use crate::inference::llm_module_bus::{ + FIRST_TOKEN_EMITTED_KEY, INFERENCE_COMPLETE_KEY, + inference_response_selectors, + }; + use crate::runtime::artifact_handle::{ArtifactKey, ArtifactSelector}; + use crate::runtime::runtime::Runtime; + use parking_lot::Mutex; + + /// Recording subscriber for PR-3b bus tests. + struct InferenceRecorder { + captured: Arc>>, + } + + impl InferenceRecorder { + fn new() -> (Arc, Arc>>) { + let captured = Arc::new(Mutex::new(Vec::new())); + let module = Arc::new(Self { + captured: captured.clone(), + }); + (module, captured) + } + } + + #[async_trait] + impl ServiceModule for InferenceRecorder { + fn config(&self) -> ModuleConfig { + ModuleConfig { + name: "pr3b-inference-recorder", + priority: ModulePriority::Normal, + command_prefixes: &[], + event_subscriptions: &[], + needs_dedicated_thread: false, + max_concurrency: 0, + tick_interval: None, + } + } + async fn initialize( + &self, + _ctx: &crate::runtime::ModuleContext, + ) -> Result<(), String> { + Ok(()) + } + async fn handle_command( + &self, + _: &str, + _: serde_json::Value, + ) -> Result { + Err("not handled".to_string()) + } + fn artifact_subscriptions(&self) -> Vec { + inference_response_selectors() + } + async fn on_artifact_available( + &self, + key: &ArtifactKey, + payload: serde_json::Value, + ) -> Result<(), String> { + self.captured.lock().push((key.as_str().to_string(), payload)); + Ok(()) + } + fn as_any(&self) -> &dyn Any { + self + } + } + + /// What this catches: with_bus wires auto-publishing. After a + /// successful handle_command call, both InferenceComplete and + /// FirstTokenEmitted land on the trace bus under their canonical + /// keys. End-to-end test of the PR-2 + PR-3a + PR-3b chain. 
+ #[tokio::test] + async fn handle_command_with_bus_auto_publishes_complete_and_first_token() { + let runtime = Arc::new(Runtime::new()); + let (recorder, captured) = InferenceRecorder::new(); + runtime.register(recorder); + + let module = InferenceLlmModule::with_bus( + runtime.bus_arc(), + runtime.registry_arc(), + ); + + let req = sample_request(); + let params = serde_json::to_value(&req).unwrap(); + let _ = module.handle_command(COMMAND_REQUEST, params).await.unwrap(); + + // Yield to let the spawned publishes run. + for _ in 0..50 { + tokio::task::yield_now().await; + if captured.lock().len() >= 2 { + break; + } + } + + let events = captured.lock().clone(); + let keys: Vec = events.iter().map(|(k, _)| k.clone()).collect(); + assert!( + keys.contains(&INFERENCE_COMPLETE_KEY.to_string()), + "expected InferenceComplete event; got keys {keys:?}" + ); + assert!( + keys.contains(&FIRST_TOKEN_EMITTED_KEY.to_string()), + "expected FirstTokenEmitted event; got keys {keys:?}" + ); + + // Both events carry the same requestId we sent in. + for (key, payload) in events { + if key == INFERENCE_COMPLETE_KEY { + let c: InferenceComplete = serde_json::from_value(payload).unwrap(); + assert_eq!(c.request_id, req.request_id); + } else if key == FIRST_TOKEN_EMITTED_KEY { + let f: FirstTokenEmitted = serde_json::from_value(payload).unwrap(); + assert_eq!(f.request_id, req.request_id); + } + } + } + + /// What this catches: bus-less mode (via new()) doesn't publish. + /// Backwards-compat with PR-2 — tests + standalone use don't + /// require a Runtime. + #[tokio::test] + async fn handle_command_without_bus_does_not_publish() { + let runtime = Arc::new(Runtime::new()); + let (recorder, captured) = InferenceRecorder::new(); + runtime.register(recorder); + + // Module constructed WITHOUT bus. 
+ let module = InferenceLlmModule::new(); + let req = sample_request(); + let params = serde_json::to_value(&req).unwrap(); + let _ = module.handle_command(COMMAND_REQUEST, params).await.unwrap(); + + // Yield to give any incorrectly-spawned publish a chance. + for _ in 0..20 { + tokio::task::yield_now().await; + } + + assert!( + captured.lock().is_empty(), + "bus-less module must not publish anything" + ); + } + + /// What this catches: handle_command_unknown does NOT publish. + /// Only successful generations publish events; the unknown- + /// command error path is silent on the bus (the typed error in + /// the Result is the authoritative signal). + #[tokio::test] + async fn handle_command_unknown_with_bus_does_not_publish() { + let runtime = Arc::new(Runtime::new()); + let (recorder, captured) = InferenceRecorder::new(); + runtime.register(recorder); + + let module = InferenceLlmModule::with_bus( + runtime.bus_arc(), + runtime.registry_arc(), + ); + + let result = module + .handle_command("inference/llm/bogus", Value::Null) + .await; + assert!(result.is_err()); + + for _ in 0..20 { + tokio::task::yield_now().await; + } + + assert!( + captured.lock().is_empty(), + "error path must not publish events" + ); + } + + /// What this catches: handle_command_invalid_payload does NOT + /// publish. Same invariant as the unknown-command case — invalid + /// input fails fast via Result; no observability noise on the + /// failure path. 
+ #[tokio::test] + async fn handle_command_invalid_payload_with_bus_does_not_publish() { + let runtime = Arc::new(Runtime::new()); + let (recorder, captured) = InferenceRecorder::new(); + runtime.register(recorder); + + let module = InferenceLlmModule::with_bus( + runtime.bus_arc(), + runtime.registry_arc(), + ); + + let result = module + .handle_command(COMMAND_REQUEST, serde_json::json!({"not": "valid"})) + .await; + assert!(result.is_err()); + + for _ in 0..20 { + tokio::task::yield_now().await; + } + + assert!(captured.lock().is_empty()); + } }