Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/workers/continuum-core/src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,8 +938,11 @@ pub fn start_server(

// Shared state for per-persona cognition (unified: engine + inbox + rate limiter + sleep + adapters + genome)
let rag_engine = Arc::new(RagEngine::new());
let cognition_state =
Arc::new(CognitionState::new(rag_engine.clone()).with_gpu_manager(gpu_manager.clone()));
let cognition_state = Arc::new(
CognitionState::new(rag_engine.clone())
.with_gpu_manager(gpu_manager.clone())
.with_module_registry(runtime.registry_arc()),
);
let personas = cognition_state.personas.clone();
runtime.register(Arc::new(CognitionModule::new(cognition_state)));

Expand Down
185 changes: 152 additions & 33 deletions src/workers/continuum-core/src/modules/cognition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ use crate::persona::{
use crate::persona::{RecentResponse, SleepMode};
use crate::rag::RagEngine;
use crate::runtime;
use crate::runtime::{CommandResult, ModuleConfig, ModuleContext, ModulePriority, ServiceModule};
use crate::runtime::{
CommandResult, ModuleConfig, ModuleContext, ModulePriority, ModuleRegistry, ServiceModule,
};
use crate::utils::params::Params;
use async_trait::async_trait;
use dashmap::DashMap;
Expand All @@ -75,6 +77,12 @@ pub struct CognitionState {
pub loop_detector: LoopDetector,
/// GPU memory manager — real VRAM budgets for genome paging.
pub gpu_manager: Option<Arc<GpuMemoryManager>>,
/// Rust module registry for in-process cognition -> inference dispatch.
///
/// This is intentionally NOT the global command executor: `persona/turn-execute`
/// must fail loudly if the Rust inference module is absent instead of falling
/// through to TypeScript.
pub module_registry: Option<Arc<ModuleRegistry>>,
}

impl CognitionState {
Expand All @@ -84,6 +92,7 @@ impl CognitionState {
rag_engine,
loop_detector: LoopDetector::new(),
gpu_manager: None,
module_registry: None,
}
}

Expand All @@ -92,6 +101,11 @@ impl CognitionState {
self
}

pub fn with_module_registry(mut self, registry: Arc<ModuleRegistry>) -> Self {
self.module_registry = Some(registry);
self
}

/// Per-persona inference budget from GPU manager, or 200MB fallback.
pub fn per_persona_budget_mb(&self) -> f32 {
match &self.gpu_manager {
Expand Down Expand Up @@ -151,8 +165,10 @@ impl ServiceModule for CognitionModule {
// codex's persona inbox fanout primitive (today) + the upcoming
// PressureBroker singleton (#1299) make event fanout the
// intended invariant. Inference is still gated downstream by
// ai_provider::max_concurrency. No hardcoded fixed cap here.
max_concurrency: usize::MAX,
// ai_provider::max_concurrency. 0 is the runtime contract for
// "unlimited / module-managed"; usize::MAX overflows Tokio's
// semaphore permit ceiling during registration.
max_concurrency: 0,
tick_interval: None,
}
}
Expand Down Expand Up @@ -360,8 +376,7 @@ impl ServiceModule for CognitionModule {
}

Ok(CommandResult::Json(
serde_json::to_value(&record)
.map_err(|e| format!("Serialize error: {e}"))?,
serde_json::to_value(&record).map_err(|e| format!("Serialize error: {e}"))?,
))
}

Expand All @@ -372,9 +387,8 @@ impl ServiceModule for CognitionModule {
// -> wrap in PersonaTurnFrame
// -> derive ResponsePrompt (lazy output)
// -> build InferenceRequest (prompt_text path)
// -> dispatch `inference/llm/request` via the
// global command_executor (routes to the
// InferenceLlmModule registered in PR-5 #1404)
// -> dispatch `inference/llm/request` via the Rust
// ModuleRegistry only
// -> bundle replay_record + inference response
//
// Why one command: the TS persona loop previously
Expand Down Expand Up @@ -475,20 +489,16 @@ impl ServiceModule for CognitionModule {
stop_sequences: vec![],
};

// Dispatch via the global command_executor. Routes
// to InferenceLlmModule (registered in PR-5 #1404)
// through the ModuleRegistry. The executor maps
// unhandled commands to the TS bridge; for
// inference/llm/request the Rust route wins.
let inference_response = crate::runtime::command_executor::execute_json(
let inference_response = execute_rust_module_json(
self.state.module_registry.as_deref(),
crate::inference::llm_module_service::COMMAND_REQUEST,
serde_json::to_value(&inference_request)
.map_err(|e| format!("Serialize inference request: {e}"))?,
)
.await
.map_err(|e| {
format!(
"persona/turn-execute: inference dispatch failed for {persona_uuid}: {e}"
"persona/turn-execute: Rust inference dispatch failed for {persona_uuid}: {e}"
)
})?;

Expand Down Expand Up @@ -1722,6 +1732,24 @@ fn turn_frame_replay_record(
.and_then(|frame| PersonaTurnFrame::from_inbox_frame(frame.clone()).replay_record())
}

async fn execute_rust_module_json(
registry: Option<&ModuleRegistry>,
command: &str,
params: Value,
) -> Result<Value, String> {
let registry = registry.ok_or_else(|| {
format!("{command}: Rust module registry unavailable; refusing TypeScript fallback")
})?;
let (module, routed_command) = registry.route_command(command).ok_or_else(|| {
format!("{command}: no Rust module route registered; refusing TypeScript fallback")
})?;

match module.handle_command(&routed_command, params).await? {
CommandResult::Json(value) => Ok(value),
CommandResult::Binary { metadata, .. } => Ok(metadata),
}
}

#[cfg(test)]
mod turn_frame_recording_tests {
use super::*;
Expand Down Expand Up @@ -1803,25 +1831,28 @@ mod turn_frame_recording_tests {
mod turn_execute_tests {
//! Lane D persona/turn-execute command surface tests.
//!
//! End-to-end inference dispatch goes through the global
//! `command_executor::executor()` which is only initialized at
//! runtime startup (ipc/mod.rs). These tests cover the paths
//! that DON'T depend on the executor:
//! - persona-not-found returns a typed Err (loud-fail per
//! never-swallow rule)
//! - empty drain short-circuits before dispatch and returns
//! the documented {"replayRecord": null,
//! "inferenceResponse": null} bundle
//!
//! The dispatch-success path is covered by integration tests
//! that boot a Runtime with InferenceLlmModule registered.
//! These tests pin the Rust-only shape: success routes through a
//! `ModuleRegistry` with `InferenceLlmModule` registered; missing registry
//! or missing route fails loudly instead of falling through to TypeScript.
use super::*;
use crate::inference::llm_module_service::InferenceLlmModule;
use crate::rag::RagEngine;
use std::sync::Arc;

fn module_with_persona(persona_id: Uuid) -> CognitionModule {
module_with_persona_and_registry(persona_id, None)
}

fn module_with_persona_and_registry(
persona_id: Uuid,
registry: Option<Arc<ModuleRegistry>>,
) -> CognitionModule {
let rag_engine = Arc::new(RagEngine::new());
let state = Arc::new(CognitionState::new(rag_engine.clone()));
let mut state = CognitionState::new(rag_engine.clone());
if let Some(registry) = registry {
state = state.with_module_registry(registry);
}
let state = Arc::new(state);
state.personas.insert(
persona_id,
crate::persona::PersonaCognition::new(
Expand All @@ -1833,6 +1864,33 @@ mod turn_execute_tests {
CognitionModule::new(state)
}

fn rust_inference_registry() -> Arc<ModuleRegistry> {
let registry = Arc::new(ModuleRegistry::new());
registry.register(Arc::new(InferenceLlmModule::new()));
registry
}

fn enqueue_message(module: &CognitionModule, persona_id: Uuid, content: &str, timestamp: u64) {
let room_id = Uuid::new_v4();
let persona = module
.state
.personas
.get(&persona_id)
.expect("test persona exists");
persona.inbox.enqueue(InboxMessage {
id: Uuid::new_v4(),
room_id,
sender_id: Uuid::new_v4(),
sender_name: "Joel".to_string(),
sender_type: SenderType::Human,
content: content.to_string(),
timestamp,
priority: 0.9,
source_modality: Some(Modality::Chat),
voice_session_id: None,
});
}

#[tokio::test]
async fn turn_execute_persona_not_found_returns_typed_error() {
let rag_engine = Arc::new(RagEngine::new());
Expand Down Expand Up @@ -1865,11 +1923,7 @@ mod turn_execute_tests {
async fn turn_execute_empty_drain_returns_null_bundle() {
// Persona exists but inbox is empty -> the command should
// short-circuit BEFORE any inference dispatch, returning
// the documented null pair. If it ever tried to dispatch
// through command_executor::executor() (which isn't
// initialized in tests), the call would panic — so this
// test also doubles as a regression guard for the
// empty-drain short-circuit.
// the documented null pair.
let persona_id = Uuid::new_v4();
let module = module_with_persona(persona_id);

Expand Down Expand Up @@ -1930,6 +1984,71 @@ mod turn_execute_tests {
Ok(_) => panic!("u64::MAX max_duration_ms must fail u32 conversion"),
}
}

#[tokio::test]
async fn turn_execute_success_routes_through_rust_inference_module() {
let persona_id = Uuid::new_v4();
let module = module_with_persona_and_registry(persona_id, Some(rust_inference_registry()));
enqueue_message(&module, persona_id, "what changed?", 20_000);

let result = module
.handle_command(
"persona/turn-execute",
serde_json::json!({
"persona_id": persona_id.to_string(),
"max_tokens": 64,
"max_duration_ms": 1_000,
}),
)
.await
.expect("Rust inference module handles turn");

let CommandResult::Json(value) = result else {
panic!("expected Json");
};
assert_eq!(
value["replayRecord"]["responsePrompt"]["messages"][0]["content"],
"Joel: what changed?"
);
assert_eq!(
value["inferenceResponse"]["complete"]["tokensGenerated"], 3,
"registered InferenceLlmModule stub proves Rust-only dispatch reached inference"
);
assert!(
module
.state
.personas
.get(&persona_id)
.expect("persona remains")
.inbox
.is_empty(),
"turn-execute drains one consolidated frame"
);
}

#[tokio::test]
async fn turn_execute_missing_rust_registry_refuses_ts_fallback() {
let persona_id = Uuid::new_v4();
let module = module_with_persona(persona_id);
enqueue_message(&module, persona_id, "do not fall back to ts", 30_000);

let result = module
.handle_command(
"persona/turn-execute",
serde_json::json!({
"persona_id": persona_id.to_string(),
}),
)
.await;

match result {
Err(msg) => assert!(
msg.contains("refusing TypeScript fallback"),
"expected loud no-TS-fallback refusal, got: {msg}"
),
Ok(_) => panic!("missing Rust registry must not fall through"),
}
}
}

// ============================================================================
Expand Down
10 changes: 5 additions & 5 deletions src/workers/continuum-core/src/persona/turn_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,10 @@ mod tests {
let prompt = PersonaTurnFrame::from_inbox_frame(frame)
.response_prompt()
.unwrap();
assert!(prompt.system_prompt.is_none(), "PR-1 leaves system_prompt for caller");
assert!(
prompt.system_prompt.is_none(),
"PR-1 leaves system_prompt for caller"
);
}

#[test]
Expand Down Expand Up @@ -757,10 +760,7 @@ mod tests {

// ─── ResponsePrompt::to_prompt_text (Lane D turn-execute) ──

fn prompt_with(
system: Option<&str>,
messages: Vec<(PromptRole, &str)>,
) -> ResponsePrompt {
fn prompt_with(system: Option<&str>, messages: Vec<(PromptRole, &str)>) -> ResponsePrompt {
ResponsePrompt {
persona_id: Uuid::nil(),
room_id: Uuid::nil(),
Expand Down
Loading