diff --git a/src/workers/continuum-core/src/ipc/mod.rs b/src/workers/continuum-core/src/ipc/mod.rs index 98d0e7bd3..dda06e84a 100644 --- a/src/workers/continuum-core/src/ipc/mod.rs +++ b/src/workers/continuum-core/src/ipc/mod.rs @@ -938,8 +938,11 @@ pub fn start_server( // Shared state for per-persona cognition (unified: engine + inbox + rate limiter + sleep + adapters + genome) let rag_engine = Arc::new(RagEngine::new()); - let cognition_state = - Arc::new(CognitionState::new(rag_engine.clone()).with_gpu_manager(gpu_manager.clone())); + let cognition_state = Arc::new( + CognitionState::new(rag_engine.clone()) + .with_gpu_manager(gpu_manager.clone()) + .with_module_registry(runtime.registry_arc()), + ); let personas = cognition_state.personas.clone(); runtime.register(Arc::new(CognitionModule::new(cognition_state))); diff --git a/src/workers/continuum-core/src/modules/cognition.rs b/src/workers/continuum-core/src/modules/cognition.rs index 47a1d88f4..06053569a 100644 --- a/src/workers/continuum-core/src/modules/cognition.rs +++ b/src/workers/continuum-core/src/modules/cognition.rs @@ -50,7 +50,9 @@ use crate::persona::{ use crate::persona::{RecentResponse, SleepMode}; use crate::rag::RagEngine; use crate::runtime; -use crate::runtime::{CommandResult, ModuleConfig, ModuleContext, ModulePriority, ServiceModule}; +use crate::runtime::{ + CommandResult, ModuleConfig, ModuleContext, ModulePriority, ModuleRegistry, ServiceModule, +}; use crate::utils::params::Params; use async_trait::async_trait; use dashmap::DashMap; @@ -75,6 +77,12 @@ pub struct CognitionState { pub loop_detector: LoopDetector, /// GPU memory manager — real VRAM budgets for genome paging. pub gpu_manager: Option>, + /// Rust module registry for in-process cognition -> inference dispatch. + /// + /// This is intentionally NOT the global command executor: `persona/turn-execute` + /// must fail loudly if the Rust inference module is absent instead of falling + /// through to TypeScript. + pub module_registry: Option>, } impl CognitionState { @@ -84,6 +92,7 @@ impl CognitionState { rag_engine, loop_detector: LoopDetector::new(), gpu_manager: None, + module_registry: None, } } @@ -92,6 +101,11 @@ impl CognitionState { self } + pub fn with_module_registry(mut self, registry: Arc) -> Self { + self.module_registry = Some(registry); + self + } + /// Per-persona inference budget from GPU manager, or 200MB fallback. pub fn per_persona_budget_mb(&self) -> f32 { match &self.gpu_manager { @@ -151,8 +165,10 @@ impl ServiceModule for CognitionModule { // codex's persona inbox fanout primitive (today) + the upcoming // PressureBroker singleton (#1299) make event fanout the // intended invariant. Inference is still gated downstream by - // ai_provider::max_concurrency. No hardcoded fixed cap here. - max_concurrency: usize::MAX, + // ai_provider::max_concurrency. 0 is the runtime contract for + // "unlimited / module-managed"; usize::MAX overflows Tokio's + // semaphore permit ceiling during registration. + max_concurrency: 0, tick_interval: None, } } @@ -360,8 +376,7 @@ impl ServiceModule for CognitionModule { } Ok(CommandResult::Json( - serde_json::to_value(&record) - .map_err(|e| format!("Serialize error: {e}"))?, + serde_json::to_value(&record).map_err(|e| format!("Serialize error: {e}"))?, )) } @@ -372,9 +387,8 @@ impl ServiceModule for CognitionModule { // -> wrap in PersonaTurnFrame // -> derive ResponsePrompt (lazy output) // -> build InferenceRequest (prompt_text path) - // -> dispatch `inference/llm/request` via the - // global command_executor (routes to the - // InferenceLlmModule registered in PR-5 #1404) + // -> dispatch `inference/llm/request` via the Rust + // ModuleRegistry only // -> bundle replay_record + inference response // // Why one command: the TS persona loop previously @@ -475,12 +489,8 @@ impl ServiceModule for CognitionModule { stop_sequences: vec![], }; - // Dispatch via the global command_executor. Routes - // to InferenceLlmModule (registered in PR-5 #1404) - // through the ModuleRegistry. The executor maps - // unhandled commands to the TS bridge; for - // inference/llm/request the Rust route wins. - let inference_response = crate::runtime::command_executor::execute_json( + let inference_response = execute_rust_module_json( + self.state.module_registry.as_deref(), crate::inference::llm_module_service::COMMAND_REQUEST, serde_json::to_value(&inference_request) .map_err(|e| format!("Serialize inference request: {e}"))?, @@ -488,7 +498,7 @@ impl ServiceModule for CognitionModule { .await .map_err(|e| { format!( - "persona/turn-execute: inference dispatch failed for {persona_uuid}: {e}" + "persona/turn-execute: Rust inference dispatch failed for {persona_uuid}: {e}" ) })?; @@ -1722,6 +1732,24 @@ fn turn_frame_replay_record( .and_then(|frame| PersonaTurnFrame::from_inbox_frame(frame.clone()).replay_record()) } +async fn execute_rust_module_json( + registry: Option<&ModuleRegistry>, + command: &str, + params: Value, +) -> Result { + let registry = registry.ok_or_else(|| { + format!("{command}: Rust module registry unavailable; refusing TypeScript fallback") + })?; + let (module, routed_command) = registry.route_command(command).ok_or_else(|| { + format!("{command}: no Rust module route registered; refusing TypeScript fallback") + })?; + + match module.handle_command(&routed_command, params).await? { + CommandResult::Json(value) => Ok(value), + CommandResult::Binary { metadata, .. } => Ok(metadata), + } +} + #[cfg(test)] mod turn_frame_recording_tests { use super::*; @@ -1803,25 +1831,28 @@ mod turn_frame_recording_tests { mod turn_execute_tests { //! Lane D persona/turn-execute command surface tests. //! - //! End-to-end inference dispatch goes through the global - //! `command_executor::executor()` which is only initialized at - //! runtime startup (ipc/mod.rs). These tests cover the paths - //! that DON'T depend on the executor: - //! - persona-not-found returns a typed Err (loud-fail per - //! never-swallow rule) - //! - empty drain short-circuits before dispatch and returns - //! the documented {"replayRecord": null, - //! "inferenceResponse": null} bundle - //! - //! The dispatch-success path is covered by integration tests - //! that boot a Runtime with InferenceLlmModule registered. + //! These tests pin the Rust-only shape: success routes through a + //! `ModuleRegistry` with `InferenceLlmModule` registered; missing registry + //! or missing route fails loudly instead of falling through to TypeScript. use super::*; + use crate::inference::llm_module_service::InferenceLlmModule; use crate::rag::RagEngine; use std::sync::Arc; fn module_with_persona(persona_id: Uuid) -> CognitionModule { + module_with_persona_and_registry(persona_id, None) + } + + fn module_with_persona_and_registry( + persona_id: Uuid, + registry: Option>, + ) -> CognitionModule { let rag_engine = Arc::new(RagEngine::new()); - let state = Arc::new(CognitionState::new(rag_engine.clone())); + let mut state = CognitionState::new(rag_engine.clone()); + if let Some(registry) = registry { + state = state.with_module_registry(registry); + } + let state = Arc::new(state); state.personas.insert( persona_id, crate::persona::PersonaCognition::new( @@ -1833,6 +1864,33 @@ mod turn_execute_tests { CognitionModule::new(state) } + fn rust_inference_registry() -> Arc { + let registry = Arc::new(ModuleRegistry::new()); + registry.register(Arc::new(InferenceLlmModule::new())); + registry + } + + fn enqueue_message(module: &CognitionModule, persona_id: Uuid, content: &str, timestamp: u64) { + let room_id = Uuid::new_v4(); + let persona = module + .state + .personas + .get(&persona_id) + .expect("test persona exists"); + persona.inbox.enqueue(InboxMessage { + id: Uuid::new_v4(), + room_id, + sender_id: Uuid::new_v4(), + sender_name: "Joel".to_string(), + sender_type: SenderType::Human, + content: content.to_string(), + timestamp, + priority: 0.9, + source_modality: Some(Modality::Chat), + voice_session_id: None, + }); + } + #[tokio::test] async fn turn_execute_persona_not_found_returns_typed_error() { let rag_engine = Arc::new(RagEngine::new()); @@ -1865,11 +1923,7 @@ mod turn_execute_tests { async fn turn_execute_empty_drain_returns_null_bundle() { // Persona exists but inbox is empty -> the command should // short-circuit BEFORE any inference dispatch, returning - // the documented null pair. If it ever tried to dispatch - // through command_executor::executor() (which isn't - // initialized in tests), the call would panic — so this - // test also doubles as a regression guard for the - // empty-drain short-circuit. + // the documented null pair. let persona_id = Uuid::new_v4(); let module = module_with_persona(persona_id); @@ -1930,6 +1984,71 @@ mod turn_execute_tests { Ok(_) => panic!("u64::MAX max_duration_ms must fail u32 conversion"), } } + + #[tokio::test] + async fn turn_execute_success_routes_through_rust_inference_module() { + let persona_id = Uuid::new_v4(); + let module = module_with_persona_and_registry(persona_id, Some(rust_inference_registry())); + enqueue_message(&module, persona_id, "what changed?", 20_000); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + "max_tokens": 64, + "max_duration_ms": 1_000, + }), + ) + .await + .expect("Rust inference module handles turn"); + + let CommandResult::Json(value) = result else { + panic!("expected Json"); + }; + assert_eq!( + value["replayRecord"]["responsePrompt"]["messages"][0]["content"], + "Joel: what changed?" + ); + assert_eq!( + value["inferenceResponse"]["complete"]["tokensGenerated"], 3, + "registered InferenceLlmModule stub proves Rust-only dispatch reached inference" + ); + assert!( + module + .state + .personas + .get(&persona_id) + .expect("persona remains") + .inbox + .is_empty(), + "turn-execute drains one consolidated frame" + ); + } + + #[tokio::test] + async fn turn_execute_missing_rust_registry_refuses_ts_fallback() { + let persona_id = Uuid::new_v4(); + let module = module_with_persona(persona_id); + enqueue_message(&module, persona_id, "do not fall back to ts", 30_000); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + }), + ) + .await; + + match result { + Err(msg) => assert!( + msg.contains("refusing TypeScript fallback"), + "expected loud no-TS-fallback refusal, got: {msg}" + ), + Ok(_) => panic!("missing Rust registry must not fall through"), + } + } } // ============================================================================ diff --git a/src/workers/continuum-core/src/persona/turn_frame.rs b/src/workers/continuum-core/src/persona/turn_frame.rs index 3a9b6caf3..8f3d16935 100644 --- a/src/workers/continuum-core/src/persona/turn_frame.rs +++ b/src/workers/continuum-core/src/persona/turn_frame.rs @@ -694,7 +694,10 @@ mod tests { let prompt = PersonaTurnFrame::from_inbox_frame(frame) .response_prompt() .unwrap(); - assert!(prompt.system_prompt.is_none(), "PR-1 leaves system_prompt for caller"); + assert!( + prompt.system_prompt.is_none(), + "PR-1 leaves system_prompt for caller" + ); } #[test] @@ -757,10 +760,7 @@ mod tests { // ─── ResponsePrompt::to_prompt_text (Lane D turn-execute) ── - fn prompt_with( - system: Option<&str>, - messages: Vec<(PromptRole, &str)>, - ) -> ResponsePrompt { + fn prompt_with(system: Option<&str>, messages: Vec<(PromptRole, &str)>) -> ResponsePrompt { ResponsePrompt { persona_id: Uuid::nil(), room_id: Uuid::nil(),