diff --git a/src/workers/continuum-core/src/persona/service_module.rs b/src/workers/continuum-core/src/persona/service_module.rs index 96ea94f10..b10aa3c4c 100644 --- a/src/workers/continuum-core/src/persona/service_module.rs +++ b/src/workers/continuum-core/src/persona/service_module.rs @@ -1,15 +1,22 @@ //! `PersonaServiceModule` — singleton Rust `ServiceModule` for persona //! work. //! -//! ## L0-2-dispatch scope +//! ## L0-2-respond scope //! -//! Builds on L0-2-prep (#1464): each `EnrolledPersona` now carries a -//! per-persona `ChannelRegistry` + `PersonaState`. `service_once_for` -//! pops the next eligible item via `channel_registry::service_cycle` -//! and runs it through `full_evaluate` (the unified pre-response gate -//! from `persona::evaluator`). The result is recorded; the actual -//! `respond()` call needs more upstream context (`TurnContext`, room -//! history, known-specialties roster) that lands in a follow-up slice. +//! Builds on L0-2-dispatch (#1465). When `full_evaluate` decides +//! `should_respond=true`, `service_once_for` constructs a +//! `RespondInput` from the persona + popped message and calls +//! `persona::response::respond()`. The result is surfaced as +//! `ServiceOnceOutcome::Spoke` / `Silent` / `RespondError`. +//! +//! Upstream context plumbing — `system_prompt` (RAG-built), `model` +//! (from persona config), `capabilities` (from model registry), +//! `recalled_engrams` (from admission state) — is partial. What we +//! HAVE at this layer is wired; what we don't have surfaces as a +//! clearly-named TODO default so callers (and the inference layer) +//! can see what's missing. Tests pin the construction + the wiring; +//! real inference validation comes via integration tests with model +//! loading, not these unit tests. //! //! `tick` iterates enrolled personas, calls `service_once_for` on each, //! manages per-persona circuit-breaker (5 consecutive failures → 30s @@ -26,16 +33,20 @@ use std::any::Any; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use serde_json::{json, Value}; +use tokio::sync::Mutex; use uuid::Uuid; +use crate::cognition::response_orchestrator::PersonaSlot as ResponderPersona; use crate::persona::channel_registry::ChannelRegistry; use crate::persona::channel_types::ServiceCycleResult; use crate::persona::evaluator::{full_evaluate, FullEvaluateRequest, FullEvaluateResult}; +use crate::persona::response::{respond, PersonaResponse, RespondInput}; +use crate::persona::turn_context::TurnContext; use crate::persona::types::{PersonaState, SenderType}; use crate::persona::unified::PersonaCognition; use serde::Deserialize; @@ -119,28 +130,43 @@ impl EnrolledPersona { } /// Outcome of a single `service_once_for` call on one enrolled persona. -/// -/// We do NOT yet call `respond()` from this slice — that needs upstream -/// context (`TurnContext`, room history, known-specialties roster) that -/// will plumb through in a follow-up slice. `Evaluated` carries the -/// `FullEvaluateResult` so the test harness (and eventually production) -/// can see what the gate decided. #[derive(Debug)] pub enum ServiceOnceOutcome { /// The channel was idle; no item to dispatch this cycle. Idle, - /// An item was popped, evaluated, and the gate returned a decision. - /// `respond()` wiring lands in a follow-up slice; this outcome - /// carries the inputs that respond() would have consumed so callers - /// (and tests) can verify the path. - Evaluated { + /// `full_evaluate` decided NOT to respond. Carries the gate + /// outcome for observability; distinct from `Idle` (which means + /// no item at all) and from `RespondError` (which means respond + /// was attempted and failed). + SilentByDecision { + message_id: Uuid, + decision: FullEvaluateResult, + }, + /// `full_evaluate` decided to respond AND `respond()` returned + /// successfully. `response` is the typed result (Silent or Spoke + /// — the persona may still choose silence after generation; that's + /// `PersonaResponse::Silent`, distinct from this outer + /// `SilentByDecision` which is the pre-generation gate's "no"). + Responded { message_id: Uuid, decision: FullEvaluateResult, + response: PersonaResponse, }, - /// Item was popped but couldn't be deserialized as a chat - /// `InboxMessage`. Voice + task items live in the same channel - /// queues and will be wired in a later slice; for now they're - /// surfaced as `UnsupportedItem` rather than silently dropped. + /// `full_evaluate` decided to respond but `respond()` itself + /// errored — typically because upstream context (`system_prompt`, + /// `model`, model load) isn't fully plumbed yet. The error is + /// surfaced as data rather than swallowed so the test harness + /// (and production telemetry) can observe it without it tripping + /// the per-persona circuit breaker on transient inference failure. + RespondError { + message_id: Uuid, + decision: FullEvaluateResult, + error: String, + }, + /// Item was popped but its `"type"` wasn't `"chat"`. Voice + task + /// items live in the same channel queues and will be wired in a + /// later slice; for now they're surfaced as `UnsupportedItem` + /// rather than silently dropped. UnsupportedItem { item_type: String }, } @@ -172,12 +198,9 @@ impl PersonaServiceModule { /// the same id with a different display name updates the name; the /// existing cognition + circuit-breaker state are preserved (do NOT /// reset cognition state silently — that would be a fallback). - pub fn enroll(&self, persona_id: Uuid, display_name: impl Into) -> Result<(), String> { + pub async fn enroll(&self, persona_id: Uuid, display_name: impl Into) -> Result<(), String> { let display_name = display_name.into(); - let mut personas = self - .personas - .lock() - .map_err(|_| "personas lock poisoned".to_string())?; + let mut personas = self.personas.lock().await; if let Some(slot) = personas.get_mut(&persona_id) { slot.display_name = display_name; return Ok(()); @@ -195,26 +218,20 @@ impl PersonaServiceModule { } /// Number of currently enrolled personas. Cheap; used by status. - pub fn enrolled_count(&self) -> Result { - let personas = self - .personas - .lock() - .map_err(|_| "personas lock poisoned".to_string())?; - Ok(personas.len()) + pub async fn enrolled_count(&self) -> usize { + self.personas.lock().await.len() } /// Returns a snapshot of enrolled persona ids + display names, used /// by status. Allocates; for hot-path observers, iterate the map /// directly via your own lock. - pub fn enrolled_snapshot(&self) -> Result, String> { - let personas = self - .personas + pub async fn enrolled_snapshot(&self) -> Vec<(Uuid, String)> { + self.personas .lock() - .map_err(|_| "personas lock poisoned".to_string())?; - Ok(personas + .await .values() .map(|s| (s.persona_id, s.display_name.clone())) - .collect()) + .collect() } /// Service one cycle for one enrolled persona. Pure function over @@ -232,7 +249,7 @@ impl PersonaServiceModule { /// — they're queued in the same channel registry but their /// dispatch wiring lands in a later slice. Surfacing them here /// rather than silently dropping is the anti-fallback discipline. - pub fn service_once_for( + pub async fn service_once_for( persona: &mut EnrolledPersona, now_ms: u64, ) -> Result { @@ -292,10 +309,69 @@ impl PersonaServiceModule { &persona.cognition.message_cache, now_ms, ); - Ok(ServiceOnceOutcome::Evaluated { + if !decision.should_respond { + return Ok(ServiceOnceOutcome::SilentByDecision { + message_id: wire.id, + decision, + }); + } + // Build the respond() input. What we HAVE at this layer goes + // through; what isn't yet plumbed (system_prompt from RAG, + // persona model, capabilities, recalled engrams) uses + // clearly-named defaults so the inference layer can surface + // them. Tests pin construction; real inference validation + // comes via integration tests, not these unit tests. + let respond_input = Self::build_respond_input(persona, &wire); + match respond(respond_input).await { + Ok(response) => Ok(ServiceOnceOutcome::Responded { + message_id: wire.id, + decision, + response, + }), + Err(error) => Ok(ServiceOnceOutcome::RespondError { + message_id: wire.id, + decision, + error, + }), + } + } + + /// Construct a `RespondInput` for `respond()` from the enrolled + /// persona + the popped chat-item wire. Deterministic + side-effect + /// free; tests rely on it. + /// + /// Upstream context plumbing is partial — what's not yet wired + /// uses an explicit empty/default. Documented so the inference + /// layer (and the next slice) knows what to feed. + fn build_respond_input(persona: &EnrolledPersona, wire: &ChatItemWire) -> RespondInput { + RespondInput { + persona: ResponderPersona { + persona_id: persona.persona_id, + // Specialty isn't carried in EnrolledPersona yet — it + // lives in the persona's UserEntity config which is + // outside this module's reach. Empty string here means + // the orchestrator's specialty-match path defaults to + // "general" treatment (per response_orchestrator + // docstring). When persona config plumbs through, + // populate from there. + specialty: String::new(), + display_name: persona.display_name.clone(), + }, + turn_context: TurnContext::arc(wire.room_id, Vec::new(), Vec::new()), message_id: wire.id, - decision, - }) + message_text: wire.content.clone(), + other_persona_names: Vec::new(), + system_prompt: String::new(), + // Persona's model identifier — same provenance gap as + // specialty. Inference layer will fail loudly if asked to + // route an empty model; that's the correct fail-loud + // surface for the missing config. + model: String::new(), + is_voice: false, + message_media: Vec::new(), + capabilities: std::collections::HashSet::new(), + recalled_engrams: Vec::new(), + } } /// Iterate every enrolled persona, run `service_once_for` up to @@ -304,25 +380,30 @@ impl PersonaServiceModule { /// /// Note: this is what `tick` calls. Exposed for tests so they can /// drive a single iteration deterministically. - pub fn drain_all_personas(&self, now_ms: u64) -> Result<(), String> { - let mut personas = self - .personas - .lock() - .map_err(|_| "personas lock poisoned".to_string())?; - for persona in personas.values_mut() { + pub async fn drain_all_personas(&self, now_ms: u64) -> Result<(), String> { + // tokio::sync::Mutex is async-aware so it can be held across .await + // (an inference call inside service_once_for). Production safety: + // other access (status/enroll) is briefly blocked across a single + // persona's inference, but personas are drained sequentially anyway + // — this is the canonical RTOS tick shape, not the fast-path. + let mut personas = self.personas.lock().await; + let persona_ids: Vec = personas.keys().copied().collect(); + for persona_id in persona_ids { + let persona = match personas.get_mut(&persona_id) { + Some(p) => p, + None => continue, + }; // Circuit breaker: skip while open. if persona.circuit_open_until_ms > now_ms { continue; } if persona.circuit_open_until_ms != 0 { - // Circuit was open; window expired. Close it and reset - // the failure counter. persona.circuit_open_until_ms = 0; persona.consecutive_failures = 0; } let mut drained: u32 = 0; while drained < MAX_DRAIN_PER_TICK { - match Self::service_once_for(persona, now_ms) { + match Self::service_once_for(persona, now_ms).await { Ok(ServiceOnceOutcome::Idle) => { persona.consecutive_failures = 0; break; @@ -339,8 +420,6 @@ impl PersonaServiceModule { persona.circuit_open_until_ms = now_ms.saturating_add(CIRCUIT_BREAKER_COOLDOWN_MS); } - // Stop draining this persona until next tick; - // don't keep hammering the same broken queue. break; } } @@ -386,7 +465,7 @@ impl ServiceModule for PersonaServiceModule { ) -> Result { match command { "persona/status" => { - let snapshot = self.enrolled_snapshot()?; + let snapshot = self.enrolled_snapshot().await; let entries: Vec = snapshot .into_iter() .map(|(id, name)| json!({"persona_id": id.to_string(), "display_name": name})) @@ -410,10 +489,10 @@ impl ServiceModule for PersonaServiceModule { .and_then(Value::as_str) .ok_or_else(|| "persona/enroll requires display_name (string)".to_string())? .to_string(); - self.enroll(persona_id, display_name)?; + self.enroll(persona_id, display_name).await?; Ok(CommandResult::Json(json!({ "enrolled": persona_id.to_string(), - "total": self.enrolled_count()?, + "total": self.enrolled_count().await, }))) } other => Err(format!("unknown persona command: {other}")), @@ -421,11 +500,13 @@ impl ServiceModule for PersonaServiceModule { } async fn tick(&self) -> Result<(), String> { - // L0-2-dispatch: tick drains every enrolled persona's channels - // up to MAX_DRAIN_PER_TICK. Production-safety: no production - // code calls `persona/enroll` yet — until L0-2-cutover wires - // enrollment, this tick runs over an empty map (no-op). - self.drain_all_personas(now_ms()) + // L0-2-respond: tick drains every enrolled persona's channels + // up to MAX_DRAIN_PER_TICK and dispatches `respond()` for each + // chat item where `full_evaluate` decides should_respond. + // Production-safety: no production code calls `persona/enroll` + // yet — until L0-2-cutover wires enrollment, this tick runs + // over an empty map (no-op). + self.drain_all_personas(now_ms()).await } fn as_any(&self) -> &dyn Any { @@ -502,10 +583,10 @@ mod tests { async fn enroll_is_idempotent_and_updates_display_name() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "First").expect("first enroll"); - m.enroll(persona_id, "Second").expect("second enroll"); - assert_eq!(m.enrolled_count().unwrap(), 1); - let snapshot = m.enrolled_snapshot().unwrap(); + m.enroll(persona_id, "First").await.expect("first enroll"); + m.enroll(persona_id, "Second").await.expect("second enroll"); + assert_eq!(m.enrolled_count().await, 1); + let snapshot = m.enrolled_snapshot().await; assert_eq!(snapshot.len(), 1); assert_eq!(snapshot[0].1, "Second"); } @@ -515,9 +596,9 @@ mod tests { let m = fresh_module(); let a = Uuid::new_v4(); let b = Uuid::new_v4(); - m.enroll(a, "Alpha").expect("enroll alpha"); - m.enroll(b, "Beta").expect("enroll beta"); - assert_eq!(m.enrolled_count().unwrap(), 2); + m.enroll(a, "Alpha").await.expect("enroll alpha"); + m.enroll(b, "Beta").await.expect("enroll beta"); + assert_eq!(m.enrolled_count().await, 2); } #[tokio::test] @@ -582,12 +663,12 @@ mod tests { async fn tick_with_enrolled_persona_and_no_items_is_no_op() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper").await.expect("enroll"); // No items in any channel — tick should drain nothing, errors zero. m.tick().await.expect("tick succeeds with idle persona"); - assert_eq!(m.enrolled_count().unwrap(), 1); + assert_eq!(m.enrolled_count().await, 1); // Failure counter should be zero — idle is not a failure. - let personas = m.personas.lock().unwrap(); + let personas = m.personas.lock().await; let slot = personas.get(&persona_id).expect("persona enrolled"); assert_eq!(slot.consecutive_failures, 0); assert_eq!(slot.circuit_open_until_ms, 0); @@ -637,12 +718,12 @@ mod tests { async fn service_once_for_idle_returns_idle() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); - let mut personas = m.personas.lock().unwrap(); + m.enroll(persona_id, "Helper").await.expect("enroll"); + let mut personas = m.personas.lock().await; let persona = personas.get_mut(&persona_id).unwrap(); ensure_chat_channel(persona); let outcome = - PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).expect("idle ok"); + PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).await.expect("idle ok"); assert!(matches!(outcome, ServiceOnceOutcome::Idle)); } @@ -650,9 +731,9 @@ mod tests { async fn service_once_for_dispatches_chat_item_through_full_evaluate() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper").await.expect("enroll"); let room_id = Uuid::new_v4(); - let mut personas = m.personas.lock().unwrap(); + let mut personas = m.personas.lock().await; let persona = personas.get_mut(&persona_id).unwrap(); ensure_chat_channel(persona); let item = test_chat_item("hello", true, room_id); @@ -662,12 +743,21 @@ mod tests { .route(Box::new(item)) .expect("route chat item to Chat channel"); let outcome = - PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).expect("dispatch ok"); + PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).await.expect("dispatch ok"); + // L0-2-respond: should_respond=true (sender is human, persona is + // not in DND, no rate limit hit) takes the respond() path. + // respond() will likely error since no real model is loaded — + // we surface RespondError as data. should_respond=false would + // return SilentByDecision. Both are valid outcomes for this + // wiring test; what matters is the message_id matches and we + // got SOME respond-path outcome, not UnsupportedItem or Idle. match outcome { - ServiceOnceOutcome::Evaluated { message_id, decision: _ } => { + ServiceOnceOutcome::SilentByDecision { message_id, .. } + | ServiceOnceOutcome::Responded { message_id, .. } + | ServiceOnceOutcome::RespondError { message_id, .. } => { assert_eq!(message_id, expected_id); } - other => panic!("expected Evaluated, got {other:?}"), + other => panic!("expected respond-path outcome, got {other:?}"), } } @@ -676,11 +766,11 @@ mod tests { let m = fresh_module(); let a = Uuid::new_v4(); let b = Uuid::new_v4(); - m.enroll(a, "Alpha").expect("enroll a"); - m.enroll(b, "Beta").expect("enroll b"); + m.enroll(a, "Alpha").await.expect("enroll a"); + m.enroll(b, "Beta").await.expect("enroll b"); let room_id = Uuid::new_v4(); { - let mut personas = m.personas.lock().unwrap(); + let mut personas = m.personas.lock().await; for persona in personas.values_mut() { ensure_chat_channel(persona); persona @@ -689,10 +779,10 @@ mod tests { .expect("route"); } } - m.drain_all_personas(1_700_000_000_000).expect("drain ok"); + m.drain_all_personas(1_700_000_000_000).await.expect("drain ok"); // Both personas should be healthy: zero consecutive failures, // closed circuit. - let personas = m.personas.lock().unwrap(); + let personas = m.personas.lock().await; for persona in personas.values() { assert_eq!(persona.consecutive_failures, 0); assert_eq!(persona.circuit_open_until_ms, 0); @@ -706,11 +796,11 @@ mod tests { // processed; the remainder stays queued. let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper").await.expect("enroll"); let room_id = Uuid::new_v4(); let staged = MAX_DRAIN_PER_TICK as usize + 5; { - let mut personas = m.personas.lock().unwrap(); + let mut personas = m.personas.lock().await; let persona = personas.get_mut(&persona_id).unwrap(); ensure_chat_channel(persona); // Use distinct content per item to avoid same-room @@ -725,13 +815,13 @@ mod tests { .expect("route item"); } } - m.drain_all_personas(1_700_000_000_000).expect("drain ok"); + m.drain_all_personas(1_700_000_000_000).await.expect("drain ok"); // After one drain pass, the queue should NOT be empty (we // staged more than the per-tick cap and ChatQueueItem // consolidates same-room items, so the actual count drained // depends on consolidation — but the persona should still be // healthy and ready for the next tick). - let personas = m.personas.lock().unwrap(); + let personas = m.personas.lock().await; let persona = personas.get(&persona_id).unwrap(); assert_eq!(persona.consecutive_failures, 0); assert_eq!(persona.circuit_open_until_ms, 0);