From 403b95780bfdb408544fad79d790b1f96dfb18fe Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Tue, 31 Mar 2026 20:09:30 -0400 Subject: [PATCH 1/3] feat(memory): add participant context foundation --- docs/design-docs/participant-awareness.md | 2 + docs/design-docs/working-memory.md | 8 +- prompts/en/channel.md.j2 | 20 +- src/agent/channel.rs | 386 +++++++++++++++------- src/agent/channel_history.rs | 42 ++- src/api/channels.rs | 19 +- src/config.rs | 21 ++ src/config/load.rs | 28 +- src/config/runtime.rs | 5 + src/config/toml_schema.rs | 9 + src/config/types.rs | 35 ++ src/conversation.rs | 5 + src/conversation/participants.rs | 288 ++++++++++++++++ src/memory/working.rs | 158 +++++++++ src/prompts/engine.rs | 9 +- tests/context_dump.rs | 2 + 16 files changed, 884 insertions(+), 153 deletions(-) create mode 100644 src/conversation/participants.rs diff --git a/docs/design-docs/participant-awareness.md b/docs/design-docs/participant-awareness.md index 3a2fe1a9a..d73a818c3 100644 --- a/docs/design-docs/participant-awareness.md +++ b/docs/design-docs/participant-awareness.md @@ -253,6 +253,8 @@ The participant summary loop would then use the canonical user ID for memory rec ## Configuration +The current codebase already has a smaller `ParticipantContextConfig` runtime surface for prompt-time rendering and a per-channel active participant map. The config below is still the intended full-pipeline target once summary generation and persistence land. + ```rust pub struct ParticipantConfig { pub enabled: bool, // default: true diff --git a/docs/design-docs/working-memory.md b/docs/design-docs/working-memory.md index a5a49d06e..fd62f0e4d 100644 --- a/docs/design-docs/working-memory.md +++ b/docs/design-docs/working-memory.md @@ -350,9 +350,9 @@ Per-user context injected when specific users are active in the current channel. ### Relationship to existing designs -This layer is the `participant-awareness.md` design, integrated into the layered context assembly. The design is unchanged -- `humans` table, cortex-generated summaries, cached and injected when `participants.len() >= min_participants`. The only change is its position in the prompt (it moves from "after Memory Context" to its own layer in the new assembly order). +The long-term target for this layer is still the `participant-awareness.md` design, but the current implementation is a lighter config-backed variant. The channel now maintains a per-session active participant map keyed by canonical human ID when available and by `platform:sender_id` otherwise. Prompt rendering reads from that tracked participant set, matches known humans against configured `HumanDef` entries, and renders the available profile fields (`display_name`, `role`, `description` / `bio`) inline. There is still no dedicated `humans` table or cortex-generated participant summary cache on the current code path yet. -If user-scoped memories lands first, the `humans` table merges with `user_identifiers` as described in that design doc. The working memory system does not depend on which identity table is canonical. +If user-scoped memories or the full participant-awareness pipeline lands later, this layer can switch to that richer source without changing the prompt shape. The working memory system does not depend on which identity store is canonical. ### Enhancement: Recent Activity Per User @@ -370,9 +370,9 @@ The participant summary (2-3 sentences about who this person is) is augmented wi Recent: asked about Docker runtime deps in #talk-to-spacebot 10m ago. ``` -The "Recent:" line is programmatic -- a query against `working_memory_events WHERE user_id = X ORDER BY timestamp DESC LIMIT 3`. The summary paragraph is the cached cortex-generated profile from the participant-awareness design. No additional LLM call. +The "Recent:" line is programmatic -- a query against `working_memory_events WHERE user_id = X ORDER BY timestamp DESC LIMIT 3`. The profile paragraph currently comes from configured human metadata when available. No additional LLM call. -**Token budget:** Configurable, default 400 tokens. Max 5 participants rendered. In a 50-person channel, only the 5 most recently active participants get profiles. +**Token budget:** Configurable via the dedicated participant-context config, default 400 tokens. Max 5 participants rendered. In a 50-person channel, only the 5 most recently active participants get profiles. --- diff --git a/prompts/en/channel.md.j2 b/prompts/en/channel.md.j2 index 4908de4de..148bf86a9 100644 --- a/prompts/en/channel.md.j2 +++ b/prompts/en/channel.md.j2 @@ -159,6 +159,18 @@ When in doubt, skip. Being a lurker who speaks when it matters is better than be {{ project_context }} {%- endif %} +{%- if working_memory %} +{{ working_memory }} +{%- endif %} + +{%- if channel_activity_map %} +{{ channel_activity_map }} +{%- endif %} + +{%- if participant_context %} +{{ participant_context }} +{%- endif %} + {%- if knowledge_synthesis %} ## Knowledge Context @@ -171,14 +183,6 @@ When in doubt, skip. Being a lurker who speaks when it matters is better than be {{ memory_bulletin }} {%- endif %} -{%- if working_memory %} -{{ working_memory }} -{%- endif %} - -{%- if channel_activity_map %} -{{ channel_activity_map }} -{%- endif %} - {%- if conversation_context %} ## Conversation Context diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 637d959eb..f1a3487cd 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -18,7 +18,11 @@ use crate::agent::worker::Worker; use crate::conversation::settings::{ DelegationMode, MemoryMode, ResolvedConversationSettings, ResponseMode, }; -use crate::conversation::{ChannelStore, ConversationLogger, ProcessRunLogger}; +use crate::conversation::{ + ActiveParticipant, ChannelStore, ConversationLogger, ProcessRunLogger, + participant_display_name, participant_memory_key, renderable_participants, + track_active_participant, +}; use crate::error::{AgentError, Result}; use crate::hooks::SpacebotHook; use crate::llm::SpacebotModel; @@ -82,6 +86,72 @@ fn should_flush_coalesce_buffer_for_event(event: &ProcessEvent) -> bool { ) } +fn extract_decision_summary_from_reply(reply_text: &str) -> Option { + let normalized = reply_text.split_whitespace().collect::>().join(" "); + let trimmed = normalized.trim(); + if trimmed.is_empty() { + return None; + } + + let lower = trimmed.to_ascii_lowercase(); + let explicit_markers = [ + "we decided to ", + "i decided to ", + "decision:", + "the decision is ", + "approved: ", + "approved to ", + "moving forward with ", + "move forward with ", + "going with ", + "switching to ", + "we will use ", + "i will use ", + "we'll use ", + "i'll use ", + "we will switch to ", + "i will switch to ", + "we'll switch to ", + "i'll switch to ", + "we will proceed with ", + "i will proceed with ", + "we'll proceed with ", + "i'll proceed with ", + ]; + let has_explicit_marker = explicit_markers.iter().any(|marker| lower.contains(marker)); + let has_change_comparison = lower.contains(" instead of ") + && [ + "use ", + "switch", + "adopt ", + "choose ", + "pick ", + "go with ", + "proceed with ", + ] + .iter() + .any(|marker| lower.contains(marker)); + + if !has_explicit_marker && !has_change_comparison { + return None; + } + + let mut summary = trimmed + .split_terminator(['.', '!', '?', '\n']) + .find(|sentence| !sentence.trim().is_empty()) + .unwrap_or(trimmed) + .trim() + .to_string(); + + if summary.len() > 200 { + let boundary = summary.floor_char_boundary(200); + summary.truncate(boundary); + summary.push_str("..."); + } + + Some(summary) +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -133,6 +203,8 @@ pub struct ChannelState { /// Resolved model overrides from conversation settings. /// Used by branches, workers, and compactor to resolve their model. pub model_overrides: Arc, + /// Active participants seen during the current channel session. + pub active_participants: Arc>>, } impl ChannelState { @@ -552,6 +624,7 @@ impl Channel { resolved_settings.worker_context.clone(), )), model_overrides: Arc::new(resolved_settings.clone()), + active_participants: Arc::new(RwLock::new(HashMap::new())), }; // Each channel gets its own isolated tool server to avoid races between @@ -744,11 +817,7 @@ impl Channel { if message.source == "system" { return; } - let sender_name = message - .metadata - .get("sender_display_name") - .and_then(|v| v.as_str()) - .unwrap_or(&message.sender_id); + let sender_name = participant_display_name(message); // If attachments were saved, enrich the metadata with their info let metadata = if let Some(saved) = saved_attachments { @@ -763,7 +832,7 @@ impl Channel { self.state.conversation_logger.log_user_message( &self.state.channel_id, - sender_name, + &sender_name, &message.sender_id, raw_text, &metadata, @@ -777,6 +846,16 @@ impl Channel { matches!(self.current_adapter(), Some("email")) } + async fn track_participant_from_message(&self, message: &InboundMessage) { + if message.source == "system" { + return; + } + + let humans = self.deps.humans.load(); + let mut participants = self.state.active_participants.write().await; + track_active_participant(&mut participants, humans.as_ref(), message); + } + /// Return a handle that allows external supervision to cancel this channel's /// workers and branches without direct access to Channel internals. pub fn control_handle(&self) -> ChannelControlHandle { @@ -1326,11 +1405,7 @@ impl Channel { for message in &messages { if message.source != "system" { - let sender_name = message - .metadata - .get("sender_display_name") - .and_then(|v| v.as_str()) - .unwrap_or(&message.sender_id); + let sender_name = participant_display_name(message); let (raw_text, attachments) = match &message.content { crate::MessageContent::Text(text) => (text.clone(), Vec::new()), @@ -1380,7 +1455,7 @@ impl Channel { self.state.conversation_logger.log_user_message( &self.state.channel_id, - sender_name, + &sender_name, &message.sender_id, &raw_text, &metadata, @@ -1388,6 +1463,7 @@ impl Channel { self.state .channel_store .upsert(&message.conversation_id, &metadata); + self.track_participant_from_message(message).await; conversation_id = message.conversation_id.clone(); @@ -1511,7 +1587,7 @@ impl Channel { } // Run agent turn with any image/audio attachments preserved - let (result, skip_flag, replied_flag, _) = self + let (result, skip_flag, replied_flag, _, _) = self .run_agent_turn( &combined_text, &system_prompt, @@ -1547,7 +1623,6 @@ impl Channel { let prompt_engine = rc.prompts.load(); let identity_context = rc.identity.load().render(); - let memory_bulletin = rc.memory_bulletin.load(); let skills = rc.skills.load(); let skills_prompt = skills.render_channel_prompt(&prompt_engine)?; @@ -1589,53 +1664,13 @@ impl Channel { let project_context = self.build_project_context(&prompt_engine).await; - // Only inject memory context if not in Off mode (same guard as build_system_prompt). - let (working_memory, memory_bulletin_text) = if matches!( - self.resolved_settings.memory, - MemoryMode::Off - ) { - (String::new(), None) - } else { - let wm_config = **rc.working_memory.load(); - let timezone = self.deps.working_memory.timezone(); - let wm = match crate::memory::working::render_working_memory( - &self.deps.working_memory, - self.id.as_ref(), - &wm_config, - timezone, - ) - .await - { - Ok(text) => text, - Err(error) => { - tracing::warn!(channel_id = %self.id, %error, "working memory render failed"); - String::new() - } - }; - (wm, Some(memory_bulletin.to_string())) - }; - - let channel_activity_map = if matches!(self.resolved_settings.memory, MemoryMode::Off) { - String::new() - } else { - let wm_config = **rc.working_memory.load(); - let timezone = self.deps.working_memory.timezone(); - match crate::memory::working::render_channel_activity_map( - &self.deps.sqlite_pool, - &self.deps.working_memory, - self.id.as_ref(), - &wm_config, - timezone, - ) - .await - { - Ok(text) => text, - Err(error) => { - tracing::warn!(channel_id = %self.id, %error, "channel activity map render failed"); - String::new() - } - } - }; + let ( + working_memory, + channel_activity_map, + participant_context, + memory_bulletin_text, + knowledge_synthesis_text, + ) = self.render_memory_layers().await; let routing = rc.routing.load(); let model_name = routing.resolve(ProcessType::Channel, None).to_string(); @@ -1644,6 +1679,7 @@ impl Channel { let system_prompt = prompt_engine.render_channel_prompt_with_links( empty_to_none(identity_context), memory_bulletin_text, + knowledge_synthesis_text, empty_to_none(skills_prompt), worker_capabilities, self.conversation_context.clone(), @@ -1657,6 +1693,7 @@ impl Channel { self.backfill_transcript.clone(), empty_to_none(working_memory), empty_to_none(channel_activity_map), + empty_to_none(participant_context), )?; prompt_engine.maybe_append_tool_use_enforcement( @@ -1762,6 +1799,7 @@ impl Channel { .map(|data| data.iter().map(|(meta, _)| meta.clone()).collect()); self.persist_inbound_user_message(&message, &raw_text, saved_metas.as_deref()); + self.track_participant_from_message(&message).await; // Deterministic built-in command: bypass model output drift for agent identity checks. if message.source != "system" && raw_text.trim() == "/agent-id" { @@ -1902,7 +1940,7 @@ impl Channel { .adapter .as_deref() .or_else(|| self.current_adapter()); - let (result, skip_flag, replied_flag, retrigger_reply_preserved) = self + let (result, skip_flag, replied_flag, retrigger_reply_preserved, reply_text) = self .run_agent_turn( &user_text, &system_prompt, @@ -1916,6 +1954,36 @@ impl Channel { self.handle_agent_result(result, &skip_flag, &replied_flag, is_retrigger) .await; + if replied_flag.load(std::sync::atomic::Ordering::Relaxed) + && let Some(decision_summary) = reply_text + .as_deref() + .and_then(extract_decision_summary_from_reply) + { + let user_id = if message.source.trim().is_empty() || message.sender_id.is_empty() { + None + } else { + let humans = self.deps.humans.load(); + Some(participant_memory_key( + humans.as_ref(), + message.source.trim(), + &message.sender_id, + )) + }; + let mut event = self + .deps + .working_memory + .emit( + crate::memory::WorkingMemoryEventType::Decision, + decision_summary, + ) + .channel(self.id.as_ref()) + .importance(0.8); + if let Some(user_id) = user_id { + event = event.user(user_id); + } + event.record(); + } + // Safety-net: in quiet mode, explicit mention/reply should never be dropped silently. if should_send_quiet_mode_fallback( &message, @@ -2145,6 +2213,79 @@ impl Channel { prompt_engine.render_org_context(org_context).ok() } + async fn render_memory_layers( + &self, + ) -> (String, String, String, Option, Option) { + if matches!(self.resolved_settings.memory, MemoryMode::Off) { + return (String::new(), String::new(), String::new(), None, None); + } + + let rc = &self.deps.runtime_config; + let memory_bulletin_text = Some(rc.memory_bulletin.load().to_string()); + let knowledge_synthesis_text = Some(rc.knowledge_synthesis.load().to_string()); + let wm_config = **rc.working_memory.load(); + let timezone = self.deps.working_memory.timezone(); + + let working_memory = match crate::memory::working::render_working_memory( + &self.deps.working_memory, + self.id.as_ref(), + &wm_config, + timezone, + ) + .await + { + Ok(text) => text, + Err(error) => { + tracing::warn!(channel_id = %self.id, %error, "working memory render failed"); + String::new() + } + }; + + let channel_activity_map = match crate::memory::working::render_channel_activity_map( + &self.deps.sqlite_pool, + &self.deps.working_memory, + self.id.as_ref(), + &wm_config, + timezone, + ) + .await + { + Ok(text) => text, + Err(error) => { + tracing::warn!(channel_id = %self.id, %error, "channel activity map render failed"); + String::new() + } + }; + + let participant_config = **rc.participant_context.load(); + let tracked_participants = { + let participants = self.state.active_participants.read().await; + renderable_participants(&participants, &participant_config) + }; + let participant_context = match crate::memory::working::render_participant_context( + &self.deps.working_memory, + &tracked_participants, + self.id.as_ref(), + &participant_config, + ) + .await + { + Ok(text) => text, + Err(error) => { + tracing::warn!(channel_id = %self.id, %error, "participant context render failed"); + String::new() + } + }; + + ( + working_memory, + channel_activity_map, + participant_context, + memory_bulletin_text, + knowledge_synthesis_text, + ) + } + /// Build pre-rendered project context for prompt injection. /// /// Fetches all active projects with their repos and worktrees, converts them @@ -2267,7 +2408,6 @@ impl Channel { let prompt_engine = rc.prompts.load(); let identity_context = rc.identity.load().render(); - let memory_bulletin = rc.memory_bulletin.load(); let skills = rc.skills.load(); let skills_prompt = skills.render_channel_prompt(&prompt_engine)?; @@ -2301,59 +2441,13 @@ impl Channel { let project_context = self.build_project_context(&prompt_engine).await; - // Only inject memory context if not in Off mode - let (working_memory, channel_activity_map, memory_bulletin_text) = if matches!( - self.resolved_settings.memory, - MemoryMode::Off - ) { - (String::new(), String::new(), None) - } else { - // Render working memory layers (Layers 2 + 3). - let wm_config = **rc.working_memory.load(); - let timezone = self.deps.working_memory.timezone(); - let working_memory = match crate::memory::working::render_working_memory( - &self.deps.working_memory, - self.id.as_ref(), - &wm_config, - timezone, - ) - .await - { - Ok(text) => { - if text.is_empty() { - tracing::debug!(channel_id = %self.id, "working memory rendered empty (disabled?)"); - } else { - tracing::debug!(channel_id = %self.id, len = text.len(), "working memory rendered"); - } - text - } - Err(error) => { - tracing::warn!(channel_id = %self.id, %error, "working memory render failed"); - String::new() - } - }; - - let channel_activity_map = match crate::memory::working::render_channel_activity_map( - &self.deps.sqlite_pool, - &self.deps.working_memory, - self.id.as_ref(), - &wm_config, - timezone, - ) - .await - { - Ok(text) => text, - Err(error) => { - tracing::warn!(channel_id = %self.id, %error, "channel activity map render failed"); - String::new() - } - }; - - // In Ambient mode, we still show memory but don't trigger persistence - let memory_bulletin_text = Some(memory_bulletin.to_string()); - - (working_memory, channel_activity_map, memory_bulletin_text) - }; + let ( + working_memory, + channel_activity_map, + participant_context, + memory_bulletin_text, + knowledge_synthesis_text, + ) = self.render_memory_layers().await; let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) }; let routing = rc.routing.load(); @@ -2363,6 +2457,7 @@ impl Channel { let system_prompt = prompt_engine.render_channel_prompt_with_links( empty_to_none(identity_context), memory_bulletin_text, + knowledge_synthesis_text, empty_to_none(skills_prompt), worker_capabilities, self.conversation_context.clone(), @@ -2376,6 +2471,7 @@ impl Channel { self.backfill_transcript.clone(), empty_to_none(working_memory), empty_to_none(channel_activity_map), + empty_to_none(participant_context), )?; prompt_engine.maybe_append_tool_use_enforcement( @@ -2403,6 +2499,7 @@ impl Channel { crate::tools::SkipFlag, crate::tools::RepliedFlag, bool, + Option, )> { let skip_flag = crate::tools::new_skip_flag(); let replied_flag = crate::tools::new_replied_flag(); @@ -2583,7 +2680,7 @@ impl Channel { .await; } - let retrigger_reply_preserved = { + let applied_history = { let mut guard = self.state.history.write().await; apply_history_after_turn( &result, @@ -2601,7 +2698,13 @@ impl Channel { tracing::warn!(%error, "failed to remove channel tools"); } - Ok((result, skip_flag, replied_flag, retrigger_reply_preserved)) + Ok(( + result, + skip_flag, + replied_flag, + applied_history.retrigger_reply_preserved, + applied_history.reply_text, + )) } /// Send outbound text and record send metrics. @@ -3636,9 +3739,10 @@ fn is_dm_conversation_id(conv_id: &str) -> bool { #[cfg(test)] mod tests { use super::{ - QuietModeFallbackState, compute_listen_mode_invocation, is_dm_conversation_id, - recv_channel_event, should_process_event_for_channel, - should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, + QuietModeFallbackState, compute_listen_mode_invocation, + extract_decision_summary_from_reply, is_dm_conversation_id, recv_channel_event, + should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, + should_send_quiet_mode_fallback, }; use crate::memory::MemoryType; use crate::{AgentId, ChannelId, InboundMessage, MessageContent, ProcessEvent, ProcessId}; @@ -3711,6 +3815,44 @@ mod tests { assert!(matches!(event, crate::BroadcastRecvResult::Closed)); } + #[test] + fn extracts_decision_summary_from_reply_text() { + let summary = extract_decision_summary_from_reply( + "We'll switch to the new persistence trigger thresholds and remove the old 50-message cadence.", + ); + + assert_eq!( + summary.as_deref(), + Some( + "We'll switch to the new persistence trigger thresholds and remove the old 50-message cadence" + ) + ); + assert_eq!( + extract_decision_summary_from_reply( + "We decided to use the participant map instead of transcript scans." + ) + .as_deref(), + Some("We decided to use the participant map instead of transcript scans") + ); + assert_eq!( + extract_decision_summary_from_reply( + "Decision: move forward with the config-backed participant resolver." + ) + .as_deref(), + Some("Decision: move forward with the config-backed participant resolver") + ); + assert!(extract_decision_summary_from_reply("Here's the current status update.").is_none()); + assert!(extract_decision_summary_from_reply("I'll check that and report back.").is_none()); + assert!(extract_decision_summary_from_reply("Let's debug this first.").is_none()); + assert!(extract_decision_summary_from_reply("We'll look into it tomorrow.").is_none()); + assert!( + extract_decision_summary_from_reply( + "I approved the review comment and will follow up." + ) + .is_none() + ); + } + #[test] fn channel_coalesce_ignores_unrelated_memory_saved_events() { let channel_id: ChannelId = Arc::from("channel-a"); diff --git a/src/agent/channel_history.rs b/src/agent/channel_history.rs index 195baa60a..537ca5c58 100644 --- a/src/agent/channel_history.rs +++ b/src/agent/channel_history.rs @@ -33,11 +33,11 @@ pub(crate) fn apply_history_after_turn( history_len_before: usize, channel_id: &str, is_retrigger: bool, -) -> bool { +) -> AppliedHistory { match result { Ok(_) | Err(rig::completion::PromptError::MaxTurnsError { .. }) => { *guard = history; - false + AppliedHistory::default() } Err(rig::completion::PromptError::PromptCancelled { .. }) => { let new_messages = &history[history_len_before..]; @@ -56,7 +56,7 @@ pub(crate) fn apply_history_after_turn( guard.push(rig::message::Message::Assistant { id: None, content: rig::OneOrMany::one(rig::message::AssistantContent::text( - reply_content, + reply_content.clone(), )), }); @@ -66,7 +66,10 @@ pub(crate) fn apply_history_after_turn( replaced_bridge, "preserved retrigger assistant reply after PromptCancelled" ); - return true; + return AppliedHistory { + retrigger_reply_preserved: true, + reply_text: Some(reply_content), + }; } tracing::debug!( @@ -75,7 +78,7 @@ pub(crate) fn apply_history_after_turn( replaced_bridge, "discarding retrigger PromptCancelled messages (no reply content found)" ); - return false; + return AppliedHistory::default(); } // For regular turns we preserve: @@ -101,10 +104,23 @@ pub(crate) fn apply_history_after_turn( guard.push(rig::message::Message::Assistant { id: None, content: rig::OneOrMany::one(rig::message::AssistantContent::text( - reply_content, + reply_content.clone(), )), }); preserved += 1; + + tracing::debug!( + channel_id = %channel_id, + total_new = new_messages.len(), + preserved, + discarded = new_messages.len() - preserved, + "preserved user message and assistant reply after PromptCancelled" + ); + + return AppliedHistory { + retrigger_reply_preserved: false, + reply_text: Some(reply_content), + }; } tracing::debug!( @@ -115,7 +131,7 @@ pub(crate) fn apply_history_after_turn( "preserved user message and assistant reply after PromptCancelled" ); - false + AppliedHistory::default() } Err(_) => { // Hard errors: history state is unpredictable, truncate to snapshot. @@ -125,11 +141,17 @@ pub(crate) fn apply_history_after_turn( "rolling back history after failed turn" ); guard.truncate(history_len_before); - false + AppliedHistory::default() } } } +#[derive(Debug, Default)] +pub(crate) struct AppliedHistory { + pub retrigger_reply_preserved: bool, + pub reply_text: Option, +} + pub(crate) fn pop_retrigger_bridge_message(history: &mut Vec) -> bool { if history.last().is_some_and(is_retrigger_bridge_message) { history.pop(); @@ -664,7 +686,7 @@ mod tests { let mut expected = initial; expected.push(assistant_msg("Relayed branch result to user.")); assert!( - preserved, + preserved.retrigger_reply_preserved, "retrigger PromptCancelled should report reply preservation" ); assert_eq!( @@ -697,7 +719,7 @@ mod tests { apply_history_after_turn(&err, &mut guard, history, len_before, "test", true); assert!( - !preserved, + !preserved.retrigger_reply_preserved, "retrigger PromptCancelled should report no reply preservation" ); assert_eq!( diff --git a/src/api/channels.rs b/src/api/channels.rs index 0236e5628..b00fda780 100644 --- a/src/api/channels.rs +++ b/src/api/channels.rs @@ -523,6 +523,7 @@ pub(super) async fn inspect_prompt( // ── Gather all dynamic sections ── let identity_context = rc.identity.load().render(); let memory_bulletin = rc.memory_bulletin.load(); + let knowledge_synthesis = rc.knowledge_synthesis.load(); let skills = rc.skills.load(); let skills_prompt = skills .render_channel_prompt(&prompt_engine) @@ -575,6 +576,7 @@ pub(super) async fn inspect_prompt( }; let sandbox_enabled = channel_state.deps.sandbox.containment_active(); + let adapter = query.channel_id.split(':').next().filter(|a| !a.is_empty()); // ── Render working memory layers (Layers 2 + 3) ── let wm_config = **rc.working_memory.load(); @@ -598,6 +600,20 @@ pub(super) async fn inspect_prompt( .await .unwrap_or_default(); + let participant_config = **rc.participant_context.load(); + let tracked_participants = { + let participants = channel_state.active_participants.read().await; + crate::conversation::renderable_participants(&participants, &participant_config) + }; + let participant_context = crate::memory::working::render_participant_context( + &channel_state.deps.working_memory, + &tracked_participants, + &query.channel_id, + &participant_config, + ) + .await + .unwrap_or_default(); + // ── Available channels ── let available_channels = { let channels = channel_state @@ -699,7 +715,6 @@ pub(super) async fn inspect_prompt( }; // ── Adapter prompt ── - let adapter = query.channel_id.split(':').next().filter(|a| !a.is_empty()); let adapter_prompt = adapter.and_then(|adapter| prompt_engine.render_channel_adapter_prompt(adapter)); @@ -767,6 +782,7 @@ pub(super) async fn inspect_prompt( .render_channel_prompt_with_links( empty_to_none(identity_context), empty_to_none(memory_bulletin.to_string()), + empty_to_none(knowledge_synthesis.to_string()), empty_to_none(skills_prompt), worker_capabilities, conversation_context, @@ -780,6 +796,7 @@ pub(super) async fn inspect_prompt( None, // backfill_transcript — only set during channel initialization empty_to_none(working_memory), empty_to_none(channel_activity_map), + empty_to_none(participant_context), ) .unwrap_or_default(); diff --git a/src/config.rs b/src/config.rs index eb0891a9a..4cfb49a04 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1137,6 +1137,27 @@ maintenance_merge_similarity_threshold = 1.1 ); } + #[test] + fn test_participant_context_defaults_and_overrides_resolution() { + let toml = r#" +[defaults.participant_context] +enabled = false +min_participants = 2 +token_budget = 280 +max_participants = 3 + +[[agents]] +id = "main" +"#; + let parsed: TomlConfig = toml::from_str(toml).expect("failed to parse test TOML"); + let config = Config::from_toml(parsed, PathBuf::from(".")).expect("failed to build Config"); + + assert!(!config.defaults.participant_context.enabled); + assert_eq!(config.defaults.participant_context.min_participants, 2); + assert_eq!(config.defaults.participant_context.token_budget, 280); + assert_eq!(config.defaults.participant_context.max_participants, 3); + } + #[test] fn test_work_readiness_requires_warm_state() { let readiness = evaluate_work_readiness( diff --git a/src/config/load.rs b/src/config/load.rs index e0bbf0b13..e0a319a32 100644 --- a/src/config/load.rs +++ b/src/config/load.rs @@ -15,11 +15,11 @@ use super::{ CoalesceConfig, CompactionConfig, Config, CortexConfig, CronDef, DefaultsConfig, DiscordConfig, DiscordInstanceConfig, EmailConfig, EmailInstanceConfig, GroupDef, HumanDef, IngestionConfig, LinkDef, LlmConfig, MattermostConfig, MattermostInstanceConfig, McpServerConfig, McpTransport, - MemoryPersistenceConfig, MessagingConfig, MetricsConfig, OpenCodeConfig, ProjectsConfig, - ProviderConfig, SignalConfig, SignalInstanceConfig, SlackCommandConfig, SlackConfig, - SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, TelemetryConfig, TwitchConfig, - TwitchInstanceConfig, WarmupConfig, WebhookConfig, normalize_adapter, - validate_named_messaging_adapters, + MemoryPersistenceConfig, MessagingConfig, MetricsConfig, OpenCodeConfig, + ParticipantContextConfig, ProjectsConfig, ProviderConfig, SignalConfig, SignalInstanceConfig, + SlackCommandConfig, SlackConfig, SlackInstanceConfig, TelegramConfig, TelegramInstanceConfig, + TelemetryConfig, TwitchConfig, TwitchInstanceConfig, WarmupConfig, WebhookConfig, + normalize_adapter, validate_named_messaging_adapters, }; use crate::error::{ConfigError, Result}; @@ -1611,6 +1611,24 @@ impl Config { .unwrap_or(base_defaults.warmup.startup_delay_secs), }) .unwrap_or(base_defaults.warmup), + participant_context: toml + .defaults + .participant_context + .map(|participant_context| ParticipantContextConfig { + enabled: participant_context + .enabled + .unwrap_or(base_defaults.participant_context.enabled), + min_participants: participant_context + .min_participants + .unwrap_or(base_defaults.participant_context.min_participants), + token_budget: participant_context + .token_budget + .unwrap_or(base_defaults.participant_context.token_budget), + max_participants: participant_context + .max_participants + .unwrap_or(base_defaults.participant_context.max_participants), + }) + .unwrap_or(base_defaults.participant_context), browser: { let chrome_cache_dir = instance_dir.join("chrome_cache"); toml.defaults diff --git a/src/config/runtime.rs b/src/config/runtime.rs index 065b14d1a..229b5a8bc 100644 --- a/src/config/runtime.rs +++ b/src/config/runtime.rs @@ -89,6 +89,8 @@ pub struct RuntimeConfig { pub projects: ArcSwap, /// Working memory configuration for temporal context injection. pub working_memory: ArcSwap, + /// Participant context configuration for prompt-time participant awareness. + pub participant_context: ArcSwap, /// Shared browser state for persistent sessions. /// /// When `browser.persist_session = true`, all workers share this handle so @@ -160,6 +162,7 @@ impl RuntimeConfig { working_memory: ArcSwap::from_pointee( crate::config::types::WorkingMemoryConfig::default(), ), + participant_context: ArcSwap::from_pointee(defaults.participant_context), shared_browser: if agent_config.browser.persist_session { Some(crate::tools::browser::new_shared_browser_handle()) } else { @@ -277,6 +280,8 @@ impl RuntimeConfig { self.user_timezone.store(Arc::new(resolved.user_timezone)); self.cortex.store(Arc::new(resolved.cortex)); self.warmup.store(Arc::new(resolved.warmup)); + self.participant_context + .store(Arc::new(config.defaults.participant_context)); // Preserve project_paths from the current sandbox config when // reloading — the resolved config only has user-configured paths. let existing_project_paths = self.sandbox.load().project_paths.clone(); diff --git a/src/config/toml_schema.rs b/src/config/toml_schema.rs index 5ab153d86..839fa7fe0 100644 --- a/src/config/toml_schema.rs +++ b/src/config/toml_schema.rs @@ -291,6 +291,7 @@ pub(super) struct TomlDefaultsConfig { pub(super) ingestion: Option, pub(super) cortex: Option, pub(super) warmup: Option, + pub(super) participant_context: Option, pub(super) browser: Option, pub(super) channel: Option, #[serde(default)] @@ -303,6 +304,14 @@ pub(super) struct TomlDefaultsConfig { pub(super) projects: Option, } +#[derive(Deserialize)] +pub(super) struct TomlParticipantContextConfig { + pub(super) enabled: Option, + pub(super) min_participants: Option, + pub(super) token_budget: Option, + pub(super) max_participants: Option, +} + #[derive(Deserialize, Default)] pub(super) struct TomlRoutingConfig { pub(super) channel: Option, diff --git a/src/config/types.rs b/src/config/types.rs index 9bd9073e7..2980ddb85 100644 --- a/src/config/types.rs +++ b/src/config/types.rs @@ -617,6 +617,7 @@ pub struct DefaultsConfig { pub ingestion: IngestionConfig, pub cortex: CortexConfig, pub warmup: WarmupConfig, + pub participant_context: ParticipantContextConfig, pub browser: BrowserConfig, pub channel: ChannelConfig, pub mcp: Vec, @@ -653,6 +654,7 @@ impl std::fmt::Debug for DefaultsConfig { .field("ingestion", &self.ingestion) .field("cortex", &self.cortex) .field("warmup", &self.warmup) + .field("participant_context", &self.participant_context) .field("browser", &self.browser) .field("channel", &self.channel) .field("mcp", &self.mcp) @@ -804,6 +806,38 @@ impl Default for WorkingMemoryConfig { } } +/// Participant context configuration. +/// +/// Keeps the prompt-facing participant-awareness surface separate from working +/// memory so the future humans/user-identity pipeline can evolve behind a +/// stable boundary. +#[derive(Debug, Clone, Copy)] +pub struct ParticipantContextConfig { + /// Whether participant context injection is enabled. + pub enabled: bool, + /// Minimum active participants required before the section appears. + /// + /// Defaults to 1 for the current config-backed implementation so DMs still + /// benefit from participant metadata. The fuller participant-awareness + /// pipeline can raise this later if needed. + pub min_participants: usize, + /// Token budget for the participant context section. + pub token_budget: usize, + /// Maximum participants to render in the prompt. + pub max_participants: usize, +} + +impl Default for ParticipantContextConfig { + fn default() -> Self { + Self { + enabled: true, + min_participants: 1, + token_budget: 400, + max_participants: 5, + } + } +} + impl Default for CompactionConfig { fn default() -> Self { Self { @@ -1374,6 +1408,7 @@ impl Default for DefaultsConfig { ingestion: IngestionConfig::default(), cortex: CortexConfig::default(), warmup: WarmupConfig::default(), + participant_context: ParticipantContextConfig::default(), browser: BrowserConfig::default(), channel: ChannelConfig::default(), mcp: Vec::new(), diff --git a/src/conversation.rs b/src/conversation.rs index 1665391f2..0f11454b9 100644 --- a/src/conversation.rs +++ b/src/conversation.rs @@ -4,6 +4,7 @@ pub mod channel_settings; pub mod channels; pub mod context; pub mod history; +pub mod participants; pub mod portal; pub mod settings; pub mod worker_transcript; @@ -13,6 +14,10 @@ pub use channels::ChannelStore; pub use history::{ ConversationLogger, ProcessRunLogger, TimelineItem, WorkerDetailRow, WorkerRunRow, }; +pub use participants::{ + ActiveParticipant, participant_display_name, participant_memory_key, renderable_participants, + track_active_participant, +}; pub use portal::{PortalConversation, PortalConversationStore, PortalConversationSummary}; pub use settings::{ ConversationDefaultsResponse, ConversationSettings, DelegationMode, MemoryMode, ModelOption, diff --git a/src/conversation/participants.rs b/src/conversation/participants.rs new file mode 100644 index 000000000..f51634ac9 --- /dev/null +++ b/src/conversation/participants.rs @@ -0,0 +1,288 @@ +use crate::InboundMessage; +use crate::config::{HumanDef, ParticipantContextConfig}; + +use chrono::{DateTime, Utc}; + +use std::collections::HashMap; + +/// Active participant state tracked for a live channel session. +/// +/// This is the bridge between today's lightweight config-backed participant +/// context and the future DB-backed participant-awareness pipeline. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ActiveParticipant { + /// Canonical participant key for the current implementation. + /// + /// Uses the configured human ID when available. Otherwise falls back to a + /// platform-scoped sender key so the future humans/user-identifiers store + /// can replace the backing source without changing channel state shape. + pub participant_key: String, + /// Raw platform source ("discord", "slack", etc). + pub platform: String, + /// Raw sender ID from the inbound message. + pub sender_id: String, + /// Best display name available for prompt rendering. + pub display_name: String, + /// Optional org/config role. + pub role: Option, + /// Optional lightweight profile summary from configured human metadata. + pub profile_summary: Option, + /// Most recent time this participant spoke in the current channel session. + pub last_message_at: DateTime, +} + +/// Resolve the working-memory identity key for a sender. +/// +/// Uses the configured human ID when available; otherwise falls back to a +/// platform-scoped sender key to avoid cross-adapter collisions. +pub fn participant_memory_key(humans: &[HumanDef], platform: &str, sender_id: &str) -> String { + find_human_for_sender(humans, platform, sender_id) + .map(|entry| entry.id.clone()) + .unwrap_or_else(|| format!("{platform}:{sender_id}")) +} + +/// Resolve and record a participant seen in an inbound message. +pub fn track_active_participant( + participants: &mut HashMap, + humans: &[HumanDef], + message: &InboundMessage, +) { + if message.source == "system" || message.sender_id.trim().is_empty() { + return; + } + + let platform = message.source.trim(); + if platform.is_empty() { + return; + } + + let human = find_human_for_sender(humans, platform, &message.sender_id); + let display_name = human + .and_then(|entry| entry.display_name.as_deref()) + .map(str::trim) + .filter(|name| !name.is_empty()) + .map(ToOwned::to_owned) + .unwrap_or_else(|| participant_display_name(message)); + let participant_key = participant_memory_key(humans, platform, &message.sender_id); + let fallback_key = format!("{platform}:{}", message.sender_id); + + participants.retain(|existing_key, participant| { + (existing_key == &participant_key) + || participant.platform != platform + || participant.sender_id != message.sender_id + }); + if participant_key != fallback_key { + participants.remove(&fallback_key); + } + + participants.insert( + participant_key.clone(), + ActiveParticipant { + participant_key, + platform: platform.to_string(), + sender_id: message.sender_id.clone(), + display_name, + role: human.and_then(|entry| entry.role.clone()), + profile_summary: human_profile_summary(human), + last_message_at: message.timestamp, + }, + ); +} + +/// Return participants in prompt-render order. +pub fn renderable_participants( + participants: &HashMap, + config: &ParticipantContextConfig, +) -> Vec { + if !config.enabled || participants.len() < config.min_participants { + return Vec::new(); + } + + let mut entries: Vec = participants.values().cloned().collect(); + entries.sort_by(|left, right| { + right + .last_message_at + .cmp(&left.last_message_at) + .then_with(|| left.display_name.cmp(&right.display_name)) + }); + entries.truncate(config.max_participants); + entries +} + +/// Best-effort display name for an inbound message sender. +pub fn participant_display_name(message: &InboundMessage) -> String { + message + .metadata + .get("sender_display_name") + .and_then(|value| value.as_str()) + .map(str::trim) + .filter(|name| !name.is_empty()) + .map(ToOwned::to_owned) + .or_else(|| { + message + .formatted_author + .as_deref() + .map(str::trim) + .filter(|name| !name.is_empty()) + .map(ToOwned::to_owned) + }) + .unwrap_or_else(|| message.sender_id.clone()) +} + +fn find_human_for_sender<'a>( + humans: &'a [HumanDef], + platform: &str, + sender_id: &str, +) -> Option<&'a HumanDef> { + humans.iter().find(|human| { + human.id == sender_id + || human.email.as_deref() == Some(sender_id) + || match platform { + "discord" => human.discord_id.as_deref() == Some(sender_id), + "telegram" => human.telegram_id.as_deref() == Some(sender_id), + "slack" => human.slack_id.as_deref() == Some(sender_id), + _ => false, + } + }) +} + +fn human_profile_summary(human: Option<&HumanDef>) -> Option { + human + .and_then(|entry| entry.description.as_deref().or(entry.bio.as_deref())) + .map(str::trim) + .filter(|text| !text.is_empty()) + .map(ToOwned::to_owned) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::{InboundMessage, MessageContent}; + + use chrono::TimeZone; + + fn test_message() -> InboundMessage { + let mut metadata = std::collections::HashMap::new(); + metadata.insert( + "sender_display_name".to_string(), + serde_json::Value::String("Victor".to_string()), + ); + + InboundMessage { + id: "message-1".to_string(), + source: "discord".to_string(), + adapter: None, + conversation_id: "discord:chan-1".to_string(), + sender_id: "12345".to_string(), + agent_id: None, + content: MessageContent::Text("hello".to_string()), + timestamp: Utc.with_ymd_and_hms(2026, 3, 31, 12, 0, 0).unwrap(), + metadata, + formatted_author: None, + } + } + + #[test] + fn tracks_known_human_with_canonical_id() { + let mut participants = HashMap::new(); + let message = test_message(); + let humans = vec![HumanDef { + id: "victor".to_string(), + display_name: Some("Victor".to_string()), + role: Some("Maintainer".to_string()), + bio: Some("Prefers direct replies.".to_string()), + description: None, + discord_id: Some("12345".to_string()), + telegram_id: None, + slack_id: None, + email: None, + }]; + + track_active_participant(&mut participants, &humans, &message); + + let participant = participants.get("victor").expect("participant missing"); + assert_eq!(participant.display_name, "Victor"); + assert_eq!(participant.role.as_deref(), Some("Maintainer")); + assert_eq!( + participant.profile_summary.as_deref(), + Some("Prefers direct replies.") + ); + } + + #[test] + fn falls_back_to_platform_scoped_key_for_unknown_sender() { + let mut participants = HashMap::new(); + let message = test_message(); + + track_active_participant(&mut participants, &[], &message); + + let participant = participants + .get("discord:12345") + .expect("participant missing"); + assert_eq!(participant.display_name, "Victor"); + assert!(participant.role.is_none()); + assert!(participant.profile_summary.is_none()); + } + + #[test] + fn renderable_participants_respects_minimum_and_ordering() { + let mut participants = HashMap::new(); + let message = test_message(); + let mut later = test_message(); + later.sender_id = "999".to_string(); + later.metadata.insert( + "sender_display_name".to_string(), + serde_json::Value::String("Alex".to_string()), + ); + later.timestamp = Utc.with_ymd_and_hms(2026, 3, 31, 12, 5, 0).unwrap(); + + track_active_participant(&mut participants, &[], &message); + track_active_participant(&mut participants, &[], &later); + + let config = ParticipantContextConfig { + min_participants: 2, + max_participants: 1, + ..ParticipantContextConfig::default() + }; + + let renderable = renderable_participants(&participants, &config); + assert_eq!(renderable.len(), 1); + assert_eq!(renderable[0].display_name, "Alex"); + } + + #[test] + fn participant_memory_key_uses_platform_scope_for_unknown_sender() { + assert_eq!( + participant_memory_key(&[], "discord", "12345"), + "discord:12345" + ); + } + + #[test] + fn upgrades_fallback_participant_entry_without_duplicates() { + let mut participants = HashMap::new(); + let message = test_message(); + + track_active_participant(&mut participants, &[], &message); + assert!(participants.contains_key("discord:12345")); + + let humans = vec![HumanDef { + id: "victor".to_string(), + display_name: Some("Victor".to_string()), + role: Some("Maintainer".to_string()), + bio: Some("Prefers direct replies.".to_string()), + description: None, + discord_id: Some("12345".to_string()), + telegram_id: None, + slack_id: None, + email: None, + }]; + + track_active_participant(&mut participants, &humans, &message); + + assert_eq!(participants.len(), 1); + assert!(!participants.contains_key("discord:12345")); + assert!(participants.contains_key("victor")); + } +} diff --git a/src/memory/working.rs b/src/memory/working.rs index cad9a586f..812d7926b 100644 --- a/src/memory/working.rs +++ b/src/memory/working.rs @@ -746,6 +746,73 @@ pub async fn render_channel_activity_map( Ok(output) } +/// Render Layer 4: participant context for the most recently active users in +/// the current channel. +pub async fn render_participant_context( + working_memory: &WorkingMemoryStore, + participants: &[crate::conversation::ActiveParticipant], + channel_id: &str, + config: &crate::config::ParticipantContextConfig, +) -> Result { + use std::fmt::Write; + + if !config.enabled || participants.len() < config.min_participants { + return Ok(String::new()); + } + + let now = Utc::now(); + if participants.is_empty() { + return Ok(String::new()); + } + + let mut output = String::new(); + writeln!(output, "## Participants\n").ok(); + + for participant in participants { + write!(output, "**{}**", participant.display_name).ok(); + if let Some(role) = participant.role.as_deref() { + write!(output, " -- {role}").ok(); + } + writeln!(output).ok(); + + if let Some(profile) = participant.profile_summary.as_deref() { + writeln!(output, " {profile}").ok(); + } + + let recent_line = participant_recent_activity( + working_memory, + &participant.participant_key, + participant.last_message_at, + now, + channel_id, + ) + .await?; + writeln!(output, " Recent: {recent_line}.").ok(); + writeln!(output).ok(); + + if estimate_tokens(&output) >= config.token_budget { + break; + } + } + + if estimate_tokens(&output) > config.token_budget { + let mut trimmed = String::new(); + let mut tokens_used = 0usize; + for line in output.lines() { + let candidate = format!("{line}\n"); + let candidate_tokens = estimate_tokens(&candidate); + if tokens_used + candidate_tokens > config.token_budget { + break; + } + trimmed.push_str(&candidate); + tokens_used += candidate_tokens; + } + return Ok(trimmed.trim_end().to_string()); + } + + Ok(output.trim_end().to_string()) +} + /// Format a single event as a one-line summary for the raw tail. fn format_event_line(event: &WorkingMemoryEvent, current_channel_id: &str) -> String { let type_label = match event.event_type { @@ -834,6 +901,42 @@ fn format_time_ago(now: DateTime, then: DateTime) -> String { } } +async fn participant_recent_activity( + working_memory: &WorkingMemoryStore, + user_id: &str, + last_message_at: DateTime, + now: DateTime, + channel_id: &str, +) -> Result { + let recent_events = working_memory.get_user_recent_events(user_id, 3).await?; + if recent_events.is_empty() { + return Ok(format!( + "active in this channel {}", + format_time_ago(now, last_message_at) + )); + } + + let parts: Vec = recent_events + .iter() + .rev() + .map(|event| { + let channel_prefix = match &event.channel_id { + Some(event_channel_id) if event_channel_id != channel_id => { + format!("in {event_channel_id} ") + } + _ => String::new(), + }; + format!( + "{channel_prefix}{} {}", + event.summary, + format_time_ago(now, event.timestamp) + ) + }) + .collect(); + + Ok(parts.join(", ")) +} + // --------------------------------------------------------------------------- // Builder // --------------------------------------------------------------------------- @@ -1470,6 +1573,61 @@ mod tests { assert!(rendered.is_empty(), "should be empty with no channels"); } + #[tokio::test] + async fn test_render_participant_context_uses_humans_and_recent_activity() { + let store = setup_test_store().await; + let config = crate::config::ParticipantContextConfig { + max_participants: 3, + ..crate::config::ParticipantContextConfig::default() + }; + + sqlx::query( + "INSERT INTO conversation_messages \ + (id, channel_id, role, sender_name, sender_id, content) \ + VALUES (?, ?, 'user', ?, ?, ?)", + ) + .bind("message-1") + .bind("discord:chan-1") + .bind("Victor") + .bind("12345") + .bind("Can you review this?") + .execute(&store.pool) + .await + .unwrap(); + + let event = WorkingMemoryEvent { + id: Uuid::new_v4().to_string(), + event_type: WorkingMemoryEventType::Decision, + timestamp: Utc::now() - chrono::Duration::minutes(10), + channel_id: Some("discord:chan-2".to_string()), + user_id: Some("victor".to_string()), + summary: "approved the migration approach".to_string(), + detail: None, + importance: 0.8, + day: store.today(), + }; + insert_event(&store.pool, &event).await.unwrap(); + + let participants = vec![crate::conversation::ActiveParticipant { + participant_key: "victor".to_string(), + platform: "discord".to_string(), + sender_id: "12345".to_string(), + display_name: "Victor".to_string(), + role: Some("Maintainer".to_string()), + profile_summary: Some("Prefers direct, technical responses.".to_string()), + last_message_at: Utc::now(), + }]; + + let rendered = render_participant_context(&store, &participants, "discord:chan-1", &config) + .await + .unwrap(); + + assert!(rendered.contains("## Participants")); + assert!(rendered.contains("**Victor** -- Maintainer")); + assert!(rendered.contains("Prefers direct, technical responses.")); + assert!(rendered.contains("approved the migration approach")); + } + #[test] fn test_format_time_ago() { let now = Utc::now(); diff --git a/src/prompts/engine.rs b/src/prompts/engine.rs index 3a7340ec8..a41326894 100644 --- a/src/prompts/engine.rs +++ b/src/prompts/engine.rs @@ -544,6 +544,7 @@ impl PromptEngine { self.render_channel_prompt_with_links( identity_context, memory_bulletin, + None, skills_prompt, worker_capabilities, conversation_context, @@ -557,6 +558,7 @@ impl PromptEngine { None, None, None, + None, ) } @@ -653,6 +655,7 @@ impl PromptEngine { &self, identity_context: Option, memory_bulletin: Option, + knowledge_synthesis: Option, skills_prompt: Option, worker_capabilities: String, conversation_context: Option, @@ -666,10 +669,9 @@ impl PromptEngine { backfill_transcript: Option, working_memory: Option, channel_activity_map: Option, + participant_context: Option, ) -> Result { - // During the transition, the bulletin is also exposed as knowledge_synthesis - // so the template can render it under the new heading. - let knowledge_synthesis = memory_bulletin.clone(); + let knowledge_synthesis = knowledge_synthesis.or_else(|| memory_bulletin.clone()); self.render( "channel", @@ -689,6 +691,7 @@ impl PromptEngine { backfill_transcript => backfill_transcript, working_memory => working_memory, channel_activity_map => channel_activity_map, + participant_context => participant_context, knowledge_synthesis => knowledge_synthesis, }, ) diff --git a/tests/context_dump.rs b/tests/context_dump.rs index bee401ea6..abf57cf35 100644 --- a/tests/context_dump.rs +++ b/tests/context_dump.rs @@ -252,6 +252,7 @@ async fn dump_channel_context() { )), worker_context_settings: Arc::new(tokio::sync::RwLock::new(Default::default())), model_overrides: Arc::new(Default::default()), + active_participants: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())), }; let tool_server = rig::tool::server::ToolServer::new().run(); @@ -496,6 +497,7 @@ async fn dump_all_contexts() { )), worker_context_settings: Arc::new(tokio::sync::RwLock::new(Default::default())), model_overrides: Arc::new(Default::default()), + active_participants: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())), }; let channel_tool_server = rig::tool::server::ToolServer::new().run(); let skip_flag = spacebot::tools::new_skip_flag(); From c3758033945ef578ab44f3572cecd273015e19fa Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Fri, 3 Apr 2026 10:34:10 -0400 Subject: [PATCH 2/3] fix(memory): address participant context review feedback --- src/agent/channel.rs | 142 ++++++++++++++++++++++++------- src/api/channels.rs | 9 +- src/conversation/participants.rs | 71 +++++++++++++--- 3 files changed, 175 insertions(+), 47 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index f1a3487cd..7ab5cb0f8 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -86,6 +86,25 @@ fn should_flush_coalesce_buffer_for_event(event: &ProcessEvent) -> bool { ) } +fn sentence_contains_decision_marker(sentence: &str, explicit_markers: &[&str]) -> bool { + let sentence_lower = sentence.to_ascii_lowercase(); + explicit_markers + .iter() + .any(|marker| sentence_lower.contains(marker)) + || (sentence_lower.contains(" instead of ") + && [ + "use ", + "switch", + "adopt ", + "choose ", + "pick ", + "go with ", + "proceed with ", + ] + .iter() + .any(|marker| sentence_lower.contains(marker))) +} + fn extract_decision_summary_from_reply(reply_text: &str) -> Option { let normalized = reply_text.split_whitespace().collect::>().join(" "); let trimmed = normalized.trim(); @@ -136,9 +155,17 @@ fn extract_decision_summary_from_reply(reply_text: &str) -> Option { return None; } - let mut summary = trimmed + let sentences: Vec<&str> = trimmed .split_terminator(['.', '!', '?', '\n']) - .find(|sentence| !sentence.trim().is_empty()) + .map(str::trim) + .filter(|sentence| !sentence.is_empty()) + .collect(); + + let mut summary = sentences + .iter() + .copied() + .find(|sentence| sentence_contains_decision_marker(sentence, &explicit_markers)) + .or_else(|| sentences.first().copied()) .unwrap_or(trimmed) .trim() .to_string(); @@ -152,6 +179,23 @@ fn extract_decision_summary_from_reply(reply_text: &str) -> Option { Some(summary) } +fn decision_user_id( + humans: &[crate::config::HumanDef], + message: &InboundMessage, + is_retrigger: bool, +) -> Option { + if is_retrigger || message.source == "system" { + return None; + } + + let source = message.source.trim(); + if source.is_empty() || message.sender_id.is_empty() { + return None; + } + + Some(participant_memory_key(humans, source, &message.sender_id)) +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -556,6 +600,27 @@ impl Drop for MessageDurationGuard { } impl Channel { + fn record_decision_event(&self, reply_text: Option<&str>, user_id: Option) { + let Some(decision_summary) = reply_text.and_then(extract_decision_summary_from_reply) + else { + return; + }; + + let mut event = self + .deps + .working_memory + .emit( + crate::memory::WorkingMemoryEventType::Decision, + decision_summary, + ) + .channel(self.id.as_ref()) + .importance(0.8); + if let Some(user_id) = user_id { + event = event.user(user_id); + } + event.record(); + } + /// Create a new channel. /// /// All tunable config (prompts, routing, thresholds, browser, skills) is read @@ -1587,7 +1652,7 @@ impl Channel { } // Run agent turn with any image/audio attachments preserved - let (result, skip_flag, replied_flag, _, _) = self + let (result, skip_flag, replied_flag, _, reply_text) = self .run_agent_turn( &combined_text, &system_prompt, @@ -1600,6 +1665,9 @@ impl Channel { self.handle_agent_result(result, &skip_flag, &replied_flag, false) .await; + if replied_flag.load(std::sync::atomic::Ordering::Relaxed) { + self.record_decision_event(reply_text.as_deref(), None); + } // Check compaction if let Err(error) = self.compactor.check_and_compact().await { tracing::warn!(channel_id = %self.id, %error, "compaction check failed"); @@ -1954,34 +2022,10 @@ impl Channel { self.handle_agent_result(result, &skip_flag, &replied_flag, is_retrigger) .await; - if replied_flag.load(std::sync::atomic::Ordering::Relaxed) - && let Some(decision_summary) = reply_text - .as_deref() - .and_then(extract_decision_summary_from_reply) - { - let user_id = if message.source.trim().is_empty() || message.sender_id.is_empty() { - None - } else { - let humans = self.deps.humans.load(); - Some(participant_memory_key( - humans.as_ref(), - message.source.trim(), - &message.sender_id, - )) - }; - let mut event = self - .deps - .working_memory - .emit( - crate::memory::WorkingMemoryEventType::Decision, - decision_summary, - ) - .channel(self.id.as_ref()) - .importance(0.8); - if let Some(user_id) = user_id { - event = event.user(user_id); - } - event.record(); + if replied_flag.load(std::sync::atomic::Ordering::Relaxed) { + let humans = self.deps.humans.load(); + let user_id = decision_user_id(humans.as_ref(), &message, is_retrigger); + self.record_decision_event(reply_text.as_deref(), user_id); } // Safety-net: in quiet mode, explicit mention/reply should never be dropped silently. @@ -3739,7 +3783,7 @@ fn is_dm_conversation_id(conv_id: &str) -> bool { #[cfg(test)] mod tests { use super::{ - QuietModeFallbackState, compute_listen_mode_invocation, + QuietModeFallbackState, compute_listen_mode_invocation, decision_user_id, extract_decision_summary_from_reply, is_dm_conversation_id, recv_channel_event, should_process_event_for_channel, should_send_discord_quiet_mode_ping_ack, should_send_quiet_mode_fallback, @@ -3851,6 +3895,40 @@ mod tests { ) .is_none() ); + assert_eq!( + extract_decision_summary_from_reply("Got it. We'll switch to the new routing config.") + .as_deref(), + Some("We'll switch to the new routing config") + ); + } + + #[test] + fn decision_user_id_skips_retrigger_messages() { + let humans = vec![crate::config::HumanDef { + id: "victor".to_string(), + display_name: Some("Victor".to_string()), + role: None, + bio: None, + description: None, + discord_id: Some("12345".to_string()), + telegram_id: None, + slack_id: None, + email: None, + }]; + let message = InboundMessage { + id: "message-1".to_string(), + source: "system".to_string(), + adapter: None, + conversation_id: "discord:chan-1".to_string(), + sender_id: "12345".to_string(), + agent_id: None, + content: crate::MessageContent::Text("retrigger".to_string()), + timestamp: chrono::Utc::now(), + metadata: HashMap::new(), + formatted_author: None, + }; + + assert!(decision_user_id(&humans, &message, true).is_none()); } #[test] diff --git a/src/api/channels.rs b/src/api/channels.rs index b00fda780..8c6028ce0 100644 --- a/src/api/channels.rs +++ b/src/api/channels.rs @@ -612,7 +612,14 @@ pub(super) async fn inspect_prompt( &participant_config, ) .await - .unwrap_or_default(); + .unwrap_or_else(|error| { + tracing::warn!( + %error, + channel_id = %query.channel_id, + "failed to render participant context for prompt inspection" + ); + String::new() + }); // ── Available channels ── let available_channels = { diff --git a/src/conversation/participants.rs b/src/conversation/participants.rs index f51634ac9..4e452ed60 100644 --- a/src/conversation/participants.rs +++ b/src/conversation/participants.rs @@ -5,6 +5,8 @@ use chrono::{DateTime, Utc}; use std::collections::HashMap; +const MAX_DISPLAY_NAME_CHARS: usize = 80; + /// Active participant state tracked for a live channel session. /// /// This is the bridge between today's lightweight config-backed participant @@ -59,9 +61,7 @@ pub fn track_active_participant( let human = find_human_for_sender(humans, platform, &message.sender_id); let display_name = human .and_then(|entry| entry.display_name.as_deref()) - .map(str::trim) - .filter(|name| !name.is_empty()) - .map(ToOwned::to_owned) + .and_then(sanitize_display_name) .unwrap_or_else(|| participant_display_name(message)); let participant_key = participant_memory_key(humans, platform, &message.sender_id); let fallback_key = format!("{platform}:{}", message.sender_id); @@ -112,23 +112,45 @@ pub fn renderable_participants( /// Best-effort display name for an inbound message sender. pub fn participant_display_name(message: &InboundMessage) -> String { message - .metadata - .get("sender_display_name") - .and_then(|value| value.as_str()) - .map(str::trim) - .filter(|name| !name.is_empty()) - .map(ToOwned::to_owned) + .formatted_author + .as_deref() + .and_then(sanitize_display_name) .or_else(|| { message - .formatted_author - .as_deref() - .map(str::trim) - .filter(|name| !name.is_empty()) - .map(ToOwned::to_owned) + .metadata + .get("sender_display_name") + .and_then(|value| value.as_str()) + .and_then(sanitize_display_name) }) + .or_else(|| sanitize_display_name(&message.sender_id)) .unwrap_or_else(|| message.sender_id.clone()) } +fn sanitize_display_name(value: &str) -> Option { + let collapsed = value + .chars() + .map(|character| { + if character.is_control() { + ' ' + } else { + character + } + }) + .collect::(); + let mut sanitized = collapsed.split_whitespace().collect::>().join(" "); + if sanitized.is_empty() { + return None; + } + + if sanitized.len() > MAX_DISPLAY_NAME_CHARS { + let boundary = sanitized.floor_char_boundary(MAX_DISPLAY_NAME_CHARS); + sanitized.truncate(boundary); + sanitized.push_str("..."); + } + + Some(sanitized) +} + fn find_human_for_sender<'a>( humans: &'a [HumanDef], platform: &str, @@ -259,6 +281,27 @@ mod tests { ); } + #[test] + fn participant_display_name_prefers_formatted_author() { + let mut message = test_message(); + message.formatted_author = Some("Victor Summit".to_string()); + + assert_eq!(participant_display_name(&message), "Victor Summit"); + } + + #[test] + fn participant_display_name_sanitizes_user_input() { + let mut message = test_message(); + message.formatted_author = Some(" Victor\n\t\u{0007} ".repeat(10)); + + let display_name = participant_display_name(&message); + assert!(display_name.starts_with("Victor ")); + assert!(display_name.ends_with("...")); + assert!(display_name.len() <= MAX_DISPLAY_NAME_CHARS + 3); + assert!(!display_name.contains('\n')); + assert!(!display_name.contains('\u{0007}')); + } + #[test] fn upgrades_fallback_participant_entry_without_duplicates() { let mut participants = HashMap::new(); From 73e227bdcb439c7f5d4b24b4c2c595e8ac073040 Mon Sep 17 00:00:00 2001 From: Victor Sumner Date: Fri, 3 Apr 2026 16:32:37 -0400 Subject: [PATCH 3/3] refactor(memory): apply review follow-ups --- src/agent/channel.rs | 173 +++++++++++++++++++++++------------------- src/memory/working.rs | 4 - 2 files changed, 95 insertions(+), 82 deletions(-) diff --git a/src/agent/channel.rs b/src/agent/channel.rs index 7ab5cb0f8..e44740a09 100644 --- a/src/agent/channel.rs +++ b/src/agent/channel.rs @@ -64,6 +64,39 @@ struct PendingResult { } const EVENT_LAG_WARNING_INTERVAL_SECS: u64 = 30; +const DECISION_MARKERS: &[&str] = &[ + "we decided to ", + "i decided to ", + "decision:", + "the decision is ", + "approved: ", + "approved to ", + "moving forward with ", + "move forward with ", + "going with ", + "switching to ", + "we will use ", + "i will use ", + "we'll use ", + "i'll use ", + "we will switch to ", + "i will switch to ", + "we'll switch to ", + "i'll switch to ", + "we will proceed with ", + "i will proceed with ", + "we'll proceed with ", + "i'll proceed with ", +]; +const CHANGE_COMPARISON_VERBS: &[&str] = &[ + "use ", + "switch", + "adopt ", + "choose ", + "pick ", + "go with ", + "proceed with ", +]; async fn recv_channel_event( event_rx: &mut broadcast::Receiver, @@ -86,23 +119,15 @@ fn should_flush_coalesce_buffer_for_event(event: &ProcessEvent) -> bool { ) } -fn sentence_contains_decision_marker(sentence: &str, explicit_markers: &[&str]) -> bool { +fn sentence_contains_decision_marker(sentence: &str) -> bool { let sentence_lower = sentence.to_ascii_lowercase(); - explicit_markers + DECISION_MARKERS .iter() .any(|marker| sentence_lower.contains(marker)) || (sentence_lower.contains(" instead of ") - && [ - "use ", - "switch", - "adopt ", - "choose ", - "pick ", - "go with ", - "proceed with ", - ] - .iter() - .any(|marker| sentence_lower.contains(marker))) + && CHANGE_COMPARISON_VERBS + .iter() + .any(|marker| sentence_lower.contains(marker))) } fn extract_decision_summary_from_reply(reply_text: &str) -> Option { @@ -113,43 +138,11 @@ fn extract_decision_summary_from_reply(reply_text: &str) -> Option { } let lower = trimmed.to_ascii_lowercase(); - let explicit_markers = [ - "we decided to ", - "i decided to ", - "decision:", - "the decision is ", - "approved: ", - "approved to ", - "moving forward with ", - "move forward with ", - "going with ", - "switching to ", - "we will use ", - "i will use ", - "we'll use ", - "i'll use ", - "we will switch to ", - "i will switch to ", - "we'll switch to ", - "i'll switch to ", - "we will proceed with ", - "i will proceed with ", - "we'll proceed with ", - "i'll proceed with ", - ]; - let has_explicit_marker = explicit_markers.iter().any(|marker| lower.contains(marker)); + let has_explicit_marker = DECISION_MARKERS.iter().any(|marker| lower.contains(marker)); let has_change_comparison = lower.contains(" instead of ") - && [ - "use ", - "switch", - "adopt ", - "choose ", - "pick ", - "go with ", - "proceed with ", - ] - .iter() - .any(|marker| lower.contains(marker)); + && CHANGE_COMPARISON_VERBS + .iter() + .any(|marker| lower.contains(marker)); if !has_explicit_marker && !has_change_comparison { return None; @@ -164,7 +157,7 @@ fn extract_decision_summary_from_reply(reply_text: &str) -> Option { let mut summary = sentences .iter() .copied() - .find(|sentence| sentence_contains_decision_marker(sentence, &explicit_markers)) + .find(|sentence| sentence_contains_decision_marker(sentence)) .or_else(|| sentences.first().copied()) .unwrap_or(trimmed) .trim() @@ -196,6 +189,14 @@ fn decision_user_id( Some(participant_memory_key(humans, source, &message.sender_id)) } +struct AgentTurnResult { + result: std::result::Result, + skip_flag: crate::tools::SkipFlag, + replied_flag: crate::tools::RepliedFlag, + retrigger_reply_preserved: bool, + reply_text: Option, +} + /// Shared state that channel tools need to act on the channel. /// /// Wrapped in Arc and passed to tools (branch, spawn_worker, route, cancel) @@ -1652,7 +1653,7 @@ impl Channel { } // Run agent turn with any image/audio attachments preserved - let (result, skip_flag, replied_flag, _, reply_text) = self + let turn_result = self .run_agent_turn( &combined_text, &system_prompt, @@ -1663,10 +1664,18 @@ impl Channel { ) .await?; - self.handle_agent_result(result, &skip_flag, &replied_flag, false) - .await; - if replied_flag.load(std::sync::atomic::Ordering::Relaxed) { - self.record_decision_event(reply_text.as_deref(), None); + self.handle_agent_result( + turn_result.result, + &turn_result.skip_flag, + &turn_result.replied_flag, + false, + ) + .await; + if turn_result + .replied_flag + .load(std::sync::atomic::Ordering::Relaxed) + { + self.record_decision_event(turn_result.reply_text.as_deref(), None); } // Check compaction if let Err(error) = self.compactor.check_and_compact().await { @@ -1729,6 +1738,7 @@ impl Channel { .and_then(|adapter| prompt_engine.render_channel_adapter_prompt(adapter)); let empty_to_none = |s: String| if s.is_empty() { None } else { Some(s) }; + let non_empty_option = |value: Option| value.filter(|text| !text.is_empty()); let project_context = self.build_project_context(&prompt_engine).await; @@ -1746,8 +1756,8 @@ impl Channel { let system_prompt = prompt_engine.render_channel_prompt_with_links( empty_to_none(identity_context), - memory_bulletin_text, - knowledge_synthesis_text, + non_empty_option(memory_bulletin_text), + non_empty_option(knowledge_synthesis_text), empty_to_none(skills_prompt), worker_capabilities, self.conversation_context.clone(), @@ -2008,7 +2018,7 @@ impl Channel { .adapter .as_deref() .or_else(|| self.current_adapter()); - let (result, skip_flag, replied_flag, retrigger_reply_preserved, reply_text) = self + let turn_result = self .run_agent_turn( &user_text, &system_prompt, @@ -2019,13 +2029,21 @@ impl Channel { ) .await?; - self.handle_agent_result(result, &skip_flag, &replied_flag, is_retrigger) - .await; + self.handle_agent_result( + turn_result.result, + &turn_result.skip_flag, + &turn_result.replied_flag, + is_retrigger, + ) + .await; - if replied_flag.load(std::sync::atomic::Ordering::Relaxed) { + if turn_result + .replied_flag + .load(std::sync::atomic::Ordering::Relaxed) + { let humans = self.deps.humans.load(); let user_id = decision_user_id(humans.as_ref(), &message, is_retrigger); - self.record_decision_event(reply_text.as_deref(), user_id); + self.record_decision_event(turn_result.reply_text.as_deref(), user_id); } // Safety-net: in quiet mode, explicit mention/reply should never be dropped silently. @@ -2037,8 +2055,12 @@ impl Channel { invoked_by_command, invoked_by_mention, invoked_by_reply, - skip_flag: skip_flag.load(std::sync::atomic::Ordering::Relaxed), - replied_flag: replied_flag.load(std::sync::atomic::Ordering::Relaxed), + skip_flag: turn_result + .skip_flag + .load(std::sync::atomic::Ordering::Relaxed), + replied_flag: turn_result + .replied_flag + .load(std::sync::atomic::Ordering::Relaxed), }, ) { self.send_builtin_text( @@ -2061,8 +2083,10 @@ impl Channel { // reply content payload, this fallback preserves a compact background // result record for the next user turn. if is_retrigger { - let replied = replied_flag.load(std::sync::atomic::Ordering::Relaxed); - if replied && retrigger_reply_preserved { + let replied = turn_result + .replied_flag + .load(std::sync::atomic::Ordering::Relaxed); + if replied && turn_result.retrigger_reply_preserved { tracing::debug!( channel_id = %self.id, "skipping retrigger summary injection; relay reply already preserved" @@ -2528,7 +2552,6 @@ impl Channel { /// Register per-turn tools, run the LLM agentic loop, and clean up. /// /// Returns the prompt result and per-turn flags for the caller to dispatch. - #[allow(clippy::type_complexity)] #[tracing::instrument(skip(self, user_text, system_prompt, attachment_content), fields(channel_id = %self.id, agent_id = %self.deps.agent_id))] async fn run_agent_turn( &self, @@ -2538,13 +2561,7 @@ impl Channel { attachment_content: Vec, is_retrigger: bool, adapter: Option<&str>, - ) -> Result<( - std::result::Result, - crate::tools::SkipFlag, - crate::tools::RepliedFlag, - bool, - Option, - )> { + ) -> Result { let skip_flag = crate::tools::new_skip_flag(); let replied_flag = crate::tools::new_replied_flag(); let allow_direct_reply = !self.suppress_plaintext_fallback(); @@ -2742,13 +2759,13 @@ impl Channel { tracing::warn!(%error, "failed to remove channel tools"); } - Ok(( + Ok(AgentTurnResult { result, skip_flag, replied_flag, - applied_history.retrigger_reply_preserved, - applied_history.reply_text, - )) + retrigger_reply_preserved: applied_history.retrigger_reply_preserved, + reply_text: applied_history.reply_text, + }) } /// Send outbound text and record send metrics. diff --git a/src/memory/working.rs b/src/memory/working.rs index 812d7926b..ea249bd7a 100644 --- a/src/memory/working.rs +++ b/src/memory/working.rs @@ -761,10 +761,6 @@ pub async fn render_participant_context( } let now = Utc::now(); - if participants.is_empty() { - return Ok(String::new()); - } - let mut output = String::new(); writeln!(output, "## Participants\n").ok();