Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 301 additions & 8 deletions src/workers/continuum-core/src/inference/llm_module_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,80 @@ use async_trait::async_trait;
use serde_json::Value;
use std::any::Any;

use std::sync::Arc;

use super::llm_module::{
FinishReason, FirstTokenEmitted, InferenceComplete, InferenceRequest,
};
use super::llm_module_bus::{publish_first_token_emitted, publish_inference_complete};
use crate::runtime::message_bus::MessageBus;
use crate::runtime::module_context::ModuleContext;
use crate::runtime::registry::ModuleRegistry;
use crate::runtime::service_module::{
CommandResult, ModuleConfig, ModulePriority, ServiceModule,
};

/// Optional bus + registry handle for auto-publishing inference
/// response events. When set on `InferenceLlmModule`, every
/// `handle_command` call that produces an `InferenceResponse` also
/// publishes the complete + first_token events via the artifact
/// dispatch path (#1339+#1343) using the canonical keys from
/// `llm_module_bus` (PR-3a / #1392).
///
/// Same shape as the genome `BusHook` pattern (#1362) — kept as
/// one struct (not two Arcs on the module) so the absence-of-bus
/// case is a single `Option<BusHook>` field.
struct BusHook {
bus: Arc<MessageBus>,
registry: Arc<ModuleRegistry>,
}

/// Per-process implementation of `inference-llm`. ServiceModule
/// trait impl that handles `inference/llm/request` commands.
///
/// PR-2 is stub-backed (canned tokens); PR-3 replaces the stub
/// with the real `LlamaCppAdapter` invoke. The module's external
/// contract (commands + response shapes) stays identical across
/// the stub-vs-real transition — downstream consumers don't
/// need to know which is running.
pub struct InferenceLlmModule;
/// PR-2 shipped the stub-backed module; PR-3a shipped the bus
/// publishing helpers; PR-3b (this) wires them together. The
/// module's external contract (commands + response shapes) stays
/// identical across the stub-vs-real transition — downstream
/// consumers don't need to know which is running.
///
/// PR-3b adds optional bus publishing: when constructed via
/// `with_bus(bus, registry)`, every successful handle_command
/// publishes InferenceComplete + FirstTokenEmitted to the trace
/// bus. Constructed via `new()` (the PR-2 shape), the module
/// stays bus-less and behaves exactly as before — useful for
/// tests + standalone use where no runtime is around.
pub struct InferenceLlmModule {
bus_hook: Option<BusHook>,
}

impl InferenceLlmModule {
/// Construct without bus publishing (PR-2 shape). Inference
/// responses are returned through the CommandResult but NOT
/// published to any bus.
pub fn new() -> Self {
Self
Self { bus_hook: None }
}

/// Construct with auto-publishing bus hook. Every successful
/// `handle_command` publishes the InferenceComplete +
/// FirstTokenEmitted events via the `llm_module_bus` helpers
/// (PR-3a / #1392) under the canonical keys.
///
/// `bus` + `registry` must be from the same Runtime — publishing
/// uses `bus.publish` which looks up modules via the registry.
/// Subscribers register through `bus.subscribe_artifact` for the
/// inference keys (typically via
/// `subscribe_to_inference_responses(bus, module_name)` from PR-3a).
///
/// Why a separate constructor instead of a setter: prevents the
/// "bus added partway through service" race where some events
/// are published and some aren't. Same pattern as my genome
/// LocalWorkingSetManager::with_bus (#1362).
pub fn with_bus(bus: Arc<MessageBus>, registry: Arc<ModuleRegistry>) -> Self {
Self {
bus_hook: Some(BusHook { bus, registry }),
}
}
}

