From e440036dd0172f2f78c54f46fdee34dbe9d72b82 Mon Sep 17 00:00:00 2001 From: joelteply Date: Fri, 29 May 2026 17:07:33 -0500 Subject: [PATCH] =?UTF-8?q?feat(continuum-core/persona):=20L0-2-respond-co?= =?UTF-8?q?ntext=20=E2=80=94=20required=20ResponderConfig,=20NeedsResponse?= =?UTF-8?q?=20outcome,=20no=20empty=20defaults?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reworked from the earlier L0-2-respond attempt (#1466, self-closed) after auditing three doctrine violations: 1. std::Mutex held across respond().await — blocks status/enroll/other personas' ticks for the full inference roundtrip 2. Empty-default fields on RespondInput (model: String::new(), etc.) wrapped as 'fail loudly at inference' — that's the silent-default- substitution pattern this migration is deleting on the TS side 3. RespondError as Ok outcome — circuit breaker never trips on repeated inference failures (silent degradation) This slice fixes them all by SHRINKING the scope: no respond() call yet. That's the next slice, which can rely on RespondInput being honestly constructed. What this slice does: - New ResponderConfig struct (model, system_prompt, capabilities, specialty). All required at enrollment time; validated non-empty with named errors for model + specialty - EnrolledPersona extends with responder_config field - enroll signature requires ResponderConfig as a parameter; rejected enrollments don't mutate state (validate before lock) - persona/enroll command parses model/system_prompt/specialty/ capabilities from JSON params; requires model loud - ServiceOnceOutcome updated: - SilentByDecision { message_id, decision } — gate said no - NeedsResponse { message_id, decision, respond_input } — gate said yes; respond_input is fully-formed from real config - UnsupportedItem unchanged - Idle unchanged - Evaluated REMOVED - service_once_for: pops + evaluates; if should_respond, builds RespondInput from real persona config + per-message context; no empty-string defaults - build_respond_input populates EVERY required field from responder_config + the chat wire. The genuinely-empty Vec fields (recent_history, known_specialties, other_persona_names, message_media, recalled_engrams) are LEGITIMATELY empty for first-turn fresh context, not silently-substituted defaults What this slice does NOT do: - Call respond(). Next slice owns that, plus the lock-around-await discipline + inference-error-trips-circuit-breaker contract - Wire persona/enroll from production code. L0-2-cutover Tests: 19/19 passing. 16 pre-existing + 3 new doctrine pins: - enroll_with_empty_model_is_rejected_loud - enroll_with_empty_specialty_is_rejected_loud - enroll_command_requires_model - service_once_for dispatch test extended to verify the RespondInput carries the persona's real model/specialty/ system_prompt, not empty defaults Verified on Xcode 26.3 + llama/metal feature. Card: 8d11027b Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/persona/service_module.rs | 348 +++++++++++++++--- 1 file changed, 303 insertions(+), 45 deletions(-) diff --git a/src/workers/continuum-core/src/persona/service_module.rs b/src/workers/continuum-core/src/persona/service_module.rs index 96ea94f10..107d403fc 100644 --- a/src/workers/continuum-core/src/persona/service_module.rs +++ b/src/workers/continuum-core/src/persona/service_module.rs @@ -1,15 +1,30 @@ //! `PersonaServiceModule` — singleton Rust `ServiceModule` for persona //! work. //! -//! ## L0-2-dispatch scope +//! ## L0-2-respond-context scope //! -//! Builds on L0-2-prep (#1464): each `EnrolledPersona` now carries a -//! per-persona `ChannelRegistry` + `PersonaState`. `service_once_for` -//! pops the next eligible item via `channel_registry::service_cycle` -//! and runs it through `full_evaluate` (the unified pre-response gate -//! from `persona::evaluator`). The result is recorded; the actual -//! `respond()` call needs more upstream context (`TurnContext`, room -//! history, known-specialties roster) that lands in a follow-up slice. +//! Builds on L0-2-dispatch (#1465). Each `EnrolledPersona` now carries +//! a required `ResponderConfig` (model, system_prompt, capabilities) +//! supplied at enrollment. When `full_evaluate` decides +//! `should_respond=true`, `service_once_for` constructs a fully-formed +//! `RespondInput` and surfaces it as `ServiceOnceOutcome::NeedsResponse`. +//! +//! What this slice does NOT do: call `respond()`. That's the next +//! slice, which can rely on `RespondInput` being honestly constructed +//! from real config + per-message context — no empty-string defaults +//! that the inference layer would have to fail loudly on. +//! +//! Why this shape, not "wire respond() now": +//! - Empty-default fields on `RespondInput` (the previous attempt) are +//! the silent-default-substitution pattern this whole migration is +//! deleting on the TS side. Not reinventing it in Rust. +//! - The lock discipline for calling `respond().await` from inside a +//! mutex-held context needs care (drop lock around inference). Best +//! to land the construction first, then the dispatch shape. +//! - Errors from `respond()` MUST trip the per-persona circuit breaker +//! (silent-degradation otherwise). That requires `service_once_for` +//! to surface inference errors as `Err`, not wrap them as `Ok`. The +//! next slice owns that contract. //! //! `tick` iterates enrolled personas, calls `service_once_for` on each, //! manages per-persona circuit-breaker (5 consecutive failures → 30s @@ -33,12 +48,17 @@ use async_trait::async_trait; use serde_json::{json, Value}; use uuid::Uuid; +use crate::cognition::response_orchestrator::PersonaSlot as ResponderPersona; +use crate::model_registry::Capability; use crate::persona::channel_registry::ChannelRegistry; use crate::persona::channel_types::ServiceCycleResult; use crate::persona::evaluator::{full_evaluate, FullEvaluateRequest, FullEvaluateResult}; +use crate::persona::response::RespondInput; +use crate::persona::turn_context::TurnContext; use crate::persona::types::{PersonaState, SenderType}; use crate::persona::unified::PersonaCognition; use serde::Deserialize; +use std::collections::HashSet; /// Wire shape that mirrors `ChatQueueItem::to_json()` (camelCase with a /// `"type": "chat"` discriminant). Used here to deserialize whatever @@ -76,10 +96,56 @@ const CIRCUIT_BREAKER_COOLDOWN_MS: u64 = 30_000; /// the rest. const MAX_DRAIN_PER_TICK: u32 = 20; +/// Per-persona persistent response configuration. Required at enrollment. +/// All fields validated non-empty/non-default at enrollment time so +/// `build_respond_input` can construct a honestly-populated `RespondInput` +/// — no empty-string fallbacks that the inference layer would have to +/// fail-loudly on. (Per Joel 2026-05-29 + the URI doctrine peer mapped: +/// empty model fails at the URI parser; same fail-loud should happen at +/// our boundary, not deeper.) +#[derive(Debug, Clone)] +pub struct ResponderConfig { + /// Model identifier this persona renders with. Non-empty. + pub model: String, + /// Persona's system prompt / identity template. For now used as-is; + /// RAG-enriched system prompt construction is upstream-context + /// plumbing that lands when the actual `respond()` dispatch wires. + pub system_prompt: String, + /// Model capabilities (vision, audio input, streaming, etc.). + /// Empty set is a VALID value (a text-only persona); but the field + /// must be supplied explicitly, not defaulted. + pub capabilities: HashSet, + /// Stable specialty identifier (e.g. "code-review", "general"). + /// Matched against `SharedAnalysis.suggested_angles` by the + /// response orchestrator. Non-empty (use "general" for unscoped). + pub specialty: String, +} + +impl ResponderConfig { + /// Validate required fields. Returns a clear error message naming + /// any missing piece so misconfiguration surfaces at enrollment, + /// not inside the inference layer. + pub fn validate(&self) -> Result<(), String> { + if self.model.trim().is_empty() { + return Err("ResponderConfig.model is empty (persona must declare its model)".to_string()); + } + if self.specialty.trim().is_empty() { + return Err( + "ResponderConfig.specialty is empty (use 'general' if unscoped, not empty)" + .to_string(), + ); + } + // system_prompt + capabilities may legitimately be empty for + // some personas; their emptiness is recorded but not rejected. + Ok(()) + } +} + /// Per-persona state inside the singleton service module. One entry per /// enrolled persona; carries the persona's cognition container, the -/// per-persona channel queues + state for the service loop, and the -/// per-enrollment circuit-breaker bookkeeping. +/// per-persona channel queues + state for the service loop, the +/// responder config supplied at enrollment, and the per-enrollment +/// circuit-breaker bookkeeping. /// /// Named `EnrolledPersona` rather than `PersonaSlot` to avoid collision /// with the existing `cognition::response_orchestrator::PersonaSlot` @@ -96,6 +162,10 @@ pub struct EnrolledPersona { /// by `service_cycle` to gate non-urgent items by `should_engage`. /// `service_cycle` updates the inbox_load field on every call. pub state: PersonaState, + /// Per-persona responder configuration. Required at enrollment; + /// supplies `model`, `system_prompt`, `capabilities`, `specialty` + /// for `build_respond_input` so no field needs an empty default. + pub responder_config: ResponderConfig, /// Unix-ms timestamp at which the per-persona circuit re-closes. /// 0 means the circuit is currently closed (healthy). pub circuit_open_until_ms: u64, @@ -105,13 +175,19 @@ pub struct EnrolledPersona { } impl EnrolledPersona { - fn new(persona_id: Uuid, display_name: String, cognition: PersonaCognition) -> Self { + fn new( + persona_id: Uuid, + display_name: String, + cognition: PersonaCognition, + responder_config: ResponderConfig, + ) -> Self { Self { persona_id, display_name, cognition, channels: ChannelRegistry::new(), state: PersonaState::new(), + responder_config, circuit_open_until_ms: 0, consecutive_failures: 0, } @@ -120,27 +196,34 @@ impl EnrolledPersona { /// Outcome of a single `service_once_for` call on one enrolled persona. /// -/// We do NOT yet call `respond()` from this slice — that needs upstream -/// context (`TurnContext`, room history, known-specialties roster) that -/// will plumb through in a follow-up slice. `Evaluated` carries the -/// `FullEvaluateResult` so the test harness (and eventually production) -/// can see what the gate decided. +/// L0-2-respond-context shape: no `respond()` call yet. When the gate +/// says yes, a fully-formed `RespondInput` is surfaced for the caller +/// (the actual dispatch slice owns the `respond()` call + the +/// inference-error-trips-circuit-breaker contract). #[derive(Debug)] pub enum ServiceOnceOutcome { /// The channel was idle; no item to dispatch this cycle. Idle, - /// An item was popped, evaluated, and the gate returned a decision. - /// `respond()` wiring lands in a follow-up slice; this outcome - /// carries the inputs that respond() would have consumed so callers - /// (and tests) can verify the path. - Evaluated { + /// `full_evaluate` decided NOT to respond. Carries the gate outcome + /// for observability. + SilentByDecision { message_id: Uuid, decision: FullEvaluateResult, }, - /// Item was popped but couldn't be deserialized as a chat - /// `InboxMessage`. Voice + task items live in the same channel - /// queues and will be wired in a later slice; for now they're - /// surfaced as `UnsupportedItem` rather than silently dropped. + /// `full_evaluate` decided to respond. The `RespondInput` is + /// fully-formed from the persona's responder config + per-message + /// context. Caller dispatches `persona::response::respond(input)` + /// in the next slice — that's where the lock-around-await + /// discipline + inference-error-trips-circuit-breaker contract + /// live. + NeedsResponse { + message_id: Uuid, + decision: FullEvaluateResult, + respond_input: Box, + }, + /// Item was popped but its `"type"` wasn't `"chat"`. Voice + task + /// items live in the same channel queues and will be wired in + /// later slices; surfaced here rather than silently dropped. UnsupportedItem { item_type: String }, } @@ -169,10 +252,19 @@ impl PersonaServiceModule { /// Enroll a persona. Constructs a `PersonaCognition` for it under the /// module's shared `RagEngine`, stores the slot. Idempotent: enrolling - /// the same id with a different display name updates the name; the - /// existing cognition + circuit-breaker state are preserved (do NOT - /// reset cognition state silently — that would be a fallback). - pub fn enroll(&self, persona_id: Uuid, display_name: impl Into) -> Result<(), String> { + /// the same id with a different display name updates the name AND the + /// responder config; the existing cognition + circuit-breaker state + /// are preserved (silently resetting cognition would be a fallback). + /// + /// Validates the `ResponderConfig` before mutating any state — a + /// rejected enrollment leaves the module untouched. + pub fn enroll( + &self, + persona_id: Uuid, + display_name: impl Into, + responder_config: ResponderConfig, + ) -> Result<(), String> { + responder_config.validate()?; let display_name = display_name.into(); let mut personas = self .personas @@ -180,6 +272,7 @@ impl PersonaServiceModule { .map_err(|_| "personas lock poisoned".to_string())?; if let Some(slot) = personas.get_mut(&persona_id) { slot.display_name = display_name; + slot.responder_config = responder_config; return Ok(()); } let cognition = PersonaCognition::new( @@ -189,7 +282,7 @@ impl PersonaServiceModule { ); personas.insert( persona_id, - EnrolledPersona::new(persona_id, display_name, cognition), + EnrolledPersona::new(persona_id, display_name, cognition, responder_config), ); Ok(()) } @@ -292,12 +385,65 @@ impl PersonaServiceModule { &persona.cognition.message_cache, now_ms, ); - Ok(ServiceOnceOutcome::Evaluated { + if !decision.should_respond { + return Ok(ServiceOnceOutcome::SilentByDecision { + message_id: wire.id, + decision, + }); + } + let respond_input = Self::build_respond_input(persona, &wire); + Ok(ServiceOnceOutcome::NeedsResponse { message_id: wire.id, decision, + respond_input: Box::new(respond_input), }) } + /// Construct a `RespondInput` for `persona::response::respond()` + /// from the enrolled persona's stored config + the popped chat-item + /// wire. Deterministic + side-effect free; no empty-string defaults + /// — every required field comes from `responder_config` (validated + /// at enrollment) or from the message itself. + /// + /// Fields that are LEGITIMATELY empty here: + /// - `turn_context.recent_history`: populated by L0-3/L0-4 when the + /// inbox-routing path plumbs prior-message context per-turn. For + /// now an empty Vec means "first-turn fresh context." + /// - `turn_context.known_specialties`: populated when the response + /// orchestrator has multiple-persona-in-room context. Empty Vec + /// means "no other-persona specialties to consider." + /// - `other_persona_names`: same provenance — populated when the + /// room roster is plumbed. + /// - `message_media`: populated when the chat item carries media + /// (next slice for media item wiring). + /// - `recalled_engrams`: populated when admission state recall is + /// wired (L0-3+). + /// + /// None of those are silently-substituted defaults — they're + /// genuinely-absent context that the receiver tolerates. The fields + /// that would be DANGEROUS to default (model, system_prompt, + /// capabilities, specialty) come from responder_config which is + /// validated non-empty at enrollment. + fn build_respond_input(persona: &EnrolledPersona, wire: &ChatItemWire) -> RespondInput { + RespondInput { + persona: ResponderPersona { + persona_id: persona.persona_id, + specialty: persona.responder_config.specialty.clone(), + display_name: persona.display_name.clone(), + }, + turn_context: TurnContext::arc(wire.room_id, Vec::new(), Vec::new()), + message_id: wire.id, + message_text: wire.content.clone(), + other_persona_names: Vec::new(), + system_prompt: persona.responder_config.system_prompt.clone(), + model: persona.responder_config.model.clone(), + is_voice: false, + message_media: Vec::new(), + capabilities: persona.responder_config.capabilities.clone(), + recalled_engrams: Vec::new(), + } + } + /// Iterate every enrolled persona, run `service_once_for` up to /// `MAX_DRAIN_PER_TICK` times per persona while the channel has /// work. Per-persona circuit breaker gates failures. @@ -410,7 +556,41 @@ impl ServiceModule for PersonaServiceModule { .and_then(Value::as_str) .ok_or_else(|| "persona/enroll requires display_name (string)".to_string())? .to_string(); - self.enroll(persona_id, display_name)?; + let model = params + .get("model") + .and_then(Value::as_str) + .ok_or_else(|| "persona/enroll requires model (string)".to_string())? + .to_string(); + let system_prompt = params + .get("system_prompt") + .and_then(Value::as_str) + .unwrap_or("") + .to_string(); + let specialty = params + .get("specialty") + .and_then(Value::as_str) + .unwrap_or("general") + .to_string(); + // capabilities arrives as a JSON array of strings; each + // entry is the kebab-case name of a `Capability` variant + // (matching the serde rename in model_registry::Capability). + let capabilities: HashSet = params + .get("capabilities") + .and_then(Value::as_array) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str()) + .filter_map(|s| serde_json::from_value::(json!(s)).ok()) + .collect() + }) + .unwrap_or_default(); + let responder_config = ResponderConfig { + model, + system_prompt, + capabilities, + specialty, + }; + self.enroll(persona_id, display_name, responder_config)?; Ok(CommandResult::Json(json!({ "enrolled": persona_id.to_string(), "total": self.enrolled_count()?, @@ -441,6 +621,15 @@ mod tests { PersonaServiceModule::new(Arc::new(RagEngine::new())) } + fn test_config() -> ResponderConfig { + ResponderConfig { + model: "test-model".to_string(), + system_prompt: "You are a helpful test persona.".to_string(), + capabilities: HashSet::new(), + specialty: "general".to_string(), + } + } + #[test] fn config_declares_persona_prefix_and_high_priority() { let m = fresh_module(); @@ -474,7 +663,12 @@ mod tests { let result = m .handle_command( "persona/enroll", - json!({"persona_id": persona_id.to_string(), "display_name": "Helper"}), + json!({ + "persona_id": persona_id.to_string(), + "display_name": "Helper", + "model": "test-model", + "specialty": "general", + }), ) .await .expect("enroll succeeds with valid params"); @@ -502,8 +696,8 @@ mod tests { async fn enroll_is_idempotent_and_updates_display_name() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "First").expect("first enroll"); - m.enroll(persona_id, "Second").expect("second enroll"); + m.enroll(persona_id, "First", test_config()).expect("first enroll"); + m.enroll(persona_id, "Second", test_config()).expect("second enroll"); assert_eq!(m.enrolled_count().unwrap(), 1); let snapshot = m.enrolled_snapshot().unwrap(); assert_eq!(snapshot.len(), 1); @@ -515,8 +709,8 @@ mod tests { let m = fresh_module(); let a = Uuid::new_v4(); let b = Uuid::new_v4(); - m.enroll(a, "Alpha").expect("enroll alpha"); - m.enroll(b, "Beta").expect("enroll beta"); + m.enroll(a, "Alpha", test_config()).expect("enroll alpha"); + m.enroll(b, "Beta", test_config()).expect("enroll beta"); assert_eq!(m.enrolled_count().unwrap(), 2); } @@ -582,7 +776,7 @@ mod tests { async fn tick_with_enrolled_persona_and_no_items_is_no_op() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper", test_config()).expect("enroll"); // No items in any channel — tick should drain nothing, errors zero. m.tick().await.expect("tick succeeds with idle persona"); assert_eq!(m.enrolled_count().unwrap(), 1); @@ -637,7 +831,7 @@ mod tests { async fn service_once_for_idle_returns_idle() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper", test_config()).expect("enroll"); let mut personas = m.personas.lock().unwrap(); let persona = personas.get_mut(&persona_id).unwrap(); ensure_chat_channel(persona); @@ -650,7 +844,7 @@ mod tests { async fn service_once_for_dispatches_chat_item_through_full_evaluate() { let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper", test_config()).expect("enroll"); let room_id = Uuid::new_v4(); let mut personas = m.personas.lock().unwrap(); let persona = personas.get_mut(&persona_id).unwrap(); @@ -663,21 +857,85 @@ mod tests { .expect("route chat item to Chat channel"); let outcome = PersonaServiceModule::service_once_for(persona, 1_700_000_000_000).expect("dispatch ok"); + // Sender is human + persona is not in DND + no rate limit → gate + // says respond → NeedsResponse with a fully-formed RespondInput. match outcome { - ServiceOnceOutcome::Evaluated { message_id, decision: _ } => { + ServiceOnceOutcome::NeedsResponse { + message_id, + decision: _, + respond_input, + } => { assert_eq!(message_id, expected_id); + // Verify the respond_input has the persona's real config, + // not empty defaults. This is the doctrine pin: no empty + // model, no empty specialty, no empty system_prompt + // (all came from test_config()). + assert_eq!(respond_input.model, "test-model"); + assert_eq!(respond_input.persona.specialty, "general"); + assert_eq!( + respond_input.system_prompt, + "You are a helpful test persona." + ); + assert_eq!(respond_input.message_id, expected_id); + assert_eq!(respond_input.message_text, "hello"); } - other => panic!("expected Evaluated, got {other:?}"), + other => panic!("expected NeedsResponse, got {other:?}"), } } + #[tokio::test] + async fn enroll_with_empty_model_is_rejected_loud() { + let m = fresh_module(); + let persona_id = Uuid::new_v4(); + let mut bad_config = test_config(); + bad_config.model = String::new(); + let err = m + .enroll(persona_id, "Helper", bad_config) + .expect_err("enroll must reject empty model"); + assert!(err.contains("model"), "error names the field: {err}"); + assert_eq!( + m.enrolled_count().unwrap(), + 0, + "rejected enrollment must not mutate state" + ); + } + + #[tokio::test] + async fn enroll_with_empty_specialty_is_rejected_loud() { + let m = fresh_module(); + let persona_id = Uuid::new_v4(); + let mut bad_config = test_config(); + bad_config.specialty = String::new(); + let err = m + .enroll(persona_id, "Helper", bad_config) + .expect_err("enroll must reject empty specialty"); + assert!(err.contains("specialty"), "error names the field: {err}"); + } + + #[tokio::test] + async fn enroll_command_requires_model() { + let m = fresh_module(); + let persona_id = Uuid::new_v4(); + let err = m + .handle_command( + "persona/enroll", + json!({ + "persona_id": persona_id.to_string(), + "display_name": "Helper", + }), + ) + .await + .expect_err("enroll command must require model"); + assert!(err.contains("model"), "error names the missing param: {err}"); + } + #[tokio::test] async fn drain_all_personas_processes_two_personas_independently() { let m = fresh_module(); let a = Uuid::new_v4(); let b = Uuid::new_v4(); - m.enroll(a, "Alpha").expect("enroll a"); - m.enroll(b, "Beta").expect("enroll b"); + m.enroll(a, "Alpha", test_config()).expect("enroll a"); + m.enroll(b, "Beta", test_config()).expect("enroll b"); let room_id = Uuid::new_v4(); { let mut personas = m.personas.lock().unwrap(); @@ -706,7 +964,7 @@ mod tests { // processed; the remainder stays queued. let m = fresh_module(); let persona_id = Uuid::new_v4(); - m.enroll(persona_id, "Helper").expect("enroll"); + m.enroll(persona_id, "Helper", test_config()).expect("enroll"); let room_id = Uuid::new_v4(); let staged = MAX_DRAIN_PER_TICK as usize + 5; {