Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions src/crates/core/src/agentic/coordination/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::agentic::events::{
AgenticEvent, EventPriority, EventQueue, EventRouter, EventSubscriber,
};
use crate::agentic::execution::{ExecutionContext, ExecutionEngine};
use crate::agentic::image_analysis::ImageContextData;
use crate::agentic::session::SessionManager;
use crate::agentic::tools::pipeline::{SubagentParentInfo, ToolPipeline};
use crate::agentic::image_analysis::ImageContextData;
use crate::util::errors::{BitFunError, BitFunResult};
use log::{debug, error, info, warn};
use std::sync::Arc;
Expand Down Expand Up @@ -147,8 +147,7 @@ impl ConversationCoordinator {
workspace_path: Option<String>,
) -> BitFunResult<Session> {
let effective_workspace_path = workspace_path.or_else(|| {
crate::infrastructure::get_workspace_path()
.map(|p| p.to_string_lossy().to_string())
crate::infrastructure::get_workspace_path().map(|p| p.to_string_lossy().to_string())
});

// Persist the workspace binding inside the session config so execution can
Expand Down Expand Up @@ -246,17 +245,16 @@ impl ConversationCoordinator {
terminal_session_id: existing
.as_ref()
.and_then(|m| m.terminal_session_id.clone()),
snapshot_session_id: session
.snapshot_session_id
.clone()
.or_else(|| existing.as_ref().and_then(|m| m.snapshot_session_id.clone())),
snapshot_session_id: session.snapshot_session_id.clone().or_else(|| {
existing
.as_ref()
.and_then(|m| m.snapshot_session_id.clone())
}),
tags: existing
.as_ref()
.map(|m| m.tags.clone())
.unwrap_or_default(),
custom_metadata: existing
.as_ref()
.and_then(|m| m.custom_metadata.clone()),
custom_metadata: existing.as_ref().and_then(|m| m.custom_metadata.clone()),
todos: existing.as_ref().and_then(|m| m.todos.clone()),
workspace_path: Some(workspace_path),
};
Expand Down Expand Up @@ -390,8 +388,15 @@ impl ConversationCoordinator {
agent_type: String,
trigger_source: DialogTriggerSource,
) -> BitFunResult<()> {
self.start_dialog_turn_internal(session_id, user_input, None, turn_id, agent_type, trigger_source)
.await
self.start_dialog_turn_internal(
session_id,
user_input,
None,
turn_id,
agent_type,
trigger_source,
)
.await
}

pub async fn start_dialog_turn_with_image_contexts(
Expand Down Expand Up @@ -560,12 +565,40 @@ impl ConversationCoordinator {
}
};

let effective_agent_type = if session.agent_type.is_empty() {
agent_type
} else {
let requested_agent_type = agent_type.trim().to_string();

let effective_agent_type = if !requested_agent_type.is_empty() {
requested_agent_type.clone()
} else if !session.agent_type.is_empty() {
session.agent_type.clone()
} else {
"agentic".to_string()
};

debug!(
"Resolved dialog turn agent type: session_id={}, turn_id={}, requested_agent_type={}, session_agent_type={}, effective_agent_type={}, trigger_source={:?}",
session_id,
turn_id.as_deref().unwrap_or(""),
if requested_agent_type.is_empty() {
"<empty>"
} else {
requested_agent_type.as_str()
},
if session.agent_type.is_empty() {
"<empty>"
} else {
session.agent_type.as_str()
},
effective_agent_type,
trigger_source
);

if session.agent_type != effective_agent_type {
self.session_manager
.update_session_agent_type(&session_id, &effective_agent_type)
.await?;
}

debug!(
"Checking session state: session_id={}, state={:?}",
session_id, session.state
Expand Down Expand Up @@ -709,8 +742,14 @@ impl ConversationCoordinator {
// vision model to pre-analyze them, then enhance the user message with text descriptions.
// This is the single authoritative code path for all image handling (desktop, remote, bot).
// If no vision model is configured, the request is rejected with a user-friendly message.
let (user_input, image_contexts) =
self.pre_analyze_images_if_needed(user_input, image_contexts, &session_id, user_message_metadata.clone()).await?;
let (user_input, image_contexts) = self
.pre_analyze_images_if_needed(
user_input,
image_contexts,
&session_id,
user_message_metadata.clone(),
)
.await?;

let wrapped_user_input = self
.wrap_user_input(&effective_agent_type, user_input)
Expand Down Expand Up @@ -906,7 +945,10 @@ impl ConversationCoordinator {
let is_cancellation = matches!(&e, BitFunError::Cancelled(_));

if is_cancellation {
info!("Dialog turn cancelled: session={}, turn={}", session_id_clone, turn_id_clone);
info!(
"Dialog turn cancelled: session={}, turn={}",
session_id_clone, turn_id_clone
);

// The execution engine only emits DialogTurnCancelled when
// cancellation is detected between rounds. If cancellation
Expand Down Expand Up @@ -968,11 +1010,7 @@ impl ConversationCoordinator {
.await;

let _ = session_manager
.fail_dialog_turn(
&session_id_clone,
&turn_id_clone,
e.to_string(),
)
.fail_dialog_turn(&session_id_clone, &turn_id_clone, e.to_string())
.await;

let _ = session_manager
Expand Down
65 changes: 21 additions & 44 deletions src/crates/core/src/agentic/coordination/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,29 +107,16 @@ impl DialogScheduler {
.map(|s| s.state.clone());

match state {
None => {
self.coordinator
.start_dialog_turn(
session_id,
user_input,
turn_id,
agent_type,
trigger_source,
)
.await
.map_err(|e| e.to_string())
}
None => self
.coordinator
.start_dialog_turn(session_id, user_input, turn_id, agent_type, trigger_source)
.await
.map_err(|e| e.to_string()),

Some(SessionState::Error { .. }) => {
self.clear_queue_and_debounce(&session_id);
self.coordinator
.start_dialog_turn(
session_id,
user_input,
turn_id,
agent_type,
trigger_source,
)
.start_dialog_turn(session_id, user_input, turn_id, agent_type, trigger_source)
.await
.map_err(|e| e.to_string())
}
Expand All @@ -143,13 +130,7 @@ impl DialogScheduler {
.unwrap_or(false);

if in_debounce || queue_non_empty {
self.enqueue(
&session_id,
user_input,
turn_id,
agent_type,
trigger_source,
)?;
self.enqueue(&session_id, user_input, turn_id, agent_type, trigger_source)?;
self.schedule_debounce(session_id);
Ok(())
} else {
Expand All @@ -167,13 +148,7 @@ impl DialogScheduler {
}

Some(SessionState::Processing { .. }) => {
self.enqueue(
&session_id,
user_input,
turn_id,
agent_type,
trigger_source,
)?;
self.enqueue(&session_id, user_input, turn_id, agent_type, trigger_source)?;
Ok(())
}
}
Expand Down Expand Up @@ -277,8 +252,7 @@ impl DialogScheduler {
session_id_clone
);

let (merged_input, turn_id, agent_type, trigger_source) =
merge_messages(messages);
let (merged_input, turn_id, agent_type, trigger_source) = merge_messages(messages);

if let Err(e) = coordinator
.start_dialog_turn(
Expand Down Expand Up @@ -349,19 +323,22 @@ impl DialogScheduler {
/// Queued #2
/// <second message>
/// ```
fn merge_messages(messages: Vec<QueuedTurn>) -> (String, Option<String>, String, DialogTriggerSource) {
fn merge_messages(
messages: Vec<QueuedTurn>,
) -> (String, Option<String>, String, DialogTriggerSource) {
if messages.len() == 1 {
let m = messages.into_iter().next().unwrap();
return (
m.user_input,
m.turn_id,
m.agent_type,
m.trigger_source,
);
return (m.user_input, m.turn_id, m.agent_type, m.trigger_source);
}

let agent_type = messages[0].agent_type.clone();
let trigger_source = messages[0].trigger_source;
let agent_type = messages
.last()
.map(|m| m.agent_type.clone())
.unwrap_or_else(|| "agentic".to_string());
let trigger_source = messages
.last()
.map(|m| m.trigger_source)
.unwrap_or(DialogTriggerSource::DesktopUi);

let entries: Vec<String> = messages
.iter()
Expand Down
64 changes: 43 additions & 21 deletions src/crates/core/src/agentic/session/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,7 @@ impl SessionManager {
}

/// Update session title (in-memory + persistence)
pub async fn update_session_title(
&self,
session_id: &str,
title: &str,
) -> BitFunResult<()> {
pub async fn update_session_title(&self, session_id: &str, title: &str) -> BitFunResult<()> {
let workspace_path = self
.sessions
.get(session_id)
Expand All @@ -206,9 +202,7 @@ impl SessionManager {
.await
{
Ok(conv_mgr) => {
if let Ok(Some(mut meta)) =
conv_mgr.load_session_metadata(session_id).await
{
if let Ok(Some(mut meta)) = conv_mgr.load_session_metadata(session_id).await {
meta.session_name = title.to_string();
meta.touch();
if let Err(e) = conv_mgr.save_session_metadata(&meta).await {
Expand All @@ -233,6 +227,37 @@ impl SessionManager {
Ok(())
}

/// Update session agent type (in-memory + persistence)
pub async fn update_session_agent_type(
&self,
session_id: &str,
agent_type: &str,
) -> BitFunResult<()> {
if let Some(mut session) = self.sessions.get_mut(session_id) {
session.agent_type = agent_type.to_string();
session.updated_at = SystemTime::now();
session.last_activity_at = SystemTime::now();
} else {
return Err(BitFunError::NotFound(format!(
"Session not found: {}",
session_id
)));
}

if self.config.enable_persistence {
if let Some(session) = self.sessions.get(session_id) {
self.persistence_manager.save_session(&session).await?;
}
}

debug!(
"Session agent type updated: session_id={}, agent_type={}",
session_id, agent_type
);

Ok(())
}

/// Update session activity time
pub fn touch_session(&self, session_id: &str) {
if let Some(mut session) = self.sessions.get_mut(session_id) {
Expand Down Expand Up @@ -552,13 +577,12 @@ impl SessionManager {
}

// 2. Add user message to history and compression managers
let user_message = if let Some(images) =
image_contexts.as_ref().filter(|v| !v.is_empty()).cloned()
{
Message::user_multimodal(user_input, images).with_turn_id(turn_id.clone())
} else {
Message::user(user_input).with_turn_id(turn_id.clone())
};
let user_message =
if let Some(images) = image_contexts.as_ref().filter(|v| !v.is_empty()).cloned() {
Message::user_multimodal(user_input, images).with_turn_id(turn_id.clone())
} else {
Message::user(user_input).with_turn_id(turn_id.clone())
};
self.history_manager
.add_message(session_id, user_message.clone())
.await?;
Expand Down Expand Up @@ -662,11 +686,7 @@ impl SessionManager {
Ok(context_messages) => {
if let Err(err) = self
.persistence_manager
.save_turn_context_snapshot(
session_id,
turn.turn_index,
&context_messages,
)
.save_turn_context_snapshot(session_id, turn.turn_index, &context_messages)
.await
{
warn!(
Expand Down Expand Up @@ -707,7 +727,9 @@ impl SessionManager {
limit: usize,
before_message_id: Option<&str>,
) -> BitFunResult<(Vec<Message>, bool)> {
self.history_manager.get_messages_paginated(session_id, limit, before_message_id).await
self.history_manager
.get_messages_paginated(session_id, limit, before_message_id)
.await
}

/// Get session's context messages (may be compressed)
Expand Down