Expand Down Expand Up @@ -136,12 +189,24 @@ impl InferenceLlmModule {
.map_err(|e| format!("inference-llm: invalid InferenceRequest payload: {e}"))?;

// PR-2 stub: pretend we ran a model + emit canned tokens.
// PR-3 replaces this block with the real LlamaCppAdapter
// PR-4 replaces this block with the real LlamaCppAdapter
// invoke. The InferenceComplete + FirstTokenEmitted wire
// shapes stay identical across the transition.
let complete = run_stub_inference(&request);
let first_token = first_token_for(&request, &complete);

// PR-3b: auto-publish to the trace bus when configured.
// Spawn pattern (not await) to avoid the DashMap
// borrow-across-await lifetime issue inside the Send-bounded
// async_trait method body — same workaround as my genome
// LocalWorkingSetManager (#1362). The publish is best-effort
// observability; the authoritative response goes back through
// the CommandResult arm regardless of publishing outcome.
if let Some(hook) = &self.bus_hook {
spawn_publish_inference_complete(hook, complete.clone());
spawn_publish_first_token_emitted(hook, first_token);
}

let response = InferenceResponse {
complete,
first_token,
Expand All @@ -150,6 +215,34 @@ impl InferenceLlmModule {
}
}

/// Spawn a `publish_inference_complete` into the current tokio
/// runtime. Standalone fn (not a method) so the `&BusHook` borrow
/// doesn't outlive the spawn — Arcs get cloned out first, then the
/// spawned future owns its captures. Same lifetime workaround as
/// my genome `spawn_publish_page_fault` (#1362) — see that PR for
/// the full rationale on why spawn vs await.
fn spawn_publish_inference_complete(hook: &BusHook, complete: InferenceComplete) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let bus = hook.bus.clone();
let registry = hook.registry.clone();
handle.spawn(async move {
publish_inference_complete(&bus, &registry, &complete).await;
});
}
}

/// Spawn a `publish_first_token_emitted` into the current tokio
/// runtime. Same pattern as `spawn_publish_inference_complete`.
fn spawn_publish_first_token_emitted(hook: &BusHook, event: FirstTokenEmitted) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let bus = hook.bus.clone();
let registry = hook.registry.clone();
handle.spawn(async move {
publish_first_token_emitted(&bus, &registry, &event).await;
});
}
}

/// PR-2 stub inference. Returns the canned 3-token response with
/// FinishReason::Stop. Useful for testing the request/response
/// wire shape end-to-end without loading a real model.
Expand Down Expand Up @@ -347,4 +440,204 @@ mod tests {
_ => panic!("expected Json"),
}
}

// ─── PR-3b: bus auto-publish tests ─────────────────────────

use crate::inference::llm_module_bus::{
FIRST_TOKEN_EMITTED_KEY, INFERENCE_COMPLETE_KEY,
inference_response_selectors,
};
use crate::runtime::artifact_handle::{ArtifactKey, ArtifactSelector};
use crate::runtime::runtime::Runtime;
use parking_lot::Mutex;

/// Recording subscriber for PR-3b bus tests.
struct InferenceRecorder {
captured: Arc<Mutex<Vec<(String, serde_json::Value)>>>,
}

impl InferenceRecorder {
fn new() -> (Arc<Self>, Arc<Mutex<Vec<(String, serde_json::Value)>>>) {
let captured = Arc::new(Mutex::new(Vec::new()));
let module = Arc::new(Self {
captured: captured.clone(),
});
(module, captured)
}
}

