Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/apps/desktop/src/api/agentic_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tauri::{AppHandle, State};

use crate::api::app_state::AppState;
use bitfun_core::agentic::tools::image_context::get_image_context;
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogScheduler, DialogTriggerSource};
use bitfun_core::agentic::core::*;
use bitfun_core::agentic::image_analysis::ImageContextData;
use bitfun_core::infrastructure::get_workspace_path;
Expand Down Expand Up @@ -186,6 +186,7 @@ pub async fn create_session(
pub async fn start_dialog_turn(
_app: AppHandle,
coordinator: State<'_, Arc<ConversationCoordinator>>,
scheduler: State<'_, Arc<DialogScheduler>>,
request: StartDialogTurnRequest,
) -> Result<StartDialogTurnResponse, String> {
let StartDialogTurnRequest {
Expand Down Expand Up @@ -214,8 +215,8 @@ pub async fn start_dialog_turn(
.await
.map_err(|e| format!("Failed to start dialog turn: {}", e))?;
} else {
coordinator
.start_dialog_turn(
scheduler
.submit(
session_id,
user_input,
turn_id,
Expand Down
15 changes: 9 additions & 6 deletions src/apps/desktop/src/api/image_analysis_api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
//! Image Analysis API

use crate::api::app_state::AppState;
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
use bitfun_core::agentic::image_analysis::*;
use bitfun_core::agentic::coordination::{DialogScheduler, DialogTriggerSource};
use bitfun_core::agentic::image_analysis::{
resolve_vision_model_from_ai_config, AnalyzeImagesRequest, ImageAnalysisResult, ImageAnalyzer,
MessageEnhancer, SendEnhancedMessageRequest,
};
use log::error;
use std::sync::Arc;
use tauri::State;
Expand Down Expand Up @@ -56,7 +59,7 @@ pub async fn analyze_images(
#[tauri::command]
pub async fn send_enhanced_message(
request: SendEnhancedMessageRequest,
coordinator: State<'_, Arc<ConversationCoordinator>>,
scheduler: State<'_, Arc<DialogScheduler>>,
_state: State<'_, AppState>,
) -> Result<(), String> {
let enhanced_message = MessageEnhancer::enhance_with_image_analysis(
Expand All @@ -65,10 +68,10 @@ pub async fn send_enhanced_message(
&request.other_contexts,
);

let _stream = coordinator
.start_dialog_turn(
scheduler
.submit(
request.session_id.clone(),
enhanced_message.clone(),
enhanced_message,
Some(request.dialog_turn_id.clone()),
request.agent_type.clone(),
DialogTriggerSource::DesktopApi,
Expand Down
14 changes: 14 additions & 0 deletions src/apps/desktop/src/api/terminal_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::sync::Mutex;
use bitfun_core::service::runtime::RuntimeManager;
use bitfun_core::service::terminal::{
AcknowledgeRequest as CoreAcknowledgeRequest, CloseSessionRequest as CoreCloseSessionRequest,
CommandCompletionReason as CoreCommandCompletionReason,
CreateSessionRequest as CoreCreateSessionRequest,
ExecuteCommandRequest as CoreExecuteCommandRequest,
ExecuteCommandResponse as CoreExecuteCommandResponse,
Expand Down Expand Up @@ -198,6 +199,7 @@ pub struct ExecuteCommandResponse {
pub command_id: String,
pub output: String,
pub exit_code: Option<i32>,
pub completion_reason: String,
}

impl From<CoreExecuteCommandResponse> for ExecuteCommandResponse {
Expand All @@ -207,6 +209,10 @@ impl From<CoreExecuteCommandResponse> for ExecuteCommandResponse {
command_id: resp.command_id,
output: resp.output,
exit_code: resp.exit_code,
completion_reason: match resp.completion_reason {
CoreCommandCompletionReason::Completed => "completed".to_string(),
CoreCommandCompletionReason::TimedOut => "timedOut".to_string(),
},
}
}
}
Expand All @@ -230,6 +236,10 @@ pub struct GetHistoryResponse {
pub session_id: String,
pub data: String,
pub history_size: usize,
/// PTY column count at the time history was captured.
pub cols: u16,
/// PTY row count at the time history was captured.
pub rows: u16,
}

impl From<CoreGetHistoryResponse> for GetHistoryResponse {
Expand All @@ -238,6 +248,8 @@ impl From<CoreGetHistoryResponse> for GetHistoryResponse {
session_id: resp.session_id,
data: resp.data,
history_size: resp.history_size,
cols: resp.cols,
rows: resp.rows,
}
}
}
Expand Down Expand Up @@ -494,6 +506,8 @@ pub async fn terminal_get_history(
session_id: response.session_id,
data: response.data,
history_size: response.history_size,
cols: response.cols,
rows: response.rows,
})
}

Expand Down
53 changes: 42 additions & 11 deletions src/apps/desktop/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ pub struct CoordinatorState {
pub coordinator: Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
}

/// Dialog scheduler state (primary entry point for user messages)
#[derive(Clone)]
pub struct SchedulerState {
pub scheduler: Arc<bitfun_core::agentic::coordination::DialogScheduler>,
}

/// Tauri application entry point
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub async fn run() {
Expand All @@ -72,7 +78,7 @@ pub async fn run() {
return;
}

let (coordinator, event_queue, event_router, ai_client_factory) =
let (coordinator, scheduler, event_queue, event_router, ai_client_factory) =
match init_agentic_system().await {
Ok(state) => state,
Err(e) => {
Expand All @@ -98,6 +104,10 @@ pub async fn run() {
coordinator: coordinator.clone(),
};

let scheduler_state = SchedulerState {
scheduler: scheduler.clone(),
};

let terminal_state = api::terminal_api::TerminalState::new();

let path_manager = get_path_manager_arc();
Expand Down Expand Up @@ -149,8 +159,10 @@ pub async fn run() {
})
.manage(app_state)
.manage(coordinator_state)
.manage(scheduler_state)
.manage(path_manager)
.manage(coordinator)
.manage(scheduler)
.manage(terminal_state)
.setup(move |app| {
logging::register_runtime_log_state(startup_log_level, session_log_dir.clone());
Expand All @@ -160,14 +172,13 @@ pub async fn run() {
// so the primary candidate is "mobile-web/dist". Additional fallbacks
// handle legacy or non-standard bundle layouts.
{
let candidates = [
"mobile-web/dist",
"mobile-web",
"dist",
];
let candidates = ["mobile-web/dist", "mobile-web", "dist"];
let mut found = false;
for candidate in &candidates {
if let Ok(p) = app.path().resolve(candidate, tauri::path::BaseDirectory::Resource) {
if let Ok(p) = app
.path()
.resolve(candidate, tauri::path::BaseDirectory::Resource)
{
if p.join("index.html").exists() {
log::info!("Found bundled mobile-web at: {}", p.display());
api::remote_connect_api::set_mobile_web_resource_path(p);
Expand All @@ -180,9 +191,16 @@ pub async fn run() {
// Last resort: scan the resource root for any index.html
if let Ok(res_dir) = app.path().resource_dir() {
for sub in &["mobile-web/dist", "mobile-web", "dist", ""] {
let p = if sub.is_empty() { res_dir.clone() } else { res_dir.join(sub) };
let p = if sub.is_empty() {
res_dir.clone()
} else {
res_dir.join(sub)
};
if p.join("index.html").exists() {
log::info!("Found mobile-web via resource root scan: {}", p.display());
log::info!(
"Found mobile-web via resource root scan: {}",
p.display()
);
api::remote_connect_api::set_mobile_web_resource_path(p);
break;
}
Expand Down Expand Up @@ -575,6 +593,7 @@ pub async fn run() {

async fn init_agentic_system() -> anyhow::Result<(
Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
Arc<bitfun_core::agentic::coordination::DialogScheduler>,
Arc<bitfun_core::agentic::events::EventQueue>,
Arc<bitfun_core::agentic::events::EventRouter>,
Arc<AIClientFactory>,
Expand Down Expand Up @@ -636,7 +655,7 @@ async fn init_agentic_system() -> anyhow::Result<(
));

let coordinator = Arc::new(coordination::ConversationCoordinator::new(
session_manager,
session_manager.clone(),
execution_engine,
tool_pipeline,
event_queue.clone(),
Expand All @@ -645,8 +664,20 @@ async fn init_agentic_system() -> anyhow::Result<(

coordination::ConversationCoordinator::set_global(coordinator.clone());

// Create the DialogScheduler and wire up the outcome notification channel
let scheduler =
coordination::DialogScheduler::new(coordinator.clone(), session_manager.clone());
coordinator.set_scheduler_notifier(scheduler.outcome_sender());
coordination::set_global_scheduler(scheduler.clone());

log::info!("Agentic system initialized");
Ok((coordinator, event_queue, event_router, ai_client_factory))
Ok((
coordinator,
scheduler,
event_queue,
event_router,
ai_client_factory,
))
}

async fn init_function_agents(ai_client_factory: Arc<AIClientFactory>) -> anyhow::Result<()> {
Expand Down
52 changes: 47 additions & 5 deletions src/crates/core/src/agentic/coordination/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::util::errors::{BitFunError, BitFunResult};
use log::{debug, error, info, warn};
use std::sync::Arc;
use std::sync::OnceLock;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Subagent execution result
Expand Down Expand Up @@ -65,13 +66,26 @@ impl Drop for CancelTokenGuard {
}
}

/// Outcome of a completed dialog turn, used to notify DialogScheduler
#[derive(Debug, Clone)]
pub enum TurnOutcome {
/// Turn completed normally
Completed,
/// Turn was cancelled by user
Cancelled,
/// Turn failed with an error
Failed,
}

/// Conversation coordinator
pub struct ConversationCoordinator {
session_manager: Arc<SessionManager>,
execution_engine: Arc<ExecutionEngine>,
tool_pipeline: Arc<ToolPipeline>,
event_queue: Arc<EventQueue>,
event_router: Arc<EventRouter>,
/// Notifies DialogScheduler of turn outcomes; injected after construction
scheduler_notify_tx: OnceLock<mpsc::Sender<(String, TurnOutcome)>>,
}

impl ConversationCoordinator {
Expand All @@ -88,17 +102,25 @@ impl ConversationCoordinator {
tool_pipeline,
event_queue,
event_router,
scheduler_notify_tx: OnceLock::new(),
}
}

/// Inject the DialogScheduler notification channel after construction.
/// Called once during app initialization after the scheduler is created.
pub fn set_scheduler_notifier(&self, tx: mpsc::Sender<(String, TurnOutcome)>) {
let _ = self.scheduler_notify_tx.set(tx);
}

/// Create a new session
pub async fn create_session(
&self,
session_name: String,
agent_type: String,
config: SessionConfig,
) -> BitFunResult<Session> {
self.create_session_with_workspace(None, session_name, agent_type, config, None).await
self.create_session_with_workspace(None, session_name, agent_type, config, None)
.await
}

/// Create a new session with optional session ID
Expand All @@ -109,7 +131,8 @@ impl ConversationCoordinator {
agent_type: String,
config: SessionConfig,
) -> BitFunResult<Session> {
self.create_session_with_workspace(session_id, session_name, agent_type, config, None).await
self.create_session_with_workspace(session_id, session_name, agent_type, config, None)
.await
}

/// Create a new session with optional session ID and workspace binding.
Expand Down Expand Up @@ -561,6 +584,7 @@ impl ConversationCoordinator {
let turn_id_clone = turn_id.clone();
let session_workspace_path = session.config.workspace_path.clone();
let effective_agent_type_clone = effective_agent_type.clone();
let scheduler_notify_tx = self.scheduler_notify_tx.get().cloned();

tokio::spawn(async move {
// Note: Don't check cancellation here as cancel token hasn't been created yet
Expand Down Expand Up @@ -621,6 +645,10 @@ impl ConversationCoordinator {
let _ = session_manager
.update_session_state(&session_id_clone, SessionState::Idle)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Completed));
}
}
Err(e) => {
let is_cancellation = matches!(&e, BitFunError::Cancelled(_));
Expand All @@ -632,6 +660,10 @@ impl ConversationCoordinator {
let _ = session_manager
.update_session_state(&session_id_clone, SessionState::Idle)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Cancelled));
}
} else {
error!("Dialog turn execution failed: {}", e);

Expand Down Expand Up @@ -659,6 +691,10 @@ impl ConversationCoordinator {
},
)
.await;

if let Some(tx) = &scheduler_notify_tx {
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Failed));
}
}
}
}
Expand Down Expand Up @@ -765,7 +801,9 @@ impl ConversationCoordinator {
limit: usize,
before_message_id: Option<&str>,
) -> BitFunResult<(Vec<Message>, bool)> {
self.session_manager.get_messages_paginated(session_id, limit, before_message_id).await
self.session_manager
.get_messages_paginated(session_id, limit, before_message_id)
.await
}

/// Subscribe to internal events
Expand Down Expand Up @@ -830,7 +868,9 @@ impl ConversationCoordinator {
if let Some(token) = cancel_token {
if token.is_cancelled() {
debug!("Subagent task cancelled before execution");
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
return Err(BitFunError::Cancelled(
"Subagent task has been cancelled".to_string(),
));
}
}

Expand All @@ -851,7 +891,9 @@ impl ConversationCoordinator {
if token.is_cancelled() {
debug!("Subagent task cancelled before AI call, cleaning up resources");
let _ = self.cleanup_subagent_resources(&session.session_id).await;
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
return Err(BitFunError::Cancelled(
"Subagent task has been cancelled".to_string(),
));
}
}

Expand Down
Loading