Skip to content

Commit 0bdf5bb

Browse files
authored
Merge pull request #86 from wsp1911/main
feat: introduce DialogScheduler, fix terminal history replay and improve Bash tool result handling
2 parents 25e7da6 + 13e26c0 commit 0bdf5bb

File tree

24 files changed

+1333
-301
lines changed

24 files changed

+1333
-301
lines changed

src/apps/desktop/src/api/agentic_api.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tauri::{AppHandle, State};
77

88
use crate::api::app_state::AppState;
99
use bitfun_core::agentic::tools::image_context::get_image_context;
10-
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
10+
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogScheduler, DialogTriggerSource};
1111
use bitfun_core::agentic::core::*;
1212
use bitfun_core::agentic::image_analysis::ImageContextData;
1313
use bitfun_core::infrastructure::get_workspace_path;
@@ -186,6 +186,7 @@ pub async fn create_session(
186186
pub async fn start_dialog_turn(
187187
_app: AppHandle,
188188
coordinator: State<'_, Arc<ConversationCoordinator>>,
189+
scheduler: State<'_, Arc<DialogScheduler>>,
189190
request: StartDialogTurnRequest,
190191
) -> Result<StartDialogTurnResponse, String> {
191192
let StartDialogTurnRequest {
@@ -214,8 +215,8 @@ pub async fn start_dialog_turn(
214215
.await
215216
.map_err(|e| format!("Failed to start dialog turn: {}", e))?;
216217
} else {
217-
coordinator
218-
.start_dialog_turn(
218+
scheduler
219+
.submit(
219220
session_id,
220221
user_input,
221222
turn_id,

src/apps/desktop/src/api/image_analysis_api.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
//! Image Analysis API
22
33
use crate::api::app_state::AppState;
4-
use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource};
5-
use bitfun_core::agentic::image_analysis::*;
4+
use bitfun_core::agentic::coordination::{DialogScheduler, DialogTriggerSource};
5+
use bitfun_core::agentic::image_analysis::{
6+
resolve_vision_model_from_ai_config, AnalyzeImagesRequest, ImageAnalysisResult, ImageAnalyzer,
7+
MessageEnhancer, SendEnhancedMessageRequest,
8+
};
69
use log::error;
710
use std::sync::Arc;
811
use tauri::State;
@@ -56,7 +59,7 @@ pub async fn analyze_images(
5659
#[tauri::command]
5760
pub async fn send_enhanced_message(
5861
request: SendEnhancedMessageRequest,
59-
coordinator: State<'_, Arc<ConversationCoordinator>>,
62+
scheduler: State<'_, Arc<DialogScheduler>>,
6063
_state: State<'_, AppState>,
6164
) -> Result<(), String> {
6265
let enhanced_message = MessageEnhancer::enhance_with_image_analysis(
@@ -65,10 +68,10 @@ pub async fn send_enhanced_message(
6568
&request.other_contexts,
6669
);
6770

68-
let _stream = coordinator
69-
.start_dialog_turn(
71+
scheduler
72+
.submit(
7073
request.session_id.clone(),
71-
enhanced_message.clone(),
74+
enhanced_message,
7275
Some(request.dialog_turn_id.clone()),
7376
request.agent_type.clone(),
7477
DialogTriggerSource::DesktopApi,

src/apps/desktop/src/api/terminal_api.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tokio::sync::Mutex;
1010
use bitfun_core::service::runtime::RuntimeManager;
1111
use bitfun_core::service::terminal::{
1212
AcknowledgeRequest as CoreAcknowledgeRequest, CloseSessionRequest as CoreCloseSessionRequest,
13+
CommandCompletionReason as CoreCommandCompletionReason,
1314
CreateSessionRequest as CoreCreateSessionRequest,
1415
ExecuteCommandRequest as CoreExecuteCommandRequest,
1516
ExecuteCommandResponse as CoreExecuteCommandResponse,
@@ -198,6 +199,7 @@ pub struct ExecuteCommandResponse {
198199
pub command_id: String,
199200
pub output: String,
200201
pub exit_code: Option<i32>,
202+
pub completion_reason: String,
201203
}
202204

203205
impl From<CoreExecuteCommandResponse> for ExecuteCommandResponse {
@@ -207,6 +209,10 @@ impl From<CoreExecuteCommandResponse> for ExecuteCommandResponse {
207209
command_id: resp.command_id,
208210
output: resp.output,
209211
exit_code: resp.exit_code,
212+
completion_reason: match resp.completion_reason {
213+
CoreCommandCompletionReason::Completed => "completed".to_string(),
214+
CoreCommandCompletionReason::TimedOut => "timedOut".to_string(),
215+
},
210216
}
211217
}
212218
}
@@ -230,6 +236,10 @@ pub struct GetHistoryResponse {
230236
pub session_id: String,
231237
pub data: String,
232238
pub history_size: usize,
239+
/// PTY column count at the time history was captured.
240+
pub cols: u16,
241+
/// PTY row count at the time history was captured.
242+
pub rows: u16,
233243
}
234244

235245
impl From<CoreGetHistoryResponse> for GetHistoryResponse {
@@ -238,6 +248,8 @@ impl From<CoreGetHistoryResponse> for GetHistoryResponse {
238248
session_id: resp.session_id,
239249
data: resp.data,
240250
history_size: resp.history_size,
251+
cols: resp.cols,
252+
rows: resp.rows,
241253
}
242254
}
243255
}
@@ -494,6 +506,8 @@ pub async fn terminal_get_history(
494506
session_id: response.session_id,
495507
data: response.data,
496508
history_size: response.history_size,
509+
cols: response.cols,
510+
rows: response.rows,
497511
})
498512
}
499513

src/apps/desktop/src/lib.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ pub struct CoordinatorState {
5050
pub coordinator: Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
5151
}
5252

53+
/// Dialog scheduler state (primary entry point for user messages)
54+
#[derive(Clone)]
55+
pub struct SchedulerState {
56+
pub scheduler: Arc<bitfun_core::agentic::coordination::DialogScheduler>,
57+
}
58+
5359
/// Tauri application entry point
5460
#[cfg_attr(mobile, tauri::mobile_entry_point)]
5561
pub async fn run() {
@@ -72,7 +78,7 @@ pub async fn run() {
7278
return;
7379
}
7480

75-
let (coordinator, event_queue, event_router, ai_client_factory) =
81+
let (coordinator, scheduler, event_queue, event_router, ai_client_factory) =
7682
match init_agentic_system().await {
7783
Ok(state) => state,
7884
Err(e) => {
@@ -98,6 +104,10 @@ pub async fn run() {
98104
coordinator: coordinator.clone(),
99105
};
100106

107+
let scheduler_state = SchedulerState {
108+
scheduler: scheduler.clone(),
109+
};
110+
101111
let terminal_state = api::terminal_api::TerminalState::new();
102112

103113
let path_manager = get_path_manager_arc();
@@ -149,8 +159,10 @@ pub async fn run() {
149159
})
150160
.manage(app_state)
151161
.manage(coordinator_state)
162+
.manage(scheduler_state)
152163
.manage(path_manager)
153164
.manage(coordinator)
165+
.manage(scheduler)
154166
.manage(terminal_state)
155167
.setup(move |app| {
156168
logging::register_runtime_log_state(startup_log_level, session_log_dir.clone());
@@ -160,14 +172,13 @@ pub async fn run() {
160172
// so the primary candidate is "mobile-web/dist". Additional fallbacks
161173
// handle legacy or non-standard bundle layouts.
162174
{
163-
let candidates = [
164-
"mobile-web/dist",
165-
"mobile-web",
166-
"dist",
167-
];
175+
let candidates = ["mobile-web/dist", "mobile-web", "dist"];
168176
let mut found = false;
169177
for candidate in &candidates {
170-
if let Ok(p) = app.path().resolve(candidate, tauri::path::BaseDirectory::Resource) {
178+
if let Ok(p) = app
179+
.path()
180+
.resolve(candidate, tauri::path::BaseDirectory::Resource)
181+
{
171182
if p.join("index.html").exists() {
172183
log::info!("Found bundled mobile-web at: {}", p.display());
173184
api::remote_connect_api::set_mobile_web_resource_path(p);
@@ -180,9 +191,16 @@ pub async fn run() {
180191
// Last resort: scan the resource root for any index.html
181192
if let Ok(res_dir) = app.path().resource_dir() {
182193
for sub in &["mobile-web/dist", "mobile-web", "dist", ""] {
183-
let p = if sub.is_empty() { res_dir.clone() } else { res_dir.join(sub) };
194+
let p = if sub.is_empty() {
195+
res_dir.clone()
196+
} else {
197+
res_dir.join(sub)
198+
};
184199
if p.join("index.html").exists() {
185-
log::info!("Found mobile-web via resource root scan: {}", p.display());
200+
log::info!(
201+
"Found mobile-web via resource root scan: {}",
202+
p.display()
203+
);
186204
api::remote_connect_api::set_mobile_web_resource_path(p);
187205
break;
188206
}
@@ -575,6 +593,7 @@ pub async fn run() {
575593

576594
async fn init_agentic_system() -> anyhow::Result<(
577595
Arc<bitfun_core::agentic::coordination::ConversationCoordinator>,
596+
Arc<bitfun_core::agentic::coordination::DialogScheduler>,
578597
Arc<bitfun_core::agentic::events::EventQueue>,
579598
Arc<bitfun_core::agentic::events::EventRouter>,
580599
Arc<AIClientFactory>,
@@ -636,7 +655,7 @@ async fn init_agentic_system() -> anyhow::Result<(
636655
));
637656

638657
let coordinator = Arc::new(coordination::ConversationCoordinator::new(
639-
session_manager,
658+
session_manager.clone(),
640659
execution_engine,
641660
tool_pipeline,
642661
event_queue.clone(),
@@ -645,8 +664,20 @@ async fn init_agentic_system() -> anyhow::Result<(
645664

646665
coordination::ConversationCoordinator::set_global(coordinator.clone());
647666

667+
// Create the DialogScheduler and wire up the outcome notification channel
668+
let scheduler =
669+
coordination::DialogScheduler::new(coordinator.clone(), session_manager.clone());
670+
coordinator.set_scheduler_notifier(scheduler.outcome_sender());
671+
coordination::set_global_scheduler(scheduler.clone());
672+
648673
log::info!("Agentic system initialized");
649-
Ok((coordinator, event_queue, event_router, ai_client_factory))
674+
Ok((
675+
coordinator,
676+
scheduler,
677+
event_queue,
678+
event_router,
679+
ai_client_factory,
680+
))
650681
}
651682

652683
async fn init_function_agents(ai_client_factory: Arc<AIClientFactory>) -> anyhow::Result<()> {

src/crates/core/src/agentic/coordination/coordinator.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::util::errors::{BitFunError, BitFunResult};
1818
use log::{debug, error, info, warn};
1919
use std::sync::Arc;
2020
use std::sync::OnceLock;
21+
use tokio::sync::mpsc;
2122
use tokio_util::sync::CancellationToken;
2223

2324
/// Subagent execution result
@@ -65,13 +66,26 @@ impl Drop for CancelTokenGuard {
6566
}
6667
}
6768

69+
/// Outcome of a completed dialog turn, used to notify DialogScheduler
70+
#[derive(Debug, Clone)]
71+
pub enum TurnOutcome {
72+
/// Turn completed normally
73+
Completed,
74+
/// Turn was cancelled by user
75+
Cancelled,
76+
/// Turn failed with an error
77+
Failed,
78+
}
79+
6880
/// Conversation coordinator
6981
pub struct ConversationCoordinator {
7082
session_manager: Arc<SessionManager>,
7183
execution_engine: Arc<ExecutionEngine>,
7284
tool_pipeline: Arc<ToolPipeline>,
7385
event_queue: Arc<EventQueue>,
7486
event_router: Arc<EventRouter>,
87+
/// Notifies DialogScheduler of turn outcomes; injected after construction
88+
scheduler_notify_tx: OnceLock<mpsc::Sender<(String, TurnOutcome)>>,
7589
}
7690

7791
impl ConversationCoordinator {
@@ -88,17 +102,25 @@ impl ConversationCoordinator {
88102
tool_pipeline,
89103
event_queue,
90104
event_router,
105+
scheduler_notify_tx: OnceLock::new(),
91106
}
92107
}
93108

109+
/// Inject the DialogScheduler notification channel after construction.
110+
/// Called once during app initialization after the scheduler is created.
111+
pub fn set_scheduler_notifier(&self, tx: mpsc::Sender<(String, TurnOutcome)>) {
112+
let _ = self.scheduler_notify_tx.set(tx);
113+
}
114+
94115
/// Create a new session
95116
pub async fn create_session(
96117
&self,
97118
session_name: String,
98119
agent_type: String,
99120
config: SessionConfig,
100121
) -> BitFunResult<Session> {
101-
self.create_session_with_workspace(None, session_name, agent_type, config, None).await
122+
self.create_session_with_workspace(None, session_name, agent_type, config, None)
123+
.await
102124
}
103125

104126
/// Create a new session with optional session ID
@@ -109,7 +131,8 @@ impl ConversationCoordinator {
109131
agent_type: String,
110132
config: SessionConfig,
111133
) -> BitFunResult<Session> {
112-
self.create_session_with_workspace(session_id, session_name, agent_type, config, None).await
134+
self.create_session_with_workspace(session_id, session_name, agent_type, config, None)
135+
.await
113136
}
114137

115138
/// Create a new session with optional session ID and workspace binding.
@@ -561,6 +584,7 @@ impl ConversationCoordinator {
561584
let turn_id_clone = turn_id.clone();
562585
let session_workspace_path = session.config.workspace_path.clone();
563586
let effective_agent_type_clone = effective_agent_type.clone();
587+
let scheduler_notify_tx = self.scheduler_notify_tx.get().cloned();
564588

565589
tokio::spawn(async move {
566590
// Note: Don't check cancellation here as cancel token hasn't been created yet
@@ -621,6 +645,10 @@ impl ConversationCoordinator {
621645
let _ = session_manager
622646
.update_session_state(&session_id_clone, SessionState::Idle)
623647
.await;
648+
649+
if let Some(tx) = &scheduler_notify_tx {
650+
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Completed));
651+
}
624652
}
625653
Err(e) => {
626654
let is_cancellation = matches!(&e, BitFunError::Cancelled(_));
@@ -632,6 +660,10 @@ impl ConversationCoordinator {
632660
let _ = session_manager
633661
.update_session_state(&session_id_clone, SessionState::Idle)
634662
.await;
663+
664+
if let Some(tx) = &scheduler_notify_tx {
665+
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Cancelled));
666+
}
635667
} else {
636668
error!("Dialog turn execution failed: {}", e);
637669

@@ -659,6 +691,10 @@ impl ConversationCoordinator {
659691
},
660692
)
661693
.await;
694+
695+
if let Some(tx) = &scheduler_notify_tx {
696+
let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Failed));
697+
}
662698
}
663699
}
664700
}
@@ -765,7 +801,9 @@ impl ConversationCoordinator {
765801
limit: usize,
766802
before_message_id: Option<&str>,
767803
) -> BitFunResult<(Vec<Message>, bool)> {
768-
self.session_manager.get_messages_paginated(session_id, limit, before_message_id).await
804+
self.session_manager
805+
.get_messages_paginated(session_id, limit, before_message_id)
806+
.await
769807
}
770808

771809
/// Subscribe to internal events
@@ -830,7 +868,9 @@ impl ConversationCoordinator {
830868
if let Some(token) = cancel_token {
831869
if token.is_cancelled() {
832870
debug!("Subagent task cancelled before execution");
833-
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
871+
return Err(BitFunError::Cancelled(
872+
"Subagent task has been cancelled".to_string(),
873+
));
834874
}
835875
}
836876

@@ -851,7 +891,9 @@ impl ConversationCoordinator {
851891
if token.is_cancelled() {
852892
debug!("Subagent task cancelled before AI call, cleaning up resources");
853893
let _ = self.cleanup_subagent_resources(&session.session_id).await;
854-
return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string()));
894+
return Err(BitFunError::Cancelled(
895+
"Subagent task has been cancelled".to_string(),
896+
));
855897
}
856898
}
857899

0 commit comments

Comments
 (0)