diff --git a/src/workers/continuum-core/src/persona/service_module.rs b/src/workers/continuum-core/src/persona/service_module.rs index 107d403fc..458be20ec 100644 --- a/src/workers/continuum-core/src/persona/service_module.rs +++ b/src/workers/continuum-core/src/persona/service_module.rs @@ -1,34 +1,30 @@ //! `PersonaServiceModule` — singleton Rust `ServiceModule` for persona //! work. //! -//! ## L0-2-respond-context scope +//! ## L0-2-respond-call scope //! -//! Builds on L0-2-dispatch (#1465). Each `EnrolledPersona` now carries -//! a required `ResponderConfig` (model, system_prompt, capabilities) -//! supplied at enrollment. When `full_evaluate` decides -//! `should_respond=true`, `service_once_for` constructs a fully-formed -//! `RespondInput` and surfaces it as `ServiceOnceOutcome::NeedsResponse`. +//! Builds on L0-2-respond-context (#1467). `drain_all_personas` now +//! actually calls `Responder::respond()` for each `NeedsResponse` +//! outcome from `service_once_for`. Three contracts the previous +//! self-closed attempt got wrong, now specified properly: //! -//! What this slice does NOT do: call `respond()`. That's the next -//! slice, which can rely on `RespondInput` being honestly constructed -//! from real config + per-message context — no empty-string defaults -//! that the inference layer would have to fail loudly on. -//! -//! Why this shape, not "wire respond() now": -//! - Empty-default fields on `RespondInput` (the previous attempt) are -//! the silent-default-substitution pattern this whole migration is -//! deleting on the TS side. Not reinventing it in Rust. -//! - The lock discipline for calling `respond().await` from inside a -//! mutex-held context needs care (drop lock around inference). Best -//! to land the construction first, then the dispatch shape. -//! - Errors from `respond()` MUST trip the per-persona circuit breaker -//! (silent-degradation otherwise). That requires `service_once_for` -//! to surface inference errors as `Err`, not wrap them as `Ok`. The -//! next slice owns that contract. -//! -//! `tick` iterates enrolled personas, calls `service_once_for` on each, -//! manages per-persona circuit-breaker (5 consecutive failures → 30s -//! cooldown), respects `MAX_DRAIN_PER_TICK` per persona. +//! 1. **Lock discipline.** The personas mutex is dropped before +//! `respond().await`. Production safety: status / enroll / other +//! personas' ticks are NOT blocked across the multi-second +//! inference call. Pattern: collect ids briefly, then per-id: lock +//! briefly to pop+evaluate, drop, respond, lock briefly to update +//! circuit breaker. +//! 2. **Inference errors trip the circuit (with a higher threshold).** +//! `consecutive_inference_failures` is a separate counter from +//! `consecutive_service_failures`. Service-layer failures +//! (deserialization, channel access) trip at the standard +//! threshold (5). Inference failures trip at a higher threshold +//! (15) — preserves "transient hiccup ≠ broken persona" while +//! still surfacing "model never loads" as back-pressure. +//! 3. **`Responder` trait** for dependency injection. Production uses +//! `DefaultResponder` which calls `persona::response::respond`. +//! Tests inject a mock that captures call args + returns scripted +//! responses (or errors) without loading a real model. //! //! Production safety: no production code calls `persona/enroll` yet — //! the runtime's tick scheduler invokes `tick()` every 250ms but with @@ -53,13 +49,32 @@ use crate::model_registry::Capability; use crate::persona::channel_registry::ChannelRegistry; use crate::persona::channel_types::ServiceCycleResult; use crate::persona::evaluator::{full_evaluate, FullEvaluateRequest, FullEvaluateResult}; -use crate::persona::response::RespondInput; +use crate::persona::response::{PersonaResponse, RespondInput}; use crate::persona::turn_context::TurnContext; use crate::persona::types::{PersonaState, SenderType}; use crate::persona::unified::PersonaCognition; use serde::Deserialize; use std::collections::HashSet; +/// Dependency-injection point for response generation. Production binds +/// to `DefaultResponder` (which calls `persona::response::respond`). +/// Tests inject a mock that records calls and returns scripted outcomes +/// (or errors) without loading a real model. +#[async_trait] +pub trait Responder: Send + Sync { + async fn respond(&self, input: RespondInput) -> Result; +} + +/// Production `Responder` — dispatches to `persona::response::respond`. +pub struct DefaultResponder; + +#[async_trait] +impl Responder for DefaultResponder { + async fn respond(&self, input: RespondInput) -> Result { + crate::persona::response::respond(input).await + } +} + /// Wire shape that mirrors `ChatQueueItem::to_json()` (camelCase with a /// `"type": "chat"` discriminant). Used here to deserialize whatever /// `channel_registry::service_cycle` pops back into typed fields without @@ -86,9 +101,18 @@ use crate::rag::RagEngine; use crate::runtime::service_module::{CommandResult, ModuleConfig, ModulePriority, ServiceModule}; use crate::runtime::ModuleContext; -/// After this many consecutive `service_once_for` failures, open the -/// per-persona circuit for `CIRCUIT_BREAKER_COOLDOWN_MS`. -const CIRCUIT_BREAKER_MAX_CONSECUTIVE_FAILURES: u32 = 5; +/// After this many consecutive *service-layer* failures (deserialization, +/// channel access, lock poisoning), open the per-persona circuit for +/// `CIRCUIT_BREAKER_COOLDOWN_MS`. Service-layer failures are signs of +/// real structural problems — trip fast. +const CIRCUIT_BREAKER_MAX_CONSECUTIVE_SERVICE_FAILURES: u32 = 5; +/// After this many consecutive *inference* failures from `Responder::respond`, +/// open the per-persona circuit. Higher than the service threshold — +/// inference can be transiently slow / OOMy / model-loading without +/// the persona being structurally broken. But if the model genuinely +/// never loads, eventually trip and surface back-pressure rather than +/// silently dropping every message. +const CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES: u32 = 15; /// Duration the per-persona circuit stays open after tripping. const CIRCUIT_BREAKER_COOLDOWN_MS: u64 = 30_000; /// Per-tick per-persona drain bound — caps how many items a single @@ -169,9 +193,15 @@ pub struct EnrolledPersona { /// Unix-ms timestamp at which the per-persona circuit re-closes. /// 0 means the circuit is currently closed (healthy). pub circuit_open_until_ms: u64, - /// Consecutive `service_once_for` failures since the last success. - /// Trips the circuit at `CIRCUIT_BREAKER_MAX_CONSECUTIVE_FAILURES`. - pub consecutive_failures: u32, + /// Consecutive service-layer failures (deserialization, channel + /// access, lock poisoning). Trips the circuit at + /// `CIRCUIT_BREAKER_MAX_CONSECUTIVE_SERVICE_FAILURES` (5). + pub consecutive_service_failures: u32, + /// Consecutive inference failures from `Responder::respond`. Trips + /// the circuit at `CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES` + /// (15) — higher tolerance because inference can be transiently + /// slow/OOMy without the persona being structurally broken. + pub consecutive_inference_failures: u32, } impl EnrolledPersona { @@ -189,17 +219,39 @@ impl EnrolledPersona { state: PersonaState::new(), responder_config, circuit_open_until_ms: 0, - consecutive_failures: 0, + consecutive_service_failures: 0, + consecutive_inference_failures: 0, } } } +/// Output of the *synchronous* pop+decide step (`service_once_for`) +/// inside the lock. The async `Responder::respond` dispatch happens +/// outside the lock; `drain_all_personas` converts a `NeedsResponse` +/// decision into a `ServiceOnceOutcome::Responded` or surfaces the +/// inference error. +#[derive(Debug)] +pub enum ServicePopDecision { + /// The channel was idle; nothing to pop. + Idle, + /// `full_evaluate` decided NOT to respond. + Silent { + message_id: Uuid, + decision: FullEvaluateResult, + }, + /// `full_evaluate` decided to respond; `respond_input` is fully-formed. + /// The caller dispatches `Responder::respond(*respond_input)` OUTSIDE + /// the lock. + NeedsResponse { + message_id: Uuid, + decision: FullEvaluateResult, + respond_input: Box, + }, + /// Popped item had a non-chat `"type"` discriminant. + UnsupportedItem { item_type: String }, +} + /// Outcome of a single `service_once_for` call on one enrolled persona. -/// -/// L0-2-respond-context shape: no `respond()` call yet. When the gate -/// says yes, a fully-formed `RespondInput` is surfaced for the caller -/// (the actual dispatch slice owns the `respond()` call + the -/// inference-error-trips-circuit-breaker contract). #[derive(Debug)] pub enum ServiceOnceOutcome { /// The channel was idle; no item to dispatch this cycle. @@ -210,16 +262,14 @@ pub enum ServiceOnceOutcome { message_id: Uuid, decision: FullEvaluateResult, }, - /// `full_evaluate` decided to respond. The `RespondInput` is - /// fully-formed from the persona's responder config + per-message - /// context. Caller dispatches `persona::response::respond(input)` - /// in the next slice — that's where the lock-around-await - /// discipline + inference-error-trips-circuit-breaker contract - /// live. - NeedsResponse { + /// `full_evaluate` decided to respond AND `Responder::respond` + /// returned successfully. `response` is the typed result + /// (`PersonaResponse::Silent` if the persona chose silence after + /// generation, `PersonaResponse::Spoke` otherwise). + Responded { message_id: Uuid, decision: FullEvaluateResult, - respond_input: Box, + response: PersonaResponse, }, /// Item was popped but its `"type"` wasn't `"chat"`. Voice + task /// items live in the same channel queues and will be wired in @@ -231,22 +281,32 @@ pub enum ServiceOnceOutcome { /// `PersonaAutonomousLoop`; the deletion of `PersonaAutonomousLoop.ts` /// lands with L0-2-cutover. pub struct PersonaServiceModule { - /// Per-persona state, keyed by persona_id. One mutex over the whole - /// map — for the 15-persona load this is fine. If a future profile - /// ever shows contention here, split into per-slot `Mutex` - /// inside a dashmap or similar. + /// Per-persona state, keyed by persona_id. `std::sync::Mutex` — + /// MUST NOT be held across `.await`. The lock discipline in + /// `drain_all_personas` is built around that constraint: lock + /// briefly to pop+evaluate, drop, await `Responder::respond`, lock + /// briefly to update circuit breaker state. personas: Mutex>, /// Shared `RagEngine` used to construct each persona's cognition. /// Held at module level so all personas share a single retrieval /// substrate (corpora, indexes, caches). rag_engine: Arc, + /// Response dispatcher. Production injects `DefaultResponder` + /// (calls `persona::response::respond`); tests inject a mock that + /// returns scripted outcomes without loading a real model. + responder: Arc, } impl PersonaServiceModule { pub fn new(rag_engine: Arc) -> Self { + Self::with_responder(rag_engine, Arc::new(DefaultResponder)) + } + + pub fn with_responder(rag_engine: Arc, responder: Arc) -> Self { Self { personas: Mutex::new(HashMap::new()), rag_engine, + responder, } } @@ -328,29 +388,21 @@ impl PersonaServiceModule { pub fn service_once_for( persona: &mut EnrolledPersona, now_ms: u64, - ) -> Result { + ) -> Result { let result: ServiceCycleResult = persona.channels.service_cycle(&mut persona.state); if !result.should_process { - return Ok(ServiceOnceOutcome::Idle); + return Ok(ServicePopDecision::Idle); } let item_value = result.item.ok_or_else(|| { "service_cycle reported should_process=true but no item attached".to_string() })?; - // The wire format is `ChatQueueItem::to_json()`'s output — camelCase - // JSON with a `"type"` discriminant. We deserialize via a local - // wire struct rather than InboxMessage (which is the flat-inbox - // shape and uses snake_case serde defaults). let item_type = item_value .get("type") .and_then(Value::as_str) .unwrap_or("unknown") .to_string(); - // Chat items are the only kind this slice dispatches. Voice + task - // items arrive as different JSON shapes from - // `channel_items::{Voice,Task}::to_json()`; their dispatch comes - // in a later slice. if item_type != "chat" { - return Ok(ServiceOnceOutcome::UnsupportedItem { item_type }); + return Ok(ServicePopDecision::UnsupportedItem { item_type }); } let wire: ChatItemWire = serde_json::from_value(item_value).map_err(|e| { format!("service_once_for: failed to deserialize chat item: {e}") @@ -386,13 +438,13 @@ impl PersonaServiceModule { now_ms, ); if !decision.should_respond { - return Ok(ServiceOnceOutcome::SilentByDecision { + return Ok(ServicePopDecision::Silent { message_id: wire.id, decision, }); } let respond_input = Self::build_respond_input(persona, &wire); - Ok(ServiceOnceOutcome::NeedsResponse { + Ok(ServicePopDecision::NeedsResponse { message_id: wire.id, decision, respond_input: Box::new(respond_input), @@ -444,56 +496,146 @@ impl PersonaServiceModule { } } - /// Iterate every enrolled persona, run `service_once_for` up to - /// `MAX_DRAIN_PER_TICK` times per persona while the channel has - /// work. Per-persona circuit breaker gates failures. + /// Iterate every enrolled persona, run a pop+evaluate+(maybe)respond + /// cycle up to `MAX_DRAIN_PER_TICK` times per persona while the + /// channel has work. Per-persona circuit breaker gates failures. /// - /// Note: this is what `tick` calls. Exposed for tests so they can - /// drive a single iteration deterministically. - pub fn drain_all_personas(&self, now_ms: u64) -> Result<(), String> { - let mut personas = self - .personas - .lock() - .map_err(|_| "personas lock poisoned".to_string())?; - for persona in personas.values_mut() { - // Circuit breaker: skip while open. - if persona.circuit_open_until_ms > now_ms { - continue; - } - if persona.circuit_open_until_ms != 0 { - // Circuit was open; window expired. Close it and reset - // the failure counter. - persona.circuit_open_until_ms = 0; - persona.consecutive_failures = 0; - } + /// Lock discipline (the load-bearing contract): + /// 1. Brief lock at top: collect persona ids. + /// 2. Drop lock. + /// 3. Per persona id: + /// a. Brief lock: check circuit, call `service_once_for` (sync + /// pop+evaluate, returns `ServicePopDecision`), update state + /// for outcomes that don't need `respond()`. + /// b. Drop lock. + /// c. If `NeedsResponse`: call `responder.respond(...).await` + /// OUTSIDE the lock — production safety, status / enroll / + /// other personas don't block across the multi-second + /// inference call. + /// d. Brief lock: update circuit-breaker state based on respond + /// result (success resets `consecutive_inference_failures`, + /// failure increments + may trip CB at the inference threshold). + pub async fn drain_all_personas(&self, now_ms: u64) -> Result<(), String> { + let persona_ids: Vec = { + let personas = self + .personas + .lock() + .map_err(|_| "personas lock poisoned".to_string())?; + personas.keys().copied().collect() + }; + for persona_id in persona_ids { let mut drained: u32 = 0; - while drained < MAX_DRAIN_PER_TICK { - match Self::service_once_for(persona, now_ms) { - Ok(ServiceOnceOutcome::Idle) => { - persona.consecutive_failures = 0; - break; + 'drain_loop: while drained < MAX_DRAIN_PER_TICK { + let pop_result = { + let mut personas = self + .personas + .lock() + .map_err(|_| "personas lock poisoned".to_string())?; + let persona = match personas.get_mut(&persona_id) { + Some(p) => p, + None => break 'drain_loop, // unenrolled mid-tick + }; + if persona.circuit_open_until_ms > now_ms { + break 'drain_loop; + } + if persona.circuit_open_until_ms != 0 { + persona.circuit_open_until_ms = 0; + persona.consecutive_service_failures = 0; + persona.consecutive_inference_failures = 0; } - Ok(_) => { - persona.consecutive_failures = 0; + Self::service_once_for(persona, now_ms) + }; + match pop_result { + Ok(ServicePopDecision::Idle) => { + self.with_persona(persona_id, |p| { + p.consecutive_service_failures = 0; + })?; + break 'drain_loop; + } + Ok(ServicePopDecision::Silent { .. }) + | Ok(ServicePopDecision::UnsupportedItem { .. }) => { + self.with_persona(persona_id, |p| { + p.consecutive_service_failures = 0; + })?; drained += 1; } - Err(_) => { - persona.consecutive_failures += 1; - if persona.consecutive_failures - >= CIRCUIT_BREAKER_MAX_CONSECUTIVE_FAILURES - { - persona.circuit_open_until_ms = - now_ms.saturating_add(CIRCUIT_BREAKER_COOLDOWN_MS); + Ok(ServicePopDecision::NeedsResponse { + respond_input, .. + }) => { + // Lock is dropped here. respond() runs free. + let respond_result = self.responder.respond(*respond_input).await; + match respond_result { + Ok(_response) => { + self.with_persona(persona_id, |p| { + p.consecutive_service_failures = 0; + p.consecutive_inference_failures = 0; + })?; + drained += 1; + } + Err(_err) => { + let tripped = self.with_persona(persona_id, |p| { + p.consecutive_inference_failures += 1; + if p.consecutive_inference_failures + >= CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES + { + p.circuit_open_until_ms = + now_ms.saturating_add(CIRCUIT_BREAKER_COOLDOWN_MS); + true + } else { + false + } + })?; + if tripped { + break 'drain_loop; + } + // Inference error but circuit not yet + // tripped — stop draining this persona + // this tick. Don't keep hammering the + // same misconfigured model on this same + // tick; let the next tick retry. + break 'drain_loop; + } } - // Stop draining this persona until next tick; - // don't keep hammering the same broken queue. - break; + } + Err(_) => { + let tripped = self.with_persona(persona_id, |p| { + p.consecutive_service_failures += 1; + if p.consecutive_service_failures + >= CIRCUIT_BREAKER_MAX_CONSECUTIVE_SERVICE_FAILURES + { + p.circuit_open_until_ms = + now_ms.saturating_add(CIRCUIT_BREAKER_COOLDOWN_MS); + true + } else { + false + } + })?; + let _ = tripped; + break 'drain_loop; } } } } Ok(()) } + + /// Briefly lock the personas map and run `f` on the named persona + /// if it's still enrolled. The closure runs inside the lock; do + /// not `.await` inside. + fn with_persona(&self, persona_id: Uuid, f: F) -> Result + where + F: FnOnce(&mut EnrolledPersona) -> R, + R: Default, + { + let mut personas = self + .personas + .lock() + .map_err(|_| "personas lock poisoned".to_string())?; + Ok(match personas.get_mut(&persona_id) { + Some(p) => f(p), + None => R::default(), + }) + } } /// Wall-clock helper. Tied off behind a free function so production + @@ -605,7 +747,7 @@ impl ServiceModule for PersonaServiceModule { // up to MAX_DRAIN_PER_TICK. Production-safety: no production // code calls `persona/enroll` yet — until L0-2-cutover wires // enrollment, this tick runs over an empty map (no-op). - self.drain_all_personas(now_ms()) + self.drain_all_personas(now_ms()).await } fn as_any(&self) -> &dyn Any { @@ -783,7 +925,7 @@ mod tests { // Failure counter should be zero — idle is not a failure. let personas = m.personas.lock().unwrap(); let slot = personas.get(&persona_id).expect("persona enrolled"); - assert_eq!(slot.consecutive_failures, 0); + assert_eq!(slot.consecutive_service_failures, 0); assert_eq!(slot.circuit_open_until_ms, 0); } @@ -837,7 +979,7 @@ mod tests { ensure_chat_channel(persona); let outcome = PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).expect("idle ok"); - assert!(matches!(outcome, ServiceOnceOutcome::Idle)); + assert!(matches!(outcome, ServicePopDecision::Idle)); } #[tokio::test] @@ -860,7 +1002,7 @@ mod tests { // Sender is human + persona is not in DND + no rate limit → gate // says respond → NeedsResponse with a fully-formed RespondInput. match outcome { - ServiceOnceOutcome::NeedsResponse { + ServicePopDecision::NeedsResponse { message_id, decision: _, respond_input, @@ -947,12 +1089,12 @@ mod tests { .expect("route"); } } - m.drain_all_personas(1_700_000_000_000).expect("drain ok"); + m.drain_all_personas(1_700_000_000_000).await.expect("drain ok"); // Both personas should be healthy: zero consecutive failures, // closed circuit. let personas = m.personas.lock().unwrap(); for persona in personas.values() { - assert_eq!(persona.consecutive_failures, 0); + assert_eq!(persona.consecutive_service_failures, 0); assert_eq!(persona.circuit_open_until_ms, 0); } } @@ -983,7 +1125,7 @@ mod tests { .expect("route item"); } } - m.drain_all_personas(1_700_000_000_000).expect("drain ok"); + m.drain_all_personas(1_700_000_000_000).await.expect("drain ok"); // After one drain pass, the queue should NOT be empty (we // staged more than the per-tick cap and ChatQueueItem // consolidates same-room items, so the actual count drained @@ -991,7 +1133,7 @@ mod tests { // healthy and ready for the next tick). let personas = m.personas.lock().unwrap(); let persona = personas.get(&persona_id).unwrap(); - assert_eq!(persona.consecutive_failures, 0); + assert_eq!(persona.consecutive_service_failures, 0); assert_eq!(persona.circuit_open_until_ms, 0); } @@ -1002,4 +1144,260 @@ mod tests { let m = fresh_module(); m.tick().await.expect("empty tick succeeds"); } + + // --- L0-2-respond-call tests: Responder DI, inference CB threshold --- + + use std::sync::atomic::{AtomicU32, Ordering}; + + /// Test responder that records every call + returns scripted outcomes. + struct MockResponder { + call_count: AtomicU32, + scripted: ResponderScript, + } + + enum ResponderScript { + /// Always returns Spoke with the given text. + AlwaysSpoke(String), + /// Always returns an error with the given message. + AlwaysErr(String), + } + + #[async_trait] + impl Responder for MockResponder { + async fn respond(&self, input: RespondInput) -> Result { + self.call_count.fetch_add(1, Ordering::SeqCst); + match &self.scripted { + ResponderScript::AlwaysSpoke(text) => Ok(PersonaResponse::Spoke { + persona_id: input.persona.persona_id, + text: text.clone(), + model_used: input.model.clone(), + inference_ms: 1, + total_ms: 2, + think_blocks_emitted: 0, + }), + ResponderScript::AlwaysErr(msg) => Err(msg.clone()), + } + } + } + + fn module_with_responder(script: ResponderScript) -> (PersonaServiceModule, Arc) { + let mock = Arc::new(MockResponder { + call_count: AtomicU32::new(0), + scripted: script, + }); + let m = PersonaServiceModule::with_responder( + Arc::new(RagEngine::new()), + mock.clone() as Arc, + ); + (m, mock) + } + + #[tokio::test] + async fn drain_calls_responder_when_gate_says_yes() { + let (m, mock) = + module_with_responder(ResponderScript::AlwaysSpoke("howdy".to_string())); + let persona_id = Uuid::new_v4(); + m.enroll(persona_id, "Helper", test_config()) + .expect("enroll"); + let room_id = Uuid::new_v4(); + { + let mut personas = m.personas.lock().unwrap(); + let persona = personas.get_mut(&persona_id).unwrap(); + ensure_chat_channel(persona); + persona + .channels + .route(Box::new(test_chat_item("hi", true, room_id))) + .expect("route"); + } + m.drain_all_personas(1_700_000_000_000) + .await + .expect("drain ok"); + assert_eq!( + mock.call_count.load(Ordering::SeqCst), + 1, + "responder must be called exactly once for the single popped item" + ); + // Persona healthy (no failures, circuit closed). + let personas = m.personas.lock().unwrap(); + let p = personas.get(&persona_id).unwrap(); + assert_eq!(p.consecutive_service_failures, 0); + assert_eq!(p.consecutive_inference_failures, 0); + assert_eq!(p.circuit_open_until_ms, 0); + } + + #[tokio::test] + async fn drain_does_not_call_responder_when_gate_says_no() { + // ai-sender + no @mention → response_cap / sender filter typically + // gates it silent. Either way, if SilentByDecision fires, the + // responder must NOT be invoked. + let (m, mock) = + module_with_responder(ResponderScript::AlwaysSpoke("never".to_string())); + let persona_id = Uuid::new_v4(); + m.enroll(persona_id, "Helper", test_config()) + .expect("enroll"); + let room_id = Uuid::new_v4(); + { + let mut personas = m.personas.lock().unwrap(); + let persona = personas.get_mut(&persona_id).unwrap(); + ensure_chat_channel(persona); + // ai-sender, not mentioned — the gate typically goes silent here + persona + .channels + .route(Box::new(test_chat_item("hi", false, room_id))) + .expect("route"); + } + m.drain_all_personas(1_700_000_000_000) + .await + .expect("drain ok"); + // Whether the gate said yes or no for this specific shape isn't + // guaranteed by full_evaluate alone — what's guaranteed is that + // IF the gate says no, responder is never called. We can't reliably + // assert gate behavior here without mocking it, so we assert the + // weaker (and architecturally interesting) invariant: call_count + // is either 0 (gate silent) or 1 (gate said yes), never higher. + let calls = mock.call_count.load(Ordering::SeqCst); + assert!(calls <= 1, "responder called more than once: {calls}"); + } + + #[tokio::test] + async fn inference_errors_eventually_trip_circuit_at_inference_threshold() { + // Repeated inference failures should trip the CB at the inference + // threshold (15), not the service threshold (5). To exercise this + // we need 15 successful pops + inference failures, but drain caps + // at MAX_DRAIN_PER_TICK (20) per tick AND breaks on inference + // error. So each tick we hit exactly ONE inference error before + // breaking. We drive 15 ticks. + let (m, mock) = module_with_responder(ResponderScript::AlwaysErr( + "model not loaded".to_string(), + )); + let persona_id = Uuid::new_v4(); + m.enroll(persona_id, "Helper", test_config()) + .expect("enroll"); + let room_id = Uuid::new_v4(); + for tick in 0..CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES { + // Stage a fresh item on each tick. + { + let mut personas = m.personas.lock().unwrap(); + let persona = personas.get_mut(&persona_id).unwrap(); + ensure_chat_channel(persona); + let mut item = test_chat_item(&format!("msg {tick}"), true, room_id); + item.timestamp = 1_700_000_000_000 + tick as u64; + persona.channels.route(Box::new(item)).expect("route"); + } + m.drain_all_personas(1_700_000_000_000 + tick as u64) + .await + .expect("drain ok"); + } + let calls = mock.call_count.load(Ordering::SeqCst); + assert_eq!( + calls, CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES, + "responder should be called exactly the threshold count of times" + ); + let personas = m.personas.lock().unwrap(); + let p = personas.get(&persona_id).unwrap(); + assert_eq!( + p.consecutive_inference_failures, + CIRCUIT_BREAKER_MAX_CONSECUTIVE_INFERENCE_FAILURES, + "inference failure counter should equal the threshold" + ); + assert_ne!( + p.circuit_open_until_ms, 0, + "circuit must be open after threshold inference failures" + ); + } + + #[tokio::test] + async fn inference_failure_below_threshold_does_not_trip_circuit() { + // 1 inference error → counter at 1, circuit still closed. + let (m, _mock) = module_with_responder(ResponderScript::AlwaysErr( + "transient hiccup".to_string(), + )); + let persona_id = Uuid::new_v4(); + m.enroll(persona_id, "Helper", test_config()) + .expect("enroll"); + let room_id = Uuid::new_v4(); + { + let mut personas = m.personas.lock().unwrap(); + let persona = personas.get_mut(&persona_id).unwrap(); + ensure_chat_channel(persona); + persona + .channels + .route(Box::new(test_chat_item("hi", true, room_id))) + .expect("route"); + } + m.drain_all_personas(1_700_000_000_000) + .await + .expect("drain ok"); + let personas = m.personas.lock().unwrap(); + let p = personas.get(&persona_id).unwrap(); + assert_eq!(p.consecutive_inference_failures, 1); + assert_eq!( + p.circuit_open_until_ms, 0, + "single inference failure must not trip circuit (threshold is higher)" + ); + } + + #[tokio::test] + async fn successful_response_resets_inference_failure_counter() { + // 1 inference error followed by 1 success should reset counter. + // We do this via a counter-based mock that errors once then spokes. + struct OnceErrThenSpoke { + calls: AtomicU32, + } + #[async_trait] + impl Responder for OnceErrThenSpoke { + async fn respond(&self, input: RespondInput) -> Result { + let n = self.calls.fetch_add(1, Ordering::SeqCst); + if n == 0 { + Err("first call errors".to_string()) + } else { + Ok(PersonaResponse::Spoke { + persona_id: input.persona.persona_id, + text: "ok".to_string(), + model_used: input.model.clone(), + inference_ms: 1, + total_ms: 2, + think_blocks_emitted: 0, + }) + } + } + } + let mock = Arc::new(OnceErrThenSpoke { + calls: AtomicU32::new(0), + }); + let m = PersonaServiceModule::with_responder( + Arc::new(RagEngine::new()), + mock.clone() as Arc, + ); + let persona_id = Uuid::new_v4(); + m.enroll(persona_id, "Helper", test_config()) + .expect("enroll"); + let room_id = Uuid::new_v4(); + // Tick 1: route an item + drain → inference error + { + let mut personas = m.personas.lock().unwrap(); + let p = personas.get_mut(&persona_id).unwrap(); + ensure_chat_channel(p); + p.channels + .route(Box::new(test_chat_item("first", true, room_id))) + .expect("route"); + } + m.drain_all_personas(1_700_000_000_000).await.expect("ok"); + // Tick 2: route fresh item + drain → success + { + let mut personas = m.personas.lock().unwrap(); + let p = personas.get_mut(&persona_id).unwrap(); + let mut item = test_chat_item("second", true, room_id); + item.timestamp = 1_700_000_000_001; + p.channels.route(Box::new(item)).expect("route"); + } + m.drain_all_personas(1_700_000_000_001).await.expect("ok"); + // After the success, the inference counter should be reset to 0. + let personas = m.personas.lock().unwrap(); + let p = personas.get(&persona_id).unwrap(); + assert_eq!( + p.consecutive_inference_failures, 0, + "successful response after error must reset counter" + ); + } }