diff --git a/src/workers/continuum-core/src/ipc/mod.rs b/src/workers/continuum-core/src/ipc/mod.rs index 98d0e7bd3..dda06e84a 100644 --- a/src/workers/continuum-core/src/ipc/mod.rs +++ b/src/workers/continuum-core/src/ipc/mod.rs @@ -938,8 +938,11 @@ pub fn start_server( // Shared state for per-persona cognition (unified: engine + inbox + rate limiter + sleep + adapters + genome) let rag_engine = Arc::new(RagEngine::new()); - let cognition_state = - Arc::new(CognitionState::new(rag_engine.clone()).with_gpu_manager(gpu_manager.clone())); + let cognition_state = Arc::new( + CognitionState::new(rag_engine.clone()) + .with_gpu_manager(gpu_manager.clone()) + .with_module_registry(runtime.registry_arc()), + ); let personas = cognition_state.personas.clone(); runtime.register(Arc::new(CognitionModule::new(cognition_state))); diff --git a/src/workers/continuum-core/src/modules/cognition.rs b/src/workers/continuum-core/src/modules/cognition.rs index b46c37009..06053569a 100644 --- a/src/workers/continuum-core/src/modules/cognition.rs +++ b/src/workers/continuum-core/src/modules/cognition.rs @@ -50,7 +50,9 @@ use crate::persona::{ use crate::persona::{RecentResponse, SleepMode}; use crate::rag::RagEngine; use crate::runtime; -use crate::runtime::{CommandResult, ModuleConfig, ModuleContext, ModulePriority, ServiceModule}; +use crate::runtime::{ + CommandResult, ModuleConfig, ModuleContext, ModulePriority, ModuleRegistry, ServiceModule, +}; use crate::utils::params::Params; use async_trait::async_trait; use dashmap::DashMap; @@ -75,6 +77,12 @@ pub struct CognitionState { pub loop_detector: LoopDetector, /// GPU memory manager — real VRAM budgets for genome paging. pub gpu_manager: Option>, + /// Rust module registry for in-process cognition -> inference dispatch. + /// + /// This is intentionally NOT the global command executor: `persona/turn-execute` + /// must fail loudly if the Rust inference module is absent instead of falling + /// through to TypeScript. + pub module_registry: Option>, } impl CognitionState { @@ -84,6 +92,7 @@ impl CognitionState { rag_engine, loop_detector: LoopDetector::new(), gpu_manager: None, + module_registry: None, } } @@ -92,6 +101,11 @@ impl CognitionState { self } + pub fn with_module_registry(mut self, registry: Arc) -> Self { + self.module_registry = Some(registry); + self + } + /// Per-persona inference budget from GPU manager, or 200MB fallback. pub fn per_persona_budget_mb(&self) -> f32 { match &self.gpu_manager { @@ -151,8 +165,10 @@ impl ServiceModule for CognitionModule { // codex's persona inbox fanout primitive (today) + the upcoming // PressureBroker singleton (#1299) make event fanout the // intended invariant. Inference is still gated downstream by - // ai_provider::max_concurrency. No hardcoded fixed cap here. - max_concurrency: usize::MAX, + // ai_provider::max_concurrency. 0 is the runtime contract for + // "unlimited / module-managed"; usize::MAX overflows Tokio's + // semaphore permit ceiling during registration. + max_concurrency: 0, tick_interval: None, } } @@ -360,11 +376,138 @@ impl ServiceModule for CognitionModule { } Ok(CommandResult::Json( - serde_json::to_value(&record) - .map_err(|e| format!("Serialize error: {e}"))?, + serde_json::to_value(&record).map_err(|e| format!("Serialize error: {e}"))?, )) } + // ─── Lane D: persona/turn-execute (alpha card #1409) ── + // + // Chains the full Rust persona turn in one IPC hop: + // drain inbox + // -> wrap in PersonaTurnFrame + // -> derive ResponsePrompt (lazy output) + // -> build InferenceRequest (prompt_text path) + // -> dispatch `inference/llm/request` via the Rust + // ModuleRegistry only + // -> bundle replay_record + inference response + // + // Why one command: the TS persona loop previously + // executed each stage with its own IPC round-trip + // (drain, then build prompt, then call inference) — + // 3 round-trips per turn, prompt-building lived in + // TS. Lane D pulls all three into the substrate so + // (a) the prompt is built in Rust where the turn-frame + // lives, (b) the production replay record carries the + // exact prompt that fed inference, (c) the persona + // turn becomes one observable unit on the bus. + // + // Empty drain returns `{ "replayRecord": null, + // "inferenceResponse": null }` — no-op, not an error. + // Persona not found returns typed Err per Joel's never- + // swallow rule. + // + // The actual inference happens in InferenceLlmModule: + // when wired with no adapter (PR-5 shape), it returns + // the 3-token stub response; when wired with an + // adapter (future), it runs the real engine. Either + // way the turn-execute command's contract is the same. + "persona/turn-execute" => { + let _timer = TimingGuard::new("module", "persona_turn_execute"); + let persona_uuid = p.uuid("persona_id")?; + let window_ms = p.u64_or("window_ms", 80); + let max_items_u64 = p.u64_or("max_items", 16); + let max_items = usize::try_from(max_items_u64) + .map_err(|_| format!("max_items too large: {max_items_u64}"))?; + + // Optional composition + sampling + budget params. Callers that + // don't pass them get defaults; the substrate uses the canonical + // SamplingParams::default + a conservative GenerationBudget so + // a misconfigured caller doesn't run unbounded inference. + let composition_artifact_id = + p.uuid_opt("composition_artifact_id").unwrap_or(Uuid::nil()); + let max_tokens = u32::try_from(p.u64_or("max_tokens", 512)) + .map_err(|_| "max_tokens too large for u32".to_string())?; + let max_duration_ms = u32::try_from(p.u64_or("max_duration_ms", 10_000)) + .map_err(|_| "max_duration_ms too large for u32".to_string())?; + + let persona = self + .state + .personas + .get(&persona_uuid) + .ok_or_else(|| format!("No cognition for {persona_uuid}"))?; + + let raw_frame = persona.inbox.drain_frame(window_ms, max_items); + record_drained_turn_frame(&raw_frame); + + // Empty drain: returned as null pair, NOT an Err. + // Idle ticks are routine; a no-op is the correct + // outcome, not a failure. + let inbox_frame = match raw_frame { + Some(f) => f, + None => { + return Ok(CommandResult::Json(serde_json::json!({ + "replayRecord": Value::Null, + "inferenceResponse": Value::Null, + }))); + } + }; + + let turn_frame = PersonaTurnFrame::from_inbox_frame(inbox_frame); + let replay_record = turn_frame.replay_record(); + if let Some(ref rec) = replay_record { + crate::persona::recorder::record_turn_frame_replay(rec); + } + + let response_prompt = turn_frame + .response_prompt() + .ok_or_else(|| { + format!( + "persona/turn-execute: non-empty drain produced no ResponsePrompt for {persona_uuid}" + ) + })?; + + // Build the substrate InferenceRequest. The + // request_id is fresh per-turn; the persona + + // composition come from the turn frame + caller. + // prompt_text is the flattened ResponsePrompt; + // prompt_tokens is empty (adapter-path). + let inference_request = crate::inference::llm_module::InferenceRequest { + request_id: crate::inference::llm_module::InferenceRequestId::new( + Uuid::new_v4(), + ), + persona: crate::genome::working_set::PersonaId::new(persona_uuid), + composition: crate::inference::llm_module::CompositionPlan( + crate::genome::working_set::ArtifactId::new(composition_artifact_id), + ), + prompt_tokens: vec![], + prompt_text: Some(response_prompt.to_prompt_text()), + budget: crate::inference::llm_module::GenerationBudget { + max_tokens, + max_duration_ms, + }, + sampling: crate::inference::llm_module::SamplingParams::default(), + stop_sequences: vec![], + }; + + let inference_response = execute_rust_module_json( + self.state.module_registry.as_deref(), + crate::inference::llm_module_service::COMMAND_REQUEST, + serde_json::to_value(&inference_request) + .map_err(|e| format!("Serialize inference request: {e}"))?, + ) + .await + .map_err(|e| { + format!( + "persona/turn-execute: Rust inference dispatch failed for {persona_uuid}: {e}" + ) + })?; + + Ok(CommandResult::Json(serde_json::json!({ + "replayRecord": replay_record, + "inferenceResponse": inference_response, + }))) + } + // ================================================================ // Admission Gate (continuum#1121 PR-4) // ================================================================ @@ -1589,6 +1732,24 @@ fn turn_frame_replay_record( .and_then(|frame| PersonaTurnFrame::from_inbox_frame(frame.clone()).replay_record()) } +async fn execute_rust_module_json( + registry: Option<&ModuleRegistry>, + command: &str, + params: Value, +) -> Result { + let registry = registry.ok_or_else(|| { + format!("{command}: Rust module registry unavailable; refusing TypeScript fallback") + })?; + let (module, routed_command) = registry.route_command(command).ok_or_else(|| { + format!("{command}: no Rust module route registered; refusing TypeScript fallback") + })?; + + match module.handle_command(&routed_command, params).await? { + CommandResult::Json(value) => Ok(value), + CommandResult::Binary { metadata, .. } => Ok(metadata), + } +} + #[cfg(test)] mod turn_frame_recording_tests { use super::*; @@ -1666,6 +1827,230 @@ mod turn_frame_recording_tests { } } +#[cfg(test)] +mod turn_execute_tests { + //! Lane D persona/turn-execute command surface tests. + //! + //! These tests pin the Rust-only shape: success routes through a + //! `ModuleRegistry` with `InferenceLlmModule` registered; missing registry + //! or missing route fails loudly instead of falling through to TypeScript. + use super::*; + use crate::inference::llm_module_service::InferenceLlmModule; + use crate::rag::RagEngine; + use std::sync::Arc; + + fn module_with_persona(persona_id: Uuid) -> CognitionModule { + module_with_persona_and_registry(persona_id, None) + } + + fn module_with_persona_and_registry( + persona_id: Uuid, + registry: Option>, + ) -> CognitionModule { + let rag_engine = Arc::new(RagEngine::new()); + let mut state = CognitionState::new(rag_engine.clone()); + if let Some(registry) = registry { + state = state.with_module_registry(registry); + } + let state = Arc::new(state); + state.personas.insert( + persona_id, + crate::persona::PersonaCognition::new( + persona_id, + "Test Persona".to_string(), + rag_engine, + ), + ); + CognitionModule::new(state) + } + + fn rust_inference_registry() -> Arc { + let registry = Arc::new(ModuleRegistry::new()); + registry.register(Arc::new(InferenceLlmModule::new())); + registry + } + + fn enqueue_message(module: &CognitionModule, persona_id: Uuid, content: &str, timestamp: u64) { + let room_id = Uuid::new_v4(); + let persona = module + .state + .personas + .get(&persona_id) + .expect("test persona exists"); + persona.inbox.enqueue(InboxMessage { + id: Uuid::new_v4(), + room_id, + sender_id: Uuid::new_v4(), + sender_name: "Joel".to_string(), + sender_type: SenderType::Human, + content: content.to_string(), + timestamp, + priority: 0.9, + source_modality: Some(Modality::Chat), + voice_session_id: None, + }); + } + + #[tokio::test] + async fn turn_execute_persona_not_found_returns_typed_error() { + let rag_engine = Arc::new(RagEngine::new()); + let state = Arc::new(CognitionState::new(rag_engine)); + let module = CognitionModule::new(state); + + let missing_persona = Uuid::new_v4(); + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": missing_persona.to_string(), + }), + ) + .await; + + match result { + Err(msg) => { + assert!( + msg.contains("No cognition for"), + "expected 'No cognition for' in error, got: {msg}" + ); + assert!(msg.contains(&missing_persona.to_string())); + } + Ok(_) => panic!("missing persona must surface typed Err"), + } + } + + #[tokio::test] + async fn turn_execute_empty_drain_returns_null_bundle() { + // Persona exists but inbox is empty -> the command should + // short-circuit BEFORE any inference dispatch, returning + // the documented null pair. + let persona_id = Uuid::new_v4(); + let module = module_with_persona(persona_id); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + "window_ms": 50, + "max_items": 8, + }), + ) + .await + .expect("empty drain is a no-op, not an error"); + + match result { + CommandResult::Json(v) => { + assert_eq!( + v.get("replayRecord"), + Some(&Value::Null), + "empty drain produces null replayRecord; got {v}" + ); + assert_eq!( + v.get("inferenceResponse"), + Some(&Value::Null), + "empty drain produces null inferenceResponse; got {v}" + ); + } + CommandResult::Binary { .. } => panic!("expected Json"), + } + } + + #[tokio::test] + async fn turn_execute_bad_max_items_returns_typed_error() { + // Defensive: usize::try_from rejects > usize::MAX (always + // succeeds on 64-bit but defends 32-bit builds). The + // happy path validation comes via the empty-drain test + // above; this one pins the param-parse error path. + let persona_id = Uuid::new_v4(); + let module = module_with_persona(persona_id); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + "max_duration_ms": u64::MAX, + }), + ) + .await; + match result { + Err(msg) => { + assert!( + msg.contains("max_duration_ms too large"), + "expected max_duration_ms overflow error, got: {msg}" + ); + } + Ok(_) => panic!("u64::MAX max_duration_ms must fail u32 conversion"), + } + } + + #[tokio::test] + async fn turn_execute_success_routes_through_rust_inference_module() { + let persona_id = Uuid::new_v4(); + let module = module_with_persona_and_registry(persona_id, Some(rust_inference_registry())); + enqueue_message(&module, persona_id, "what changed?", 20_000); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + "max_tokens": 64, + "max_duration_ms": 1_000, + }), + ) + .await + .expect("Rust inference module handles turn"); + + let CommandResult::Json(value) = result else { + panic!("expected Json"); + }; + assert_eq!( + value["replayRecord"]["responsePrompt"]["messages"][0]["content"], + "Joel: what changed?" + ); + assert_eq!( + value["inferenceResponse"]["complete"]["tokensGenerated"], 3, + "registered InferenceLlmModule stub proves Rust-only dispatch reached inference" + ); + assert!( + module + .state + .personas + .get(&persona_id) + .expect("persona remains") + .inbox + .is_empty(), + "turn-execute drains one consolidated frame" + ); + } + + #[tokio::test] + async fn turn_execute_missing_rust_registry_refuses_ts_fallback() { + let persona_id = Uuid::new_v4(); + let module = module_with_persona(persona_id); + enqueue_message(&module, persona_id, "do not fall back to ts", 30_000); + + let result = module + .handle_command( + "persona/turn-execute", + serde_json::json!({ + "persona_id": persona_id.to_string(), + }), + ) + .await; + + match result { + Err(msg) => assert!( + msg.contains("refusing TypeScript fallback"), + "expected loud no-TS-fallback refusal, got: {msg}" + ), + Ok(_) => panic!("missing Rust registry must not fall through"), + } + } +} + // ============================================================================ // Parsing helpers // ============================================================================ diff --git a/src/workers/continuum-core/src/persona/turn_frame.rs b/src/workers/continuum-core/src/persona/turn_frame.rs index ea9bd839a..8f3d16935 100644 --- a/src/workers/continuum-core/src/persona/turn_frame.rs +++ b/src/workers/continuum-core/src/persona/turn_frame.rs @@ -267,6 +267,47 @@ impl PersonaTurnFrame { } } +impl ResponsePrompt { + /// Flatten the chat-style prompt into a single plain-text + /// prompt suitable for adapter-based inference engines that + /// tokenize internally (LlamaCppAdapter + cloud adapters via + /// `InferenceRequest.prompt_text`). + /// + /// Format: `system_prompt` on its own paragraph (if present), + /// then each `PromptMessage` on its own line as + /// `Role: content`. Role is lowercased to match the on-the-wire + /// PromptRole serde format ("system", "user", "assistant"). + /// + /// This is a deliberate "flatten now, structure later" decision: + /// adapter-based engines re-structure into their native format + /// internally; raw-token engines don't use prompt_text at all + /// (they take prompt_tokens). The substrate's job is to give + /// adapters a single deterministic text input that round-trips. + pub fn to_prompt_text(&self) -> String { + let mut out = String::new(); + if let Some(system) = self.system_prompt.as_deref() { + if !system.is_empty() { + out.push_str(system); + out.push_str("\n\n"); + } + } + for (i, msg) in self.messages.iter().enumerate() { + if i > 0 { + out.push('\n'); + } + let role = match msg.role { + PromptRole::System => "system", + PromptRole::User => "user", + PromptRole::Assistant => "assistant", + }; + out.push_str(role); + out.push_str(": "); + out.push_str(&msg.content); + } + out + } +} + #[cfg(test)] mod tests { use super::*; @@ -653,7 +694,10 @@ mod tests { let prompt = PersonaTurnFrame::from_inbox_frame(frame) .response_prompt() .unwrap(); - assert!(prompt.system_prompt.is_none(), "PR-1 leaves system_prompt for caller"); + assert!( + prompt.system_prompt.is_none(), + "PR-1 leaves system_prompt for caller" + ); } #[test] @@ -713,4 +757,88 @@ mod tests { assert!(json.contains("\"triggerMessageId\":"), "got {json}"); assert!(json.contains("\"role\":\"user\""), "got {json}"); } + + // ─── ResponsePrompt::to_prompt_text (Lane D turn-execute) ── + + fn prompt_with(system: Option<&str>, messages: Vec<(PromptRole, &str)>) -> ResponsePrompt { + ResponsePrompt { + persona_id: Uuid::nil(), + room_id: Uuid::nil(), + system_prompt: system.map(String::from), + messages: messages + .into_iter() + .map(|(role, content)| PromptMessage { + role, + content: content.to_string(), + }) + .collect(), + trigger_message_id: Uuid::nil(), + } + } + + #[test] + fn to_prompt_text_renders_each_message_as_role_colon_content() { + let prompt = prompt_with( + None, + vec![ + (PromptRole::User, "Joel: hi"), + (PromptRole::User, "Joel: how are you"), + ], + ); + let text = prompt.to_prompt_text(); + assert_eq!(text, "user: Joel: hi\nuser: Joel: how are you"); + } + + #[test] + fn to_prompt_text_prepends_system_prompt_when_present() { + let prompt = prompt_with( + Some("You are Helper, a calm assistant."), + vec![(PromptRole::User, "Joel: ping")], + ); + let text = prompt.to_prompt_text(); + assert_eq!( + text, + "You are Helper, a calm assistant.\n\nuser: Joel: ping" + ); + } + + #[test] + fn to_prompt_text_skips_empty_system_prompt() { + // Empty string is treated as "no system prompt" — no + // double-newline noise on the wire. + let prompt = prompt_with(Some(""), vec![(PromptRole::User, "hi")]); + let text = prompt.to_prompt_text(); + assert_eq!(text, "user: hi"); + } + + #[test] + fn to_prompt_text_handles_mixed_roles_in_order() { + let prompt = prompt_with( + None, + vec![ + (PromptRole::System, "Be brief."), + (PromptRole::User, "Joel: hi"), + (PromptRole::Assistant, "Helper: hello"), + (PromptRole::User, "Joel: thanks"), + ], + ); + let text = prompt.to_prompt_text(); + assert_eq!( + text, + "system: Be brief.\nuser: Joel: hi\nassistant: Helper: hello\nuser: Joel: thanks" + ); + } + + #[test] + fn to_prompt_text_handles_no_messages() { + let prompt = prompt_with(Some("Solo system instruction."), vec![]); + let text = prompt.to_prompt_text(); + assert_eq!(text, "Solo system instruction.\n\n"); + } + + #[test] + fn to_prompt_text_empty_prompt_returns_empty_string() { + let prompt = prompt_with(None, vec![]); + assert_eq!(prompt.to_prompt_text(), ""); + } }