#[async_trait]
impl ServiceModule for InferenceRecorder {
fn config(&self) -> ModuleConfig {
ModuleConfig {
name: "pr3b-inference-recorder",
priority: ModulePriority::Normal,
command_prefixes: &[],
event_subscriptions: &[],
needs_dedicated_thread: false,
max_concurrency: 0,
tick_interval: None,
}
}
async fn initialize(
&self,
_ctx: &crate::runtime::ModuleContext,
) -> Result<(), String> {
Ok(())
}
async fn handle_command(
&self,
_: &str,
_: serde_json::Value,
) -> Result<CommandResult, String> {
Err("not handled".to_string())
}
fn artifact_subscriptions(&self) -> Vec<ArtifactSelector> {
inference_response_selectors()
}
async fn on_artifact_available(
&self,
key: &ArtifactKey,
payload: serde_json::Value,
) -> Result<(), String> {
self.captured.lock().push((key.as_str().to_string(), payload));
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}

/// What this catches: with_bus wires auto-publishing. After a
/// successful handle_command call, both InferenceComplete and
/// FirstTokenEmitted land on the trace bus under their canonical
/// keys. End-to-end test of the PR-2 + PR-3a + PR-3b chain.
#[tokio::test]
async fn handle_command_with_bus_auto_publishes_complete_and_first_token() {
let runtime = Arc::new(Runtime::new());
let (recorder, captured) = InferenceRecorder::new();
runtime.register(recorder);

let module = InferenceLlmModule::with_bus(
runtime.bus_arc(),
runtime.registry_arc(),
);

let req = sample_request();
let params = serde_json::to_value(&req).unwrap();
let _ = module.handle_command(COMMAND_REQUEST, params).await.unwrap();

// Yield to let the spawned publishes run.
for _ in 0..50 {
tokio::task::yield_now().await;
if captured.lock().len() >= 2 {
break;
}
}

let events = captured.lock().clone();
let keys: Vec<String> = events.iter().map(|(k, _)| k.clone()).collect();
assert!(
keys.contains(&INFERENCE_COMPLETE_KEY.to_string()),
"expected InferenceComplete event; got keys {keys:?}"
);
assert!(
keys.contains(&FIRST_TOKEN_EMITTED_KEY.to_string()),
"expected FirstTokenEmitted event; got keys {keys:?}"
);

// Both events carry the same requestId we sent in.
for (key, payload) in events {
if key == INFERENCE_COMPLETE_KEY {
let c: InferenceComplete = serde_json::from_value(payload).unwrap();
assert_eq!(c.request_id, req.request_id);
} else if key == FIRST_TOKEN_EMITTED_KEY {
let f: FirstTokenEmitted = serde_json::from_value(payload).unwrap();
assert_eq!(f.request_id, req.request_id);
}
}
}

/// What this catches: bus-less mode (via new()) doesn't publish.
/// Backwards-compat with PR-2 — tests + standalone use don't
/// require a Runtime.
#[tokio::test]
async fn handle_command_without_bus_does_not_publish() {
let runtime = Arc::new(Runtime::new());
let (recorder, captured) = InferenceRecorder::new();
runtime.register(recorder);

// Module constructed WITHOUT bus.
let module = InferenceLlmModule::new();
let req = sample_request();
let params = serde_json::to_value(&req).unwrap();
let _ = module.handle_command(COMMAND_REQUEST, params).await.unwrap();

// Yield to give any incorrectly-spawned publish a chance.
for _ in 0..20 {
tokio::task::yield_now().await;
}

assert!(
captured.lock().is_empty(),
"bus-less module must not publish anything"
);
}

/// What this catches: handle_command_unknown does NOT publish.
/// Only successful generations publish events; the unknown-
/// command error path is silent on the bus (the typed error in
/// the Result is the authoritative signal).
#[tokio::test]
async fn handle_command_unknown_with_bus_does_not_publish() {
let runtime = Arc::new(Runtime::new());
let (recorder, captured) = InferenceRecorder::new();
runtime.register(recorder);

let module = InferenceLlmModule::with_bus(
runtime.bus_arc(),
runtime.registry_arc(),
);

let result = module
.handle_command("inference/llm/bogus", Value::Null)
.await;
assert!(result.is_err());

for _ in 0..20 {
tokio::task::yield_now().await;
}

assert!(
captured.lock().is_empty(),
"error path must not publish events"
);
}

/// What this catches: handle_command_invalid_payload does NOT
/// publish. Same invariant as the unknown-command case — invalid
/// input fails fast via Result; no observability noise on the
/// failure path.
#[tokio::test]
async fn handle_command_invalid_payload_with_bus_does_not_publish() {
let runtime = Arc::new(Runtime::new());
let (recorder, captured) = InferenceRecorder::new();
runtime.register(recorder);

let module = InferenceLlmModule::with_bus(
runtime.bus_arc(),
runtime.registry_arc(),
);

let result = module
.handle_command(COMMAND_REQUEST, serde_json::json!({"not": "valid"}))
.await;
assert!(result.is_err());

for _ in 0..20 {
tokio::task::yield_now().await;
}

assert!(captured.lock().is_empty());
}
}
Loading