diff --git a/src/apps/desktop/src/api/agentic_api.rs b/src/apps/desktop/src/api/agentic_api.rs index 0205976c..a6c2767f 100644 --- a/src/apps/desktop/src/api/agentic_api.rs +++ b/src/apps/desktop/src/api/agentic_api.rs @@ -7,7 +7,7 @@ use tauri::{AppHandle, State}; use crate::api::app_state::AppState; use bitfun_core::agentic::tools::image_context::get_image_context; -use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource}; +use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogScheduler, DialogTriggerSource}; use bitfun_core::agentic::core::*; use bitfun_core::agentic::image_analysis::ImageContextData; use bitfun_core::infrastructure::get_workspace_path; @@ -186,6 +186,7 @@ pub async fn create_session( pub async fn start_dialog_turn( _app: AppHandle, coordinator: State<'_, Arc>, + scheduler: State<'_, Arc>, request: StartDialogTurnRequest, ) -> Result { let StartDialogTurnRequest { @@ -214,8 +215,8 @@ pub async fn start_dialog_turn( .await .map_err(|e| format!("Failed to start dialog turn: {}", e))?; } else { - coordinator - .start_dialog_turn( + scheduler + .submit( session_id, user_input, turn_id, diff --git a/src/apps/desktop/src/api/image_analysis_api.rs b/src/apps/desktop/src/api/image_analysis_api.rs index bf3043cb..5a3e3b97 100644 --- a/src/apps/desktop/src/api/image_analysis_api.rs +++ b/src/apps/desktop/src/api/image_analysis_api.rs @@ -1,8 +1,11 @@ //! Image Analysis API use crate::api::app_state::AppState; -use bitfun_core::agentic::coordination::{ConversationCoordinator, DialogTriggerSource}; -use bitfun_core::agentic::image_analysis::*; +use bitfun_core::agentic::coordination::{DialogScheduler, DialogTriggerSource}; +use bitfun_core::agentic::image_analysis::{ + resolve_vision_model_from_ai_config, AnalyzeImagesRequest, ImageAnalysisResult, ImageAnalyzer, + MessageEnhancer, SendEnhancedMessageRequest, +}; use log::error; use std::sync::Arc; use tauri::State; @@ -56,7 +59,7 @@ pub async fn analyze_images( #[tauri::command] pub async fn send_enhanced_message( request: SendEnhancedMessageRequest, - coordinator: State<'_, Arc>, + scheduler: State<'_, Arc>, _state: State<'_, AppState>, ) -> Result<(), String> { let enhanced_message = MessageEnhancer::enhance_with_image_analysis( @@ -65,10 +68,10 @@ pub async fn send_enhanced_message( &request.other_contexts, ); - let _stream = coordinator - .start_dialog_turn( + scheduler + .submit( request.session_id.clone(), - enhanced_message.clone(), + enhanced_message, Some(request.dialog_turn_id.clone()), request.agent_type.clone(), DialogTriggerSource::DesktopApi, diff --git a/src/apps/desktop/src/api/terminal_api.rs b/src/apps/desktop/src/api/terminal_api.rs index 039358dc..791f07d4 100644 --- a/src/apps/desktop/src/api/terminal_api.rs +++ b/src/apps/desktop/src/api/terminal_api.rs @@ -10,6 +10,7 @@ use tokio::sync::Mutex; use bitfun_core::service::runtime::RuntimeManager; use bitfun_core::service::terminal::{ AcknowledgeRequest as CoreAcknowledgeRequest, CloseSessionRequest as CoreCloseSessionRequest, + CommandCompletionReason as CoreCommandCompletionReason, CreateSessionRequest as CoreCreateSessionRequest, ExecuteCommandRequest as CoreExecuteCommandRequest, ExecuteCommandResponse as CoreExecuteCommandResponse, @@ -198,6 +199,7 @@ pub struct ExecuteCommandResponse { pub command_id: String, pub output: String, pub exit_code: Option, + pub completion_reason: String, } impl From for ExecuteCommandResponse { @@ -207,6 +209,10 @@ impl From for ExecuteCommandResponse { command_id: resp.command_id, output: resp.output, exit_code: resp.exit_code, + completion_reason: match resp.completion_reason { + CoreCommandCompletionReason::Completed => "completed".to_string(), + CoreCommandCompletionReason::TimedOut => "timedOut".to_string(), + }, } } } @@ -230,6 +236,10 @@ pub struct GetHistoryResponse { pub session_id: String, pub data: String, pub history_size: usize, + /// PTY column count at the time history was captured. + pub cols: u16, + /// PTY row count at the time history was captured. + pub rows: u16, } impl From for GetHistoryResponse { @@ -238,6 +248,8 @@ impl From for GetHistoryResponse { session_id: resp.session_id, data: resp.data, history_size: resp.history_size, + cols: resp.cols, + rows: resp.rows, } } } @@ -494,6 +506,8 @@ pub async fn terminal_get_history( session_id: response.session_id, data: response.data, history_size: response.history_size, + cols: response.cols, + rows: response.rows, }) } diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index 437dd7dc..4eded2ec 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -50,6 +50,12 @@ pub struct CoordinatorState { pub coordinator: Arc, } +/// Dialog scheduler state (primary entry point for user messages) +#[derive(Clone)] +pub struct SchedulerState { + pub scheduler: Arc, +} + /// Tauri application entry point #[cfg_attr(mobile, tauri::mobile_entry_point)] pub async fn run() { @@ -72,7 +78,7 @@ pub async fn run() { return; } - let (coordinator, event_queue, event_router, ai_client_factory) = + let (coordinator, scheduler, event_queue, event_router, ai_client_factory) = match init_agentic_system().await { Ok(state) => state, Err(e) => { @@ -98,6 +104,10 @@ pub async fn run() { coordinator: coordinator.clone(), }; + let scheduler_state = SchedulerState { + scheduler: scheduler.clone(), + }; + let terminal_state = api::terminal_api::TerminalState::new(); let path_manager = get_path_manager_arc(); @@ -149,8 +159,10 @@ pub async fn run() { }) .manage(app_state) .manage(coordinator_state) + .manage(scheduler_state) .manage(path_manager) .manage(coordinator) + .manage(scheduler) .manage(terminal_state) .setup(move |app| { logging::register_runtime_log_state(startup_log_level, session_log_dir.clone()); @@ -160,14 +172,13 @@ pub async fn run() { // so the primary candidate is "mobile-web/dist". Additional fallbacks // handle legacy or non-standard bundle layouts. { - let candidates = [ - "mobile-web/dist", - "mobile-web", - "dist", - ]; + let candidates = ["mobile-web/dist", "mobile-web", "dist"]; let mut found = false; for candidate in &candidates { - if let Ok(p) = app.path().resolve(candidate, tauri::path::BaseDirectory::Resource) { + if let Ok(p) = app + .path() + .resolve(candidate, tauri::path::BaseDirectory::Resource) + { if p.join("index.html").exists() { log::info!("Found bundled mobile-web at: {}", p.display()); api::remote_connect_api::set_mobile_web_resource_path(p); @@ -180,9 +191,16 @@ pub async fn run() { // Last resort: scan the resource root for any index.html if let Ok(res_dir) = app.path().resource_dir() { for sub in &["mobile-web/dist", "mobile-web", "dist", ""] { - let p = if sub.is_empty() { res_dir.clone() } else { res_dir.join(sub) }; + let p = if sub.is_empty() { + res_dir.clone() + } else { + res_dir.join(sub) + }; if p.join("index.html").exists() { - log::info!("Found mobile-web via resource root scan: {}", p.display()); + log::info!( + "Found mobile-web via resource root scan: {}", + p.display() + ); api::remote_connect_api::set_mobile_web_resource_path(p); break; } @@ -575,6 +593,7 @@ pub async fn run() { async fn init_agentic_system() -> anyhow::Result<( Arc, + Arc, Arc, Arc, Arc, @@ -636,7 +655,7 @@ async fn init_agentic_system() -> anyhow::Result<( )); let coordinator = Arc::new(coordination::ConversationCoordinator::new( - session_manager, + session_manager.clone(), execution_engine, tool_pipeline, event_queue.clone(), @@ -645,8 +664,20 @@ async fn init_agentic_system() -> anyhow::Result<( coordination::ConversationCoordinator::set_global(coordinator.clone()); + // Create the DialogScheduler and wire up the outcome notification channel + let scheduler = + coordination::DialogScheduler::new(coordinator.clone(), session_manager.clone()); + coordinator.set_scheduler_notifier(scheduler.outcome_sender()); + coordination::set_global_scheduler(scheduler.clone()); + log::info!("Agentic system initialized"); - Ok((coordinator, event_queue, event_router, ai_client_factory)) + Ok(( + coordinator, + scheduler, + event_queue, + event_router, + ai_client_factory, + )) } async fn init_function_agents(ai_client_factory: Arc) -> anyhow::Result<()> { diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs index 250a1f51..bbe6f541 100644 --- a/src/crates/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/core/src/agentic/coordination/coordinator.rs @@ -18,6 +18,7 @@ use crate::util::errors::{BitFunError, BitFunResult}; use log::{debug, error, info, warn}; use std::sync::Arc; use std::sync::OnceLock; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; /// Subagent execution result @@ -65,6 +66,17 @@ impl Drop for CancelTokenGuard { } } +/// Outcome of a completed dialog turn, used to notify DialogScheduler +#[derive(Debug, Clone)] +pub enum TurnOutcome { + /// Turn completed normally + Completed, + /// Turn was cancelled by user + Cancelled, + /// Turn failed with an error + Failed, +} + /// Conversation coordinator pub struct ConversationCoordinator { session_manager: Arc, @@ -72,6 +84,8 @@ pub struct ConversationCoordinator { tool_pipeline: Arc, event_queue: Arc, event_router: Arc, + /// Notifies DialogScheduler of turn outcomes; injected after construction + scheduler_notify_tx: OnceLock>, } impl ConversationCoordinator { @@ -88,9 +102,16 @@ impl ConversationCoordinator { tool_pipeline, event_queue, event_router, + scheduler_notify_tx: OnceLock::new(), } } + /// Inject the DialogScheduler notification channel after construction. + /// Called once during app initialization after the scheduler is created. + pub fn set_scheduler_notifier(&self, tx: mpsc::Sender<(String, TurnOutcome)>) { + let _ = self.scheduler_notify_tx.set(tx); + } + /// Create a new session pub async fn create_session( &self, @@ -98,7 +119,8 @@ impl ConversationCoordinator { agent_type: String, config: SessionConfig, ) -> BitFunResult { - self.create_session_with_workspace(None, session_name, agent_type, config, None).await + self.create_session_with_workspace(None, session_name, agent_type, config, None) + .await } /// Create a new session with optional session ID @@ -109,7 +131,8 @@ impl ConversationCoordinator { agent_type: String, config: SessionConfig, ) -> BitFunResult { - self.create_session_with_workspace(session_id, session_name, agent_type, config, None).await + self.create_session_with_workspace(session_id, session_name, agent_type, config, None) + .await } /// Create a new session with optional session ID and workspace binding. @@ -561,6 +584,7 @@ impl ConversationCoordinator { let turn_id_clone = turn_id.clone(); let session_workspace_path = session.config.workspace_path.clone(); let effective_agent_type_clone = effective_agent_type.clone(); + let scheduler_notify_tx = self.scheduler_notify_tx.get().cloned(); tokio::spawn(async move { // Note: Don't check cancellation here as cancel token hasn't been created yet @@ -621,6 +645,10 @@ impl ConversationCoordinator { let _ = session_manager .update_session_state(&session_id_clone, SessionState::Idle) .await; + + if let Some(tx) = &scheduler_notify_tx { + let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Completed)); + } } Err(e) => { let is_cancellation = matches!(&e, BitFunError::Cancelled(_)); @@ -632,6 +660,10 @@ impl ConversationCoordinator { let _ = session_manager .update_session_state(&session_id_clone, SessionState::Idle) .await; + + if let Some(tx) = &scheduler_notify_tx { + let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Cancelled)); + } } else { error!("Dialog turn execution failed: {}", e); @@ -659,6 +691,10 @@ impl ConversationCoordinator { }, ) .await; + + if let Some(tx) = &scheduler_notify_tx { + let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Failed)); + } } } } @@ -765,7 +801,9 @@ impl ConversationCoordinator { limit: usize, before_message_id: Option<&str>, ) -> BitFunResult<(Vec, bool)> { - self.session_manager.get_messages_paginated(session_id, limit, before_message_id).await + self.session_manager + .get_messages_paginated(session_id, limit, before_message_id) + .await } /// Subscribe to internal events @@ -830,7 +868,9 @@ impl ConversationCoordinator { if let Some(token) = cancel_token { if token.is_cancelled() { debug!("Subagent task cancelled before execution"); - return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string())); + return Err(BitFunError::Cancelled( + "Subagent task has been cancelled".to_string(), + )); } } @@ -851,7 +891,9 @@ impl ConversationCoordinator { if token.is_cancelled() { debug!("Subagent task cancelled before AI call, cleaning up resources"); let _ = self.cleanup_subagent_resources(&session.session_id).await; - return Err(BitFunError::Cancelled("Subagent task has been cancelled".to_string())); + return Err(BitFunError::Cancelled( + "Subagent task has been cancelled".to_string(), + )); } } diff --git a/src/crates/core/src/agentic/coordination/mod.rs b/src/crates/core/src/agentic/coordination/mod.rs index 16297236..44cc0406 100644 --- a/src/crates/core/src/agentic/coordination/mod.rs +++ b/src/crates/core/src/agentic/coordination/mod.rs @@ -3,10 +3,12 @@ //! Top-level component that integrates all subsystems pub mod coordinator; +pub mod scheduler; pub mod state_manager; pub use coordinator::*; +pub use scheduler::*; pub use state_manager::*; pub use coordinator::get_global_coordinator; - +pub use scheduler::get_global_scheduler; diff --git a/src/crates/core/src/agentic/coordination/scheduler.rs b/src/crates/core/src/agentic/coordination/scheduler.rs new file mode 100644 index 00000000..68532b1b --- /dev/null +++ b/src/crates/core/src/agentic/coordination/scheduler.rs @@ -0,0 +1,390 @@ +//! Dialog scheduler +//! +//! Message queue manager that automatically dispatches queued messages +//! when the target session becomes idle. +//! +//! Acts as the primary entry point for all user-facing message submissions, +//! wrapping ConversationCoordinator with: +//! - Per-session FIFO queue (max 20 messages) +//! - 1-second debounce after session becomes idle (resets on each new incoming message) +//! - Automatic message merging when queue has multiple entries +//! - Queue cleared on cancel or error + +use super::coordinator::{ConversationCoordinator, DialogTriggerSource, TurnOutcome}; +use crate::agentic::core::SessionState; +use crate::agentic::session::SessionManager; +use dashmap::DashMap; +use log::{debug, info, warn}; +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::SystemTime; +use tokio::sync::mpsc; +use tokio::task::AbortHandle; +use tokio::time::Duration; + +const MAX_QUEUE_DEPTH: usize = 20; +const DEBOUNCE_DELAY: Duration = Duration::from_secs(1); + +/// A message waiting to be dispatched to the coordinator +#[derive(Debug)] +pub struct QueuedTurn { + pub user_input: String, + pub turn_id: Option, + pub agent_type: String, + pub trigger_source: DialogTriggerSource, + #[allow(dead_code)] + pub enqueued_at: SystemTime, +} + +/// Message queue manager for dialog turns. +/// +/// All user-facing callers (frontend Tauri commands, remote server, bot router) +/// should submit messages through this scheduler instead of calling +/// ConversationCoordinator directly. +pub struct DialogScheduler { + coordinator: Arc, + session_manager: Arc, + /// Per-session FIFO message queues + queues: Arc>>, + /// Per-session pending debounce task handles (present = debounce window active) + debounce_handles: Arc>, + /// Cloneable sender given to ConversationCoordinator for turn outcome notifications + outcome_tx: mpsc::Sender<(String, TurnOutcome)>, +} + +impl DialogScheduler { + /// Create a new DialogScheduler and start its background outcome handler. + /// + /// The returned `Arc` should be stored globally. + /// Call `coordinator.set_scheduler_notifier(scheduler.outcome_sender())` + /// immediately after to wire up the notification channel. + pub fn new( + coordinator: Arc, + session_manager: Arc, + ) -> Arc { + let (outcome_tx, outcome_rx) = mpsc::channel(128); + + let scheduler = Arc::new(Self { + coordinator, + session_manager, + queues: Arc::new(DashMap::new()), + debounce_handles: Arc::new(DashMap::new()), + outcome_tx, + }); + + let scheduler_for_handler = Arc::clone(&scheduler); + tokio::spawn(async move { + scheduler_for_handler.run_outcome_handler(outcome_rx).await; + }); + + scheduler + } + + /// Returns a sender to give to ConversationCoordinator for turn outcome notifications. + pub fn outcome_sender(&self) -> mpsc::Sender<(String, TurnOutcome)> { + self.outcome_tx.clone() + } + + /// Submit a user message for a session. + /// + /// - Session idle, no debounce window active → dispatched immediately. + /// - Session idle, debounce window active (collecting messages) → queued, timer reset. + /// - Session processing → queued (up to MAX_QUEUE_DEPTH). + /// - Session error → queue cleared, dispatched immediately. + /// + /// Returns `Err(String)` if the queue is full or the coordinator returns an error. + pub async fn submit( + &self, + session_id: String, + user_input: String, + turn_id: Option, + agent_type: String, + trigger_source: DialogTriggerSource, + ) -> Result<(), String> { + let state = self + .session_manager + .get_session(&session_id) + .map(|s| s.state.clone()); + + match state { + None => { + self.coordinator + .start_dialog_turn( + session_id, + user_input, + turn_id, + agent_type, + trigger_source, + ) + .await + .map_err(|e| e.to_string()) + } + + Some(SessionState::Error { .. }) => { + self.clear_queue_and_debounce(&session_id); + self.coordinator + .start_dialog_turn( + session_id, + user_input, + turn_id, + agent_type, + trigger_source, + ) + .await + .map_err(|e| e.to_string()) + } + + Some(SessionState::Idle) => { + let in_debounce = self.debounce_handles.contains_key(&session_id); + let queue_non_empty = self + .queues + .get(&session_id) + .map(|q| !q.is_empty()) + .unwrap_or(false); + + if in_debounce || queue_non_empty { + self.enqueue( + &session_id, + user_input, + turn_id, + agent_type, + trigger_source, + )?; + self.schedule_debounce(session_id); + Ok(()) + } else { + self.coordinator + .start_dialog_turn( + session_id, + user_input, + turn_id, + agent_type, + trigger_source, + ) + .await + .map_err(|e| e.to_string()) + } + } + + Some(SessionState::Processing { .. }) => { + self.enqueue( + &session_id, + user_input, + turn_id, + agent_type, + trigger_source, + )?; + Ok(()) + } + } + } + + /// Number of messages currently queued for a session. + pub fn queue_depth(&self, session_id: &str) -> usize { + self.queues.get(session_id).map(|q| q.len()).unwrap_or(0) + } + + // ── Private helpers ────────────────────────────────────────────────────── + + fn enqueue( + &self, + session_id: &str, + user_input: String, + turn_id: Option, + agent_type: String, + trigger_source: DialogTriggerSource, + ) -> Result<(), String> { + let queue_len = self.queues.get(session_id).map(|q| q.len()).unwrap_or(0); + + if queue_len >= MAX_QUEUE_DEPTH { + warn!( + "Queue full, rejecting message: session_id={}, max={}", + session_id, MAX_QUEUE_DEPTH + ); + return Err(format!( + "Message queue full for session {} (max {} messages)", + session_id, MAX_QUEUE_DEPTH + )); + } + + self.queues + .entry(session_id.to_string()) + .or_default() + .push_back(QueuedTurn { + user_input, + turn_id, + agent_type, + trigger_source, + enqueued_at: SystemTime::now(), + }); + + let new_len = self.queues.get(session_id).map(|q| q.len()).unwrap_or(0); + debug!( + "Message queued: session_id={}, queue_depth={}", + session_id, new_len + ); + Ok(()) + } + + fn clear_queue_and_debounce(&self, session_id: &str) { + if let Some((_, handle)) = self.debounce_handles.remove(session_id) { + handle.abort(); + } + if let Some(mut queue) = self.queues.get_mut(session_id) { + let count = queue.len(); + queue.clear(); + if count > 0 { + info!( + "Cleared {} queued messages: session_id={}", + count, session_id + ); + } + } + } + + /// Start (or restart) the 1-second debounce timer for a session. + /// When the timer fires, all queued messages are merged and dispatched. + fn schedule_debounce(&self, session_id: String) { + // Cancel the existing timer (if any) + if let Some((_, old)) = self.debounce_handles.remove(&session_id) { + old.abort(); + } + + let queues = Arc::clone(&self.queues); + let coordinator = Arc::clone(&self.coordinator); + let debounce_handles = Arc::clone(&self.debounce_handles); + let session_id_clone = session_id.clone(); + + let join_handle = tokio::spawn(async move { + tokio::time::sleep(DEBOUNCE_DELAY).await; + + // Remove our own handle - we are now executing + debounce_handles.remove(&session_id_clone); + + // Drain all queued messages + let messages: Vec = { + let mut entry = queues.entry(session_id_clone.clone()).or_default(); + entry.drain(..).collect() + }; + + if messages.is_empty() { + return; + } + + info!( + "Dispatching {} queued message(s) after debounce: session_id={}", + messages.len(), + session_id_clone + ); + + let (merged_input, turn_id, agent_type, trigger_source) = + merge_messages(messages); + + if let Err(e) = coordinator + .start_dialog_turn( + session_id_clone.clone(), + merged_input, + turn_id, + agent_type, + trigger_source, + ) + .await + { + warn!( + "Failed to dispatch queued messages: session_id={}, error={}", + session_id_clone, e + ); + } + }); + + // Store abort handle; drop the JoinHandle (task is detached but remains abortable) + self.debounce_handles + .insert(session_id, join_handle.abort_handle()); + } + + /// Background loop that receives turn outcome notifications from the coordinator. + async fn run_outcome_handler(&self, mut outcome_rx: mpsc::Receiver<(String, TurnOutcome)>) { + while let Some((session_id, outcome)) = outcome_rx.recv().await { + match outcome { + TurnOutcome::Completed => { + let has_queued = self + .queues + .get(&session_id) + .map(|q| !q.is_empty()) + .unwrap_or(false); + + if has_queued { + debug!( + "Turn completed, queue non-empty, starting debounce: session_id={}", + session_id + ); + self.schedule_debounce(session_id); + } + } + TurnOutcome::Cancelled => { + debug!("Turn cancelled, clearing queue: session_id={}", session_id); + self.clear_queue_and_debounce(&session_id); + } + TurnOutcome::Failed => { + debug!("Turn failed, clearing queue: session_id={}", session_id); + self.clear_queue_and_debounce(&session_id); + } + } + } + } +} + +/// Merge multiple queued turns into a single user input string. +/// +/// Single message → returned as-is (no wrapping). +/// Multiple messages → formatted as: +/// ```text +/// [Queued messages while agent was busy] +/// +/// --- +/// Queued #1 +/// +/// +/// --- +/// Queued #2 +/// +/// ``` +fn merge_messages(messages: Vec) -> (String, Option, String, DialogTriggerSource) { + if messages.len() == 1 { + let m = messages.into_iter().next().unwrap(); + return ( + m.user_input, + m.turn_id, + m.agent_type, + m.trigger_source, + ); + } + + let agent_type = messages[0].agent_type.clone(); + let trigger_source = messages[0].trigger_source; + + let entries: Vec = messages + .iter() + .enumerate() + .map(|(i, m)| format!("---\nQueued #{}\n{}", i + 1, m.user_input)) + .collect(); + + let merged = format!( + "[Queued messages while agent was busy]\n\n{}", + entries.join("\n\n") + ); + + (merged, None, agent_type, trigger_source) +} + +// ── Global instance ────────────────────────────────────────────────────────── + +static GLOBAL_SCHEDULER: OnceLock> = OnceLock::new(); + +pub fn get_global_scheduler() -> Option> { + GLOBAL_SCHEDULER.get().cloned() +} + +pub fn set_global_scheduler(scheduler: Arc) { + let _ = GLOBAL_SCHEDULER.set(scheduler); +} diff --git a/src/crates/core/src/agentic/tools/implementations/bash_tool.rs b/src/crates/core/src/agentic/tools/implementations/bash_tool.rs index 372a526e..e67b9e8a 100644 --- a/src/crates/core/src/agentic/tools/implementations/bash_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/bash_tool.rs @@ -11,16 +11,17 @@ use async_trait::async_trait; use futures::StreamExt; use log::{debug, error}; use serde_json::{json, Value}; -use std::time::Instant; +use std::time::{Duration, Instant}; use terminal_core::shell::{ShellDetector, ShellType}; use terminal_core::{ - CommandStreamEvent, ExecuteCommandRequest, SendCommandRequest, SignalRequest, TerminalApi, - TerminalBindingOptions, TerminalSessionBinding, + CommandCompletionReason, CommandStreamEvent, ExecuteCommandRequest, SendCommandRequest, + SignalRequest, TerminalApi, TerminalBindingOptions, TerminalSessionBinding, }; use tokio::io::AsyncWriteExt; use tool_runtime::util::ansi_cleaner::strip_ansi; const MAX_OUTPUT_LENGTH: usize = 30000; +const INTERRUPT_OUTPUT_DRAIN_MS: u64 = 500; const BANNED_COMMANDS: &[&str] = &[ "alias", @@ -110,6 +111,7 @@ impl BashTool { session_id: &str, output_text: &str, interrupted: bool, + timed_out: bool, exit_code: i32, ) -> String { let mut result_string = String::new(); @@ -136,7 +138,11 @@ impl BashTool { } // Interruption notice - if interrupted { + if timed_out { + result_string.push_str( + "Command timed out before completion. Partial output, if any, is included above.", + ); + } else if interrupted { result_string.push_str( "Command was canceled by the user. ASK THE USER what they would like to do next." ); @@ -186,6 +192,7 @@ Usage notes: - If the output exceeds {MAX_OUTPUT_LENGTH} characters, output will be truncated before being returned to you. - You can use the `run_in_background` parameter to run the command in a new dedicated background terminal session. The tool returns the background session ID immediately without waiting for the command to finish. Only use this for long-running processes (e.g., dev servers, watchers) where you don't need the output right away. You do not need to append '&' to the command. NOTE: `timeout_ms` is ignored when `run_in_background` is true. - Each result includes a `` tag identifying the terminal session. The persistent shell session ID remains constant throughout the entire conversation; background sessions each have their own unique ID. + - The output may include the command echo and/or the shell prompt (e.g., `PS C:\path>`). Do not treat these as part of the command's actual result. - Avoid using this tool with the `find`, `grep`, `cat`, `head`, `tail`, `sed`, `awk`, or `echo` commands, unless explicitly instructed or when these commands are truly necessary for the task. Instead, always prefer using the dedicated tools for these commands: - File search: Use Glob (NOT find or ls) @@ -458,16 +465,40 @@ Usage notes: let mut accumulated_output = String::new(); let mut final_exit_code: Option = None; let mut was_interrupted = false; + let mut timed_out = false; + let mut interrupt_drain_deadline: Option = None; // Get event system for sending progress let event_system = get_global_event_system(); - while let Some(event) = stream.next().await { + loop { + let next_event = if let Some(deadline) = interrupt_drain_deadline { + let now = tokio::time::Instant::now(); + if now >= deadline { + break; + } + + match tokio::time::timeout_at(deadline, stream.next()).await { + Ok(event) => event, + Err(_) => break, + } + } else { + stream.next().await + }; + + let Some(event) = next_event else { + break; + }; + // Check cancellation request if let Some(token) = &context.cancellation_token { if token.is_cancelled() && !was_interrupted { debug!("Bash tool received cancellation request, sending interrupt signal, tool_id: {}", tool_use_id); was_interrupted = true; + interrupt_drain_deadline = Some( + tokio::time::Instant::now() + + Duration::from_millis(INTERRUPT_OUTPUT_DRAIN_MS), + ); let _ = terminal_api .signal(SignalRequest { @@ -484,7 +515,6 @@ Usage notes: { final_exit_code = Some(130); } - break; } } @@ -514,14 +544,16 @@ Usage notes: CommandStreamEvent::Completed { exit_code, total_output, + completion_reason, } => { debug!( "Bash command completed, exit_code: {:?}, tool_id: {}", exit_code, tool_use_id ); - final_exit_code = exit_code; + final_exit_code = exit_code.or(final_exit_code); + timed_out = completion_reason == CommandCompletionReason::TimedOut; - if matches!(exit_code, Some(130) | Some(-1073741510)) { + if !timed_out && matches!(exit_code, Some(130) | Some(-1073741510)) { was_interrupted = true; } @@ -552,6 +584,7 @@ Usage notes: "output": accumulated_output, "exit_code": final_exit_code, "interrupted": was_interrupted, + "timed_out": timed_out, "working_directory": primary_cwd, "execution_time_ms": execution_time_ms, "terminal_session_id": primary_session_id, @@ -561,6 +594,7 @@ Usage notes: &primary_session_id, &accumulated_output, was_interrupted, + timed_out, final_exit_code.unwrap_or(-1), ); diff --git a/src/crates/core/src/service/remote_connect/bot/command_router.rs b/src/crates/core/src/service/remote_connect/bot/command_router.rs index e9b51591..c9585153 100644 --- a/src/crates/core/src/service/remote_connect/bot/command_router.rs +++ b/src/crates/core/src/service/remote_connect/bot/command_router.rs @@ -207,7 +207,10 @@ Available commands: /help - Show this help message"; pub fn paired_success_message() -> String { - format!("Pairing successful! BitFun is now connected.\n\n{}", HELP_MESSAGE) + format!( + "Pairing successful! BitFun is now connected.\n\n{}", + HELP_MESSAGE + ) } pub fn main_menu_actions() -> Vec { @@ -482,10 +485,8 @@ async fn handle_switch_workspace(state: &mut BotChatState) -> HandleResult { // global path only if the bot has not yet selected one. Using || across // both sources simultaneously can mark two different workspaces as // [current] when the desktop and the bot session are on different paths. - let effective_current: Option<&str> = state - .current_workspace - .as_deref() - .or(current_ws.as_deref()); + let effective_current: Option<&str> = + state.current_workspace.as_deref().or(current_ws.as_deref()); let mut text = String::from("Select a workspace:\n\n"); let mut options: Vec<(String, String)> = Vec::new(); @@ -654,7 +655,11 @@ async fn handle_new_session(state: &mut BotChatState, agent_type: &str) -> Handl Ok(session) => { let session_id = session.session_id.clone(); state.current_session_id = Some(session_id.clone()); - let label = if agent_type == "Cowork" { "cowork" } else { "coding" }; + let label = if agent_type == "Cowork" { + "cowork" + } else { + "coding" + }; let workspace = ws_path.as_deref().unwrap_or("(unknown)"); HandleResult { reply: format!( @@ -691,10 +696,18 @@ async fn handle_number_selection(state: &mut BotChatState, n: usize) -> HandleRe let (path, name) = options[n - 1].clone(); select_workspace(state, &path, &name).await } - Some(PendingAction::SelectSession { options, page, has_more }) => { + Some(PendingAction::SelectSession { + options, + page, + has_more, + }) => { if n < 1 || n > options.len() { let max = options.len(); - state.pending_action = Some(PendingAction::SelectSession { options, page, has_more }); + state.pending_action = Some(PendingAction::SelectSession { + options, + page, + has_more, + }); return HandleResult { reply: format!("Invalid selection. Please enter 1-{max}."), actions: vec![], @@ -791,7 +804,11 @@ async fn count_workspace_sessions(workspace_path: &str) -> usize { Ok(m) => m, Err(_) => return 0, }; - conv_mgr.get_session_list().await.map(|v| v.len()).unwrap_or(0) + conv_mgr + .get_session_list() + .await + .map(|v| v.len()) + .unwrap_or(0) } fn build_workspace_switched_reply(name: &str, session_count: usize) -> String { diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index 0ebad07e..a9a370f0 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -1227,9 +1227,11 @@ impl RemoteServer { let ws_path = get_workspace_path(); let (has_workspace, path_str, project_name, git_branch) = if let Some(ref p) = ws_path { let name = p.file_name().map(|n| n.to_string_lossy().to_string()); - let branch = git2::Repository::open(p) - .ok() - .and_then(|repo| repo.head().ok().and_then(|h| h.shorthand().map(String::from))); + let branch = git2::Repository::open(p).ok().and_then(|repo| { + repo.head() + .ok() + .and_then(|h| h.shorthand().map(String::from)) + }); (true, Some(p.to_string_lossy().to_string()), name, branch) } else { (false, None, None, None) @@ -1385,9 +1387,7 @@ impl RemoteServer { let ws_service = match get_global_workspace_service() { Some(s) => s, None => { - return RemoteResponse::RecentWorkspaces { - workspaces: vec![], - }; + return RemoteResponse::RecentWorkspaces { workspaces: vec![] }; } }; let recent = ws_service.get_recent_workspaces().await; @@ -1399,7 +1399,9 @@ impl RemoteServer { last_opened: w.last_accessed.to_rfc3339(), }) .collect(); - RemoteResponse::RecentWorkspaces { workspaces: entries } + RemoteResponse::RecentWorkspaces { + workspaces: entries, + } } RemoteCommand::SetWorkspace { path } => { let ws_service = match get_global_workspace_service() { diff --git a/src/crates/core/src/service/terminal/docs/STREAMING_OUTPUT_COLLECTION.md b/src/crates/core/src/service/terminal/docs/STREAMING_OUTPUT_COLLECTION.md new file mode 100644 index 00000000..6ab15494 --- /dev/null +++ b/src/crates/core/src/service/terminal/docs/STREAMING_OUTPUT_COLLECTION.md @@ -0,0 +1,272 @@ +# Streaming Output Collection Timing + +This document describes how the terminal service collects command output during +streaming execution, covering the OSC 633 state machine, the polling-based +completion detection, and the Windows ConPTY workarounds. + +## Architecture Overview + +Output collection involves two independent layers: + +``` +┌──────────────────────────────────────────────────────────────┐ +│ bash_tool.rs │ +│ Consumes CommandStream, accumulates output for tool result │ +└────────────────────────┬─────────────────────────────────────┘ + │ CommandStream (mpsc channel) + │ Started / Output / Completed / Error +┌────────────────────────┴─────────────────────────────────────┐ +│ manager.rs — execute_command_stream_with_options │ +│ Layer 2: Polls integration every 50ms, detects completion │ +│ by output stabilization, sends Completed with │ +│ explicit completion_reason │ +└────────────────────────┬─────────────────────────────────────┘ + │ reads integration.get_output().len() + │ reads integration.state() +┌────────────────────────┴─────────────────────────────────────┐ +│ integration.rs — ShellIntegration │ +│ Layer 1: Parses OSC 633 sequences from PTY data stream, │ +│ drives CommandState machine, accumulates output_buffer │ +└────────────────────────┬─────────────────────────────────────┘ + │ raw PTY data (process_data calls) +┌────────────────────────┴─────────────────────────────────────┐ +│ PTY process (ConPTY on Windows, unix PTY on Linux/macOS) │ +└──────────────────────────────────────────────────────────────┘ +``` + +## Layer 1: ShellIntegration State Machine + +### OSC 633 Sequence Lifecycle + +A single command execution produces the following OSC 633 sequence: + +``` +633;A ─→ 633;B ─→ (user types) ─→ 633;E ─→ 633;C ─→ [output] ─→ 633;D ─→ 633;A ─→ ... + │ │ │ │ │ │ + │ │ │ │ │ └ next prompt + │ │ │ │ └ CommandFinished + │ │ │ └ CommandExecutionStart (Enter pressed) + │ │ └ CommandLine (command text recorded) + │ └ CommandInputStart (prompt ended, cursor waiting for input) + └ PromptStart (shell begins rendering prompt) +``` + +### CommandState Transitions + +``` + ┌──────────┐ + ─────────────│ Idle │ (initial state) + └────┬─────┘ + │ 633;A + ┌────▼─────┐ + │ Prompt │ + └────┬─────┘ + │ 633;B + ┌────▼─────┐ + │ Input │ + └────┬─────┘ + │ 633;C + ┌────▼──────┐ + ┌──────│ Executing │ ← output_buffer.clear() + │ └────┬──────┘ + │ │ 633;D + │ ┌────▼──────┐ + │ │ Finished │ ← post_command_collecting = true + │ └────┬──────┘ + │ │ 633;A + │ ┌────▼─────┐ + │ │ Prompt │ ← begin ConPTY reorder detection + │ └────┬─────┘ + │ │ 633;B + │ ┌────▼─────┐ + │ │ Input │ ← ConPTY reorder detection resolved + │ └────┬─────┘ + │ │ 633;C (next command) + └───────────┘ +``` + +### Output Collection Rules + +`should_collect()` returns `true` when either condition is met: + +1. **State-based**: `state` is `Executing` or `Finished` +2. **Flag-based**: `post_command_collecting` is `true` + +Within `process_data()`, plain text (non-OSC content) is handled as follows: + +- **Before OSC sequences**: Accumulated in a local `plain_output` buffer +- **At `should_flush` sequences** (CommandFinished, PromptStart): If `should_collect()`, + flush `plain_output` into `output_buffer` before the state transition +- **At end of data chunk**: If `should_collect()`, append remaining `plain_output` to + `output_buffer`; otherwise discard + +### Key Flags + +| Flag | Set `true` | Set `false` | Purpose | +|------|-----------|------------|---------| +| `post_command_collecting` | CommandFinished (D) | PromptStart (A), CommandExecutionStart (C), or ConPTY reorder detection at CommandInputStart (B) | Keep collecting late ConPTY output after state leaves Executing/Finished | +| `detecting_conpty_reorder` | PromptStart (A) when `post_command_collecting` was true | CommandInputStart (B), CommandExecutionStart (C) | Detect whether ConPTY reordered sequences ahead of rendered output | +| `command_just_finished` | CommandFinished (D) | Cleared by manager after reading | One-shot flag so manager catches Finished even if state already moved to Prompt/Input | + +## Layer 2: Manager Polling Loop + +`execute_command_stream_with_options` spawns a task that polls +`ShellIntegration` every **50ms** and decides when the command stream is +complete. + +### Completion Decision Logic + +``` +poll every 50ms: + read state, output, output_len + + if timeout reached: + send SIGINT immediately + keep polling during a short interrupt grace window + if command still has not settled when grace expires: + COMPLETE with completion_reason = TimedOut + + if command_just_finished and no finished_exit_code yet: + record finished_exit_code, reset idle counter + + match state: + Finished: + if first time seeing Finished: + record finished_exit_code + else: + if output_len == last_output_len: + idle++ → if idle >= 4 (200ms): COMPLETE ✓ + else: + reset idle + + Idle / Prompt / Input: + if finished_exit_code is set: + if output_len == last_output_len: + idle++ → if idle >= 10 (500ms): COMPLETE ✓ + else: + reset idle + else (no finish signal): + if output_len == last_output_len: + idle++ → if idle >= 20 (1000ms): COMPLETE ✓ (fallback) + else: + reset idle + + Executing: + reset all counters (still running) +``` + +`Completed` now carries an explicit `completion_reason`: + +- `Completed` - command reached a normal terminal state +- `TimedOut` - timeout fired, terminal sent `SIGINT`, and the stream returned the best available output snapshot + +### Stabilization Thresholds + +| Condition | Idle polls required | Wall time | +|-----------|-------------------|-----------| +| State = Finished, output stable | 4 | 200ms | +| State = Prompt/Input, has finished_exit_code | 10 | 500ms | +| No finish signal (fallback) | 20 | 1000ms | + +The longer 500ms window for Prompt/Input exists specifically because ConPTY may +deliver rendered output **after** the state has already transitioned past +Finished. The `post_command_collecting` flag ensures this late data enters +`output_buffer`, which resets the idle counter and extends the wait. + +When a timeout occurs, the manager also uses a separate **500ms interrupt grace +window** after sending `SIGINT` so partial output and the final exit transition +can still be collected before the stream completes as `TimedOut`. + +## Interaction Between Layers + +A typical bash tool execution timeline: + +``` +Time PTY Data Stream integration.rs manager.rs +───── ───────────────────────── ───────────────────────── ────────────────── + 0ms 633;A state → Prompt + 2ms 633;B state → Input + 4ms (bash_tool writes cmd+\n) + 6ms 633;E;ls record command text + 8ms 633;C state → Executing poll: Executing + output_buffer.clear() +10ms "file1.txt\r\n" output_buffer += 12B poll: Executing +15ms "file2.txt\r\n" output_buffer += 12B +20ms 633;D;0 state → Finished poll: Finished (1st) + post_command_collecting=true record exit_code +22ms 633;A state → Prompt + detecting_conpty_reorder=true +24ms "PS E:\path> " (between A and B, prompt) +26ms 633;B state → Input + plain_output not empty → + don't re-enable collecting +50ms poll: Input, len=24 +100ms poll: Input, len=24, idle=1 +... ... +500ms poll: Input, len=24, idle=10 + → COMPLETE ✓ (send 24B) +``` + +## Windows ConPTY Reordering + +ConPTY is the Windows pseudo-terminal layer that translates VT sequences for +the Windows console subsystem. It introduces a well-known issue: **rendered +output and pass-through OSC sequences may be delivered out of order**. + +### Observed Reordering Patterns + +**Pattern 1 — Late output (sequences arrive before rendered content):** + +``` +Expected: [output] [633;D] [633;A] [prompt] [633;B] +Actual: [633;D] [633;A] [633;B] [output+prompt] +``` + +The shell integration sequences pass through immediately, but ConPTY's +rendering pipeline buffers the actual text and delivers it later. Without +mitigation, the state machine reaches Input before the output arrives, causing +data loss. + +**Fix**: `post_command_collecting` flag keeps `should_collect()` returning true +after Finished, so late-arriving output still enters the buffer. + +**Pattern 2 — Early prompt (rendered content arrives before sequences):** + +``` +Expected: [output] [633;D] [633;A] [prompt] [633;B] +Actual: [output+prompt] [633;D] [633;A] [633;B] +``` + +ConPTY renders both the command output AND the prompt text before delivering +the CommandFinished sequence. Since the prompt is part of the `plain_output` +when the `should_flush` before CommandFinished fires, it gets flushed into +`output_buffer` as command output. + +**Status**: This pattern cannot be reliably fixed at the shell integration +level without content-based heuristics (e.g., regex matching the prompt text). +The prompt may appear in tool output in this case. + +### ConPTY Reorder Detection Mechanism + +To handle Pattern 1 while minimizing prompt inclusion, the code uses a +two-phase detection between PromptStart (A) and CommandInputStart (B): + +``` +At 633;A (PromptStart): + if post_command_collecting: + post_command_collecting = false // tentatively stop + detecting_conpty_reorder = true // start watching + +At 633;B (CommandInputStart): + if detecting_conpty_reorder: + if plain_output between A and B is empty: + // No prompt text arrived → ConPTY reordered (Pattern 1) + post_command_collecting = true // re-enable for late output + else: + // Prompt text present → normal ordering + // post_command_collecting stays false + detecting_conpty_reorder = false +``` + +This heuristic correctly excludes the prompt in normal ordering while still +capturing late output in Pattern 1. Pattern 2 remains unmitigated. diff --git a/src/crates/core/src/service/terminal/src/api.rs b/src/crates/core/src/service/terminal/src/api.rs index f04dcf53..0986b964 100644 --- a/src/crates/core/src/service/terminal/src/api.rs +++ b/src/crates/core/src/service/terminal/src/api.rs @@ -14,7 +14,7 @@ use crate::config::TerminalConfig; use crate::events::TerminalEvent; use crate::session::{ get_session_manager, init_session_manager, is_session_manager_initialized, - CommandExecuteResult, ExecuteOptions, SessionManager, TerminalSession, + CommandCompletionReason, CommandExecuteResult, ExecuteOptions, SessionManager, TerminalSession, }; use crate::shell::{ShellDetector, ShellType}; use crate::{TerminalError, TerminalResult}; @@ -155,6 +155,10 @@ pub struct GetHistoryResponse { /// Current history size in bytes #[serde(rename = "historySize")] pub history_size: usize, + /// Terminal column count when history was recorded (PTY current size) + pub cols: u16, + /// Terminal row count when history was recorded (PTY current size) + pub rows: u16, } /// Shell information response @@ -202,6 +206,9 @@ pub struct ExecuteCommandResponse { /// Exit code (if available) #[serde(rename = "exitCode")] pub exit_code: Option, + /// Why command execution stopped. + #[serde(rename = "completionReason")] + pub completion_reason: CommandCompletionReason, } impl From for ExecuteCommandResponse { @@ -211,6 +218,7 @@ impl From for ExecuteCommandResponse { command_id: result.command_id, output: result.output, exit_code: result.exit_code, + completion_reason: result.completion_reason, } } } @@ -383,6 +391,8 @@ impl TerminalApi { session_id: request.session_id, data, history_size, + cols: session.cols, + rows: session.rows, }) } diff --git a/src/crates/core/src/service/terminal/src/lib.rs b/src/crates/core/src/service/terminal/src/lib.rs index 79b68ca9..c8a62f3f 100644 --- a/src/crates/core/src/service/terminal/src/lib.rs +++ b/src/crates/core/src/service/terminal/src/lib.rs @@ -46,8 +46,9 @@ pub use pty::{ SpawnResult, }; pub use session::{ - CommandExecuteResult, CommandStream, CommandStreamEvent, ExecuteOptions, SessionManager, - SessionStatus, TerminalBindingOptions, TerminalSession, TerminalSessionBinding, + CommandCompletionReason, CommandExecuteResult, CommandStream, CommandStreamEvent, + ExecuteOptions, SessionManager, SessionStatus, TerminalBindingOptions, TerminalSession, + TerminalSessionBinding, }; pub use shell::{ get_integration_script_content, CommandState, ScriptsManager, ShellDetector, ShellIntegration, diff --git a/src/crates/core/src/service/terminal/src/session/manager.rs b/src/crates/core/src/service/terminal/src/session/manager.rs index b2382904..983618a6 100644 --- a/src/crates/core/src/service/terminal/src/session/manager.rs +++ b/src/crates/core/src/service/terminal/src/session/manager.rs @@ -6,10 +6,10 @@ use std::sync::Arc; use std::time::Duration; use dashmap::DashMap; -use futures::Stream; -use log::warn; +use futures::{Stream, StreamExt}; +use log::{debug, warn}; +use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, RwLock}; -use tokio::time::timeout; use crate::config::{ShellConfig, TerminalConfig}; use crate::events::{TerminalEvent, TerminalEventEmitter}; @@ -22,6 +22,18 @@ use crate::{TerminalError, TerminalResult}; use super::{SessionStatus, TerminalSession}; +const COMMAND_TIMEOUT_INTERRUPT_GRACE_MS: Duration = Duration::from_millis(500); + +/// Why a command stream reached completion. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum CommandCompletionReason { + /// Command finished normally, including signal-driven exits not caused by timeout. + Completed, + /// Command hit the configured timeout and terminal attempted to interrupt it. + TimedOut, +} + /// Result of executing a command #[derive(Debug, Clone)] pub struct CommandExecuteResult { @@ -33,6 +45,8 @@ pub struct CommandExecuteResult { pub output: String, /// Exit code (if available) pub exit_code: Option, + /// Why command execution stopped. + pub completion_reason: CommandCompletionReason, } /// Options for command execution @@ -60,10 +74,11 @@ pub enum CommandStreamEvent { Started { command_id: String }, /// Output data received Output { data: String }, - /// Command completed successfully + /// Command reached a terminal state. Completed { exit_code: Option, total_output: String, + completion_reason: CommandCompletionReason, }, /// Command execution failed Error { message: String }, @@ -72,6 +87,33 @@ pub enum CommandStreamEvent { /// A stream of command execution events pub type CommandStream = Pin + Send>>; +fn compute_stream_output_delta(last_sent_output: &mut String, output: &str) -> Option { + if output.len() < last_sent_output.len() || !output.starts_with(last_sent_output.as_str()) { + last_sent_output.clear(); + } + + let new_data = output + .strip_prefix(last_sent_output.as_str()) + .filter(|data| !data.is_empty()) + .map(|data| data.to_string()); + + last_sent_output.clear(); + last_sent_output.push_str(output); + + new_data +} + +async fn get_integration_output_snapshot( + session_integrations: &Arc>>, + session_id: &str, +) -> String { + let integrations = session_integrations.read().await; + integrations + .get(session_id) + .map(|i| i.get_output().to_string()) + .unwrap_or_default() +} + /// Session manager for terminal sessions pub struct SessionManager { /// Configuration @@ -701,211 +743,51 @@ impl SessionManager { command: &str, options: ExecuteOptions, ) -> TerminalResult { - // Check if session exists - let _session = { - let sessions = self.sessions.read().await; - sessions - .get(session_id) - .cloned() - .ok_or_else(|| TerminalError::SessionNotFound(session_id.to_string()))? - }; - - // Check if shell integration is available - let has_integration = { - let integrations = self.session_integrations.read().await; - integrations.contains_key(session_id) - }; - - if !has_integration { - return Err(TerminalError::Session( - "Shell integration is not enabled for this session".to_string(), - )); - } - - // Wait for session to be ready before executing command - Self::wait_for_session_ready_static(&self.sessions, &self.session_integrations, session_id) - .await?; - - // Generate command ID - let command_id = uuid::Uuid::new_v4().to_string(); - - // Clear any previous output - { - let mut integrations = self.session_integrations.write().await; - if let Some(integration) = integrations.get_mut(session_id) { - integration.clear_output(); - } - } - - // Prepare the command (optionally with leading space to prevent history) - let cmd_to_send = if options.prevent_history { - format!(" {}\r", command) // Leading space prevents bash history - } else { - format!("{}\r", command) - }; - - // Send the command - self.write(session_id, cmd_to_send.as_bytes()).await?; - - // Wait for command completion (with optional timeout) - match options.timeout { - Some(timeout_duration) => { - let result = timeout( - timeout_duration, - self.wait_for_command_completion(session_id), - ) - .await; + let mut stream = self.execute_command_stream_with_options( + session_id.to_string(), + command.to_string(), + options, + ); + let mut command_id = uuid::Uuid::new_v4().to_string(); + let mut output = String::new(); + + while let Some(event) = stream.next().await { + match event { + CommandStreamEvent::Started { + command_id: started_command_id, + } => { + command_id = started_command_id; + } + CommandStreamEvent::Output { data } => { + output.push_str(&data); + } + CommandStreamEvent::Completed { + exit_code, + total_output, + completion_reason, + } => { + if !total_output.is_empty() { + output = total_output; + } - match result { - Ok(Ok((output, exit_code))) => Ok(CommandExecuteResult { + return Ok(CommandExecuteResult { command: command.to_string(), command_id, output, exit_code, - }), - Ok(Err(e)) => Err(e), - Err(_) => { - // Timeout - get whatever output we have - let output = { - let integrations = self.session_integrations.read().await; - integrations - .get(session_id) - .map(|i| i.get_output().to_string()) - .unwrap_or_default() - }; - - Err(TerminalError::Timeout(format!( - "Command timed out after {:?}. Partial output: {}", - timeout_duration, - if output.len() > 200 { - &output[..200] - } else { - &output - } - ))) - } - } - } - None => { - // No timeout - wait indefinitely - let (output, exit_code) = self.wait_for_command_completion(session_id).await?; - Ok(CommandExecuteResult { - command: command.to_string(), - command_id, - output, - exit_code, - }) - } - } - } - - /// Wait for command completion using shell integration - async fn wait_for_command_completion( - &self, - session_id: &str, - ) -> TerminalResult<(String, Option)> { - let poll_interval = Duration::from_millis(50); - let max_idle_checks = 20; // After 1 second of idle, check for prompt - let mut idle_count = 0; - let mut last_output_len = 0; - let mut finished_exit_code: Option> = None; - let mut post_finish_idle_count = 0; - // Wait for output to stabilize after CommandFinished - let post_finish_idle_required = 4; // 200ms of idle after finish - - loop { - tokio::time::sleep(poll_interval).await; - - // Check current state - let (state, output, output_len, cmd_finished, last_exit) = { - let integrations = self.session_integrations.read().await; - if let Some(integration) = integrations.get(session_id) { - let output = integration.get_output().to_string(); - let len = output.len(); - let cmd_finished = integration.command_just_finished(); - let last_exit = integration.last_exit_code(); - ( - integration.state().clone(), - output, - len, - cmd_finished, - last_exit, - ) - } else { - return Err(TerminalError::Session("Integration not found".to_string())); - } - }; - - // If command just finished, record it even if state already changed - if cmd_finished && finished_exit_code.is_none() { - finished_exit_code = Some(last_exit); - post_finish_idle_count = 0; - last_output_len = output_len; - // Clear the flag - let mut integrations = self.session_integrations.write().await; - if let Some(integration) = integrations.get_mut(session_id) { - integration.clear_command_finished(); - } - } - - // Check if command finished - match state { - CommandState::Finished { exit_code } => { - // First time seeing Finished state - record it - if finished_exit_code.is_none() { - finished_exit_code = Some(exit_code); - post_finish_idle_count = 0; - last_output_len = output_len; - } else { - // Already in finished state - wait for output to stabilize - if output_len == last_output_len { - post_finish_idle_count += 1; - if post_finish_idle_count >= post_finish_idle_required { - // Output has been stable, return result - return Ok((output, finished_exit_code.flatten())); - } - } else { - // New output arrived, reset counter - post_finish_idle_count = 0; - last_output_len = output_len; - } - } - } - CommandState::Idle | CommandState::Prompt | CommandState::Input => { - // If we previously detected Finished, wait for output to stabilize then return - if finished_exit_code.is_some() { - if output_len == last_output_len { - post_finish_idle_count += 1; - if post_finish_idle_count >= post_finish_idle_required { - return Ok((output, finished_exit_code.flatten())); - } - } else { - post_finish_idle_count = 0; - last_output_len = output_len; - } - } else { - // Command might have completed without proper shell integration sequence - // Use idle detection as fallback - if output_len == last_output_len { - idle_count += 1; - if idle_count >= max_idle_checks { - // Assume command completed - return Ok((output, None)); - } - } else { - idle_count = 0; - last_output_len = output_len; - } - } + completion_reason, + }); } - CommandState::Executing => { - // Still executing, reset idle count - idle_count = 0; - finished_exit_code = None; - last_output_len = output_len; + CommandStreamEvent::Error { message } => { + return Err(TerminalError::Session(message)); } } } + + Err(TerminalError::Session(format!( + "Command stream ended unexpectedly for session {}", + session_id + ))) } /// Execute a command and return a stream of events @@ -1017,30 +899,44 @@ impl SessionManager { let max_idle_checks = 20; let mut idle_count = 0; let mut last_output_len = 0; - let mut last_sent_len = 0; + let mut last_sent_output = String::new(); let start_time = std::time::Instant::now(); let mut finished_exit_code: Option> = None; let mut post_finish_idle_count = 0; let post_finish_idle_required = 4; // 200ms of idle after finish + let mut timed_out = false; + let mut timeout_interrupt_deadline: Option = None; loop { - // Check timeout (only if timeout is configured) - if let Some(timeout_dur) = timeout_duration { - if start_time.elapsed() > timeout_dur { - let output = { - let integrations = session_integrations.read().await; - integrations - .get(&session_id) - .map(|i| i.get_output().to_string()) - .unwrap_or_default() - }; - send(CommandStreamEvent::Error { - message: format!("Command timed out after {:?}", timeout_dur), - }) - .await; + if !timed_out { + if let Some(timeout_dur) = timeout_duration { + if start_time.elapsed() > timeout_dur { + timed_out = true; + timeout_interrupt_deadline = Some( + tokio::time::Instant::now() + COMMAND_TIMEOUT_INTERRUPT_GRACE_MS, + ); + + debug!( + "Command timed out in session {}, sending SIGINT", + session_id + ); + if let Err(err) = pty_service.signal(pty_id, "SIGINT").await { + warn!( + "Failed to interrupt timed out command in session {}: {}", + session_id, err + ); + } + } + } + } else if let Some(deadline) = timeout_interrupt_deadline { + if tokio::time::Instant::now() >= deadline { + let output = + get_integration_output_snapshot(&session_integrations, &session_id) + .await; send(CommandStreamEvent::Completed { - exit_code: None, + exit_code: finished_exit_code.flatten(), total_output: output, + completion_reason: CommandCompletionReason::TimedOut, }) .await; return; @@ -1080,11 +976,10 @@ impl SessionManager { let output_len = output.len(); - // Send any new output - if output_len > last_sent_len { - let new_data = output[last_sent_len..].to_string(); + if let Some(new_data) = + compute_stream_output_delta(&mut last_sent_output, output.as_str()) + { send(CommandStreamEvent::Output { data: new_data }).await; - last_sent_len = output_len; } // Check if command finished @@ -1103,6 +998,11 @@ impl SessionManager { send(CommandStreamEvent::Completed { exit_code: finished_exit_code.flatten(), total_output: output, + completion_reason: if timed_out { + CommandCompletionReason::TimedOut + } else { + CommandCompletionReason::Completed + }, }) .await; return; @@ -1124,6 +1024,11 @@ impl SessionManager { send(CommandStreamEvent::Completed { exit_code: finished_exit_code.flatten(), total_output: output, + completion_reason: if timed_out { + CommandCompletionReason::TimedOut + } else { + CommandCompletionReason::Completed + }, }) .await; return; @@ -1141,6 +1046,11 @@ impl SessionManager { send(CommandStreamEvent::Completed { exit_code: None, total_output: output, + completion_reason: if timed_out { + CommandCompletionReason::TimedOut + } else { + CommandCompletionReason::Completed + }, }) .await; return; @@ -1427,3 +1337,52 @@ impl SessionManager { impl Drop for SessionManager { fn drop(&mut self) {} } + +#[cfg(test)] +mod tests { + use super::{compute_stream_output_delta, CommandCompletionReason}; + + #[test] + fn stream_output_delta_returns_utf8_suffix_without_cutting_chars() { + let mut last_sent_output = "你好!我是 Bitfun,".to_string(); + let output = "你好!我是 Bitfun,可以帮助你完成软件工程任务。".to_string(); + + let delta = compute_stream_output_delta(&mut last_sent_output, &output); + + assert_eq!(delta.as_deref(), Some("可以帮助你完成软件工程任务。")); + assert_eq!(last_sent_output, output); + } + + #[test] + fn stream_output_delta_resets_when_previous_snapshot_is_not_prefix() { + let mut last_sent_output = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxx".to_string(); + let output = "你好!我是 Bitfun,可以帮助你完成软件工程任务。有什么我可以帮你的吗?"; + + let delta = compute_stream_output_delta(&mut last_sent_output, output); + + assert_eq!(delta.as_deref(), Some(output)); + assert_eq!(last_sent_output, output); + } + + #[test] + fn stream_output_delta_returns_none_when_output_is_unchanged() { + let mut last_sent_output = "hello 你好".to_string(); + + let delta = compute_stream_output_delta(&mut last_sent_output, "hello 你好"); + + assert_eq!(delta, None); + assert_eq!(last_sent_output, "hello 你好"); + } + + #[test] + fn completion_reason_serializes_with_camel_case_contract() { + assert_eq!( + serde_json::to_string(&CommandCompletionReason::Completed).unwrap(), + "\"completed\"" + ); + assert_eq!( + serde_json::to_string(&CommandCompletionReason::TimedOut).unwrap(), + "\"timedOut\"" + ); + } +} diff --git a/src/crates/core/src/service/terminal/src/session/mod.rs b/src/crates/core/src/service/terminal/src/session/mod.rs index 2b3e6930..372da17f 100644 --- a/src/crates/core/src/service/terminal/src/session/mod.rs +++ b/src/crates/core/src/service/terminal/src/session/mod.rs @@ -11,7 +11,8 @@ mod singleton; pub use binding::{TerminalBindingOptions, TerminalSessionBinding}; pub use manager::{ - CommandExecuteResult, CommandStream, CommandStreamEvent, ExecuteOptions, SessionManager, + CommandCompletionReason, CommandExecuteResult, CommandStream, CommandStreamEvent, + ExecuteOptions, SessionManager, }; pub use persistent::PersistentSession; pub use serializer::SessionSerializer; diff --git a/src/crates/core/src/service/terminal/src/shell/integration.rs b/src/crates/core/src/service/terminal/src/shell/integration.rs index 66eb3204..527d2874 100644 --- a/src/crates/core/src/service/terminal/src/shell/integration.rs +++ b/src/crates/core/src/service/terminal/src/shell/integration.rs @@ -56,6 +56,9 @@ pub enum CommandState { impl CommandState { /// Check if we should still collect output (executing or just finished) + /// + /// Note: This only checks the state itself. `ShellIntegration::should_collect_output()` + /// also considers the `post_command_collecting` flag for ConPTY late output. pub fn should_collect_output(&self) -> bool { matches!( self, @@ -114,6 +117,14 @@ pub struct ShellIntegration { last_exit_code: Option, /// Flag indicating a command just finished (for output collection) command_just_finished: bool, + /// Flag for collecting late output after CommandFinished. + /// On Windows, ConPTY may deliver rendered output AFTER shell integration + /// sequences (CommandFinished/PromptStart/CommandInputStart). This flag + /// keeps output collection active until the next CommandExecutionStart. + post_command_collecting: bool, + /// When true, we are between PromptStart and CommandInputStart, + /// checking whether prompt text exists to detect ConPTY reordering. + detecting_conpty_reorder: bool, } impl ShellIntegration { @@ -132,6 +143,8 @@ impl ShellIntegration { in_osc: false, last_exit_code: None, command_just_finished: false, + post_command_collecting: false, + detecting_conpty_reorder: false, } } @@ -170,6 +183,13 @@ impl ShellIntegration { self.has_rich_detection } + /// Check if output should be collected, considering both state and post-command flag. + /// On Windows ConPTY, rendered output may arrive after shell integration sequences + /// have already transitioned the state to Prompt/Input. + fn should_collect(&self) -> bool { + self.state.should_collect_output() || self.post_command_collecting + } + /// Get accumulated output for current command pub fn get_output(&self) -> &str { &self.output_buffer @@ -207,7 +227,7 @@ impl ShellIntegration { ); if should_flush && !plain_output.is_empty() - && self.state.should_collect_output() + && self.should_collect() { self.output_buffer.push_str(&plain_output); if let Some(cmd_id) = &self.current_command_id { @@ -220,6 +240,19 @@ impl ShellIntegration { } } + // ConPTY reorder detection: at CommandInputStart, if no + // prompt text accumulated since PromptStart, ConPTY sent + // the sequences before the rendered output. Re-enable + // post-command collection so late output is captured. + if self.detecting_conpty_reorder + && matches!(seq, OscSequence::CommandInputStart) + { + if plain_output.is_empty() { + self.post_command_collecting = true; + } + self.detecting_conpty_reorder = false; + } + if let Some(event) = self.handle_sequence(seq) { events.push(event); } @@ -245,9 +278,10 @@ impl ShellIntegration { } } - // Accumulate plain output if we should collect output - // Continue collecting even after Finished until we see PromptStart - if !plain_output.is_empty() && self.state.should_collect_output() { + // Accumulate plain output if we should collect output. + // Continue collecting after Finished via post_command_collecting flag, + // because ConPTY may deliver rendered output after shell integration sequences. + if !plain_output.is_empty() && self.should_collect() { self.output_buffer.push_str(&plain_output); if let Some(cmd_id) = &self.current_command_id { @@ -347,6 +381,14 @@ impl ShellIntegration { OscSequence::PromptStart => { // When we see the next prompt, the previous command is truly done // Clear all state from previous command + if self.post_command_collecting { + // Temporarily disable post-command collection. + // If no prompt text appears between PromptStart and + // CommandInputStart, ConPTY reordering is detected and + // collection will be re-enabled at CommandInputStart. + self.post_command_collecting = false; + self.detecting_conpty_reorder = true; + } self.current_command_id = None; self.current_command = None; self.state = CommandState::Prompt; @@ -362,6 +404,8 @@ impl ShellIntegration { // Clear previous command's exit code when new command starts self.last_exit_code = None; self.command_just_finished = false; + self.post_command_collecting = false; + self.detecting_conpty_reorder = false; // Generate command ID if we have a command if self.current_command.is_some() { @@ -383,6 +427,9 @@ impl ShellIntegration { // Save exit code - this survives state transitions self.last_exit_code = exit_code; self.command_just_finished = true; + // Keep collecting output after finish — ConPTY may deliver + // rendered output after the shell integration sequences. + self.post_command_collecting = true; // Emit event but keep command_id for output collection let event = if let Some(cmd_id) = &self.current_command_id { diff --git a/src/web-ui/src/app/components/NavPanel/sections/shell-hub/ShellHubSection.tsx b/src/web-ui/src/app/components/NavPanel/sections/shell-hub/ShellHubSection.tsx index 35cc2d60..5d97c05d 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/shell-hub/ShellHubSection.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/shell-hub/ShellHubSection.tsx @@ -59,6 +59,16 @@ interface HubConfig { worktrees: Record; } +function activateOnEnterOrSpace( + event: React.KeyboardEvent, + action: () => void +) { + if (event.key === 'Enter' || event.key === ' ') { + event.preventDefault(); + action(); + } +} + function loadHubConfig(workspacePath: string): HubConfig { try { const raw = localStorage.getItem(`${TERMINAL_HUB_STORAGE_KEY}:${workspacePath}`); @@ -404,11 +414,13 @@ const ShellHubSection: React.FC = () => { const running = isRunning(entry.sessionId); return ( - - + ); }; @@ -488,10 +500,12 @@ const ShellHubSection: React.FC = () => { return (
-
- + {expanded && terms.length > 0 && (
{terms.map(entry => renderTerminalItem(entry, wt.path))} diff --git a/src/web-ui/src/app/components/NavPanel/sections/shells/ShellsSection.tsx b/src/web-ui/src/app/components/NavPanel/sections/shells/ShellsSection.tsx index c1aad390..b5d53c28 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/shells/ShellsSection.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/shells/ShellsSection.tsx @@ -75,6 +75,16 @@ interface ShellEntry { startupCommand?: string; } +function activateOnEnterOrSpace( + event: React.KeyboardEvent, + action: () => void +) { + if (event.key === 'Enter' || event.key === ' ') { + event.preventDefault(); + action(); + } +} + const ShellsSection: React.FC = () => { const { t } = useTranslation('panels/terminal'); const setActiveSession = useTerminalSceneStore(s => s.setActiveSession); @@ -419,11 +429,13 @@ const ShellsSection: React.FC = () => { const renderTerminalItem = (entry: ShellEntry) => { return ( -
- + ); }; diff --git a/src/web-ui/src/app/components/RemoteConnectDialog/RemoteConnectDialog.tsx b/src/web-ui/src/app/components/RemoteConnectDialog/RemoteConnectDialog.tsx index 67d8a472..45f6a550 100644 --- a/src/web-ui/src/app/components/RemoteConnectDialog/RemoteConnectDialog.tsx +++ b/src/web-ui/src/app/components/RemoteConnectDialog/RemoteConnectDialog.tsx @@ -136,9 +136,6 @@ export const RemoteConnectDialog: React.FC = ({ for (let attempt = 0; attempt < 3; attempt++) { try { const s = await remoteConnectAPI.getStatus(); - // #region agent log - fetch('http://127.0.0.1:7682/ingest/19e63f07-99ee-4098-b8c6-1e032fa6efd0',{method:'POST',headers:{'Content-Type':'application/json','X-Debug-Session-Id':'c7eac2'},body:JSON.stringify({sessionId:'c7eac2',location:'RemoteConnectDialog:checkExisting',message:'status check',data:{attempt,pairing_state:s.pairing_state,bot_connected:s.bot_connected,active_method:s.active_method},timestamp:Date.now()})}).catch(()=>{}); - // #endregion if (cancelled) return; setStatus(s); diff --git a/src/web-ui/src/tools/terminal/components/ConnectedTerminal.tsx b/src/web-ui/src/tools/terminal/components/ConnectedTerminal.tsx index dc3db2a4..5311720b 100644 --- a/src/web-ui/src/tools/terminal/components/ConnectedTerminal.tsx +++ b/src/web-ui/src/tools/terminal/components/ConnectedTerminal.tsx @@ -18,6 +18,13 @@ const log = createLogger('ConnectedTerminal'); /** Line threshold for multi-line paste confirmation. */ const MULTILINE_PASTE_THRESHOLD = 1; +/** + * Matches a standalone absolute cursor position command: ESC [ R ; C H + * ConPTY sends these after resize to reposition the cursor in its own coordinate + * system, which diverges from xterm.js coordinates after history replay. + */ +const CURSOR_POS_RE = /^\x1b\[(\d+);(\d+)H$/; + export interface ConnectedTerminalProps { sessionId: string; className?: string; @@ -54,7 +61,61 @@ const ConnectedTerminal: React.FC = memo(({ const isTerminalReadyRef = useRef(false); const outputQueueRef = useRef([]); + // After history replay, ConPTY sends absolute cursor-position commands (ESC[R;CH) + // that reference its own coordinate system, which diverges from xterm.js after replay. + // We let those commands pass through (to avoid side effects from redirecting them) and + // instead restore the correct cursor position via write callbacks after each one. + const postHistoryCursorRef = useRef<{ row: number; col: number; ignoreCount: number } | null>(null); + + // PTY dimensions stored with the history snapshot. + // Used to resize xterm.js to the correct size before replaying history, so that + // absolute cursor-position sequences in the history (e.g. ESC[27;1H) are not + // clamped to the xterm.js default row count (24). + const historyDimsRef = useRef<{ cols: number; rows: number } | null>(null); + + // While set to a positive value, Terminal's doXtermResize will refuse to shrink + // the column count below this threshold. Set during history flush so that the + // CSS open-animation (which drives the terminal through many narrow intermediate + // widths) cannot permanently truncate content written at the historical width. + // Cleared when post-history cursor mode exits. + const preventShrinkBelowColsRef = useRef(0); + const handleOutput = useCallback((data: string) => { + // Post-history cursor restoration: + // ConPTY sends standalone cursor-position commands after resize in its own coordinate + // system. We let them pass through unmodified (redirecting them caused content side + // effects) and instead snap the cursor back to the saved correct position via a + // write callback after each one is processed by xterm.js. + if (postHistoryCursorRef.current && postHistoryCursorRef.current.ignoreCount > 0) { + const isCursorOnly = CURSOR_POS_RE.test(data); + if (isCursorOnly) { + const cursor = postHistoryCursorRef.current; + cursor.ignoreCount--; + const restoreSeq = `\x1b[${cursor.row};${cursor.col}H`; + if (!isTerminalReadyRef.current || !terminalRef.current) { + outputQueueRef.current.push(data); + outputQueueRef.current.push(restoreSeq); + return; + } + const xterm = terminalRef.current.getTerminal?.(); + if (xterm) { + // Write the original cursor move, then immediately queue the restore so + // the visible cursor always lands at the correct history-end position. + xterm.write(data, () => { + xterm.write(restoreSeq); + }); + } else { + terminalRef.current.write(data); + terminalRef.current.write(restoreSeq); + } + return; + } else { + // Real content arrived — cursor is already at correct position from last restore. + postHistoryCursorRef.current = null; + preventShrinkBelowColsRef.current = 0; + } + } + if (!isTerminalReadyRef.current || !terminalRef.current) { outputQueueRef.current.push(data); return; @@ -67,7 +128,44 @@ const ConnectedTerminal: React.FC = memo(({ if (queue.length === 0) return; // Clear first to prevent orphaned items if new data arrives during flush outputQueueRef.current = []; + + // If we have historical PTY dimensions, resize xterm.js to the correct row + // count before replaying content. This prevents absolute cursor-position + // sequences embedded in the history (e.g. ESC[27;1H) from being clamped to + // xterm.js's default 24-row size, which would corrupt the rendered output. + // We also lock doXtermResize against shrinking so that the CSS open-animation + // (which passes through many narrow column counts) cannot permanently truncate + // the history content that is about to be written at the historical width. + const dims = historyDimsRef.current; + if (dims) { + const xterm = terminalRef.current?.getTerminal?.(); + if (xterm) { + const targetRows = Math.max(xterm.rows, dims.rows); + try { + if (xterm.rows !== targetRows) { + xterm.resize(xterm.cols, targetRows); + } + } catch { /* ignore */ } + // Prevent doXtermResize from shrinking below the historical col width. + preventShrinkBelowColsRef.current = dims.cols; + } + } + queue.forEach(data => terminalRef.current?.write(data)); + + // After all history writes complete, save the cursor row so that subsequent + // ConPTY cursor-only updates can be redirected to this correct position. + const xterm = terminalRef.current?.getTerminal?.(); + if (xterm) { + xterm.write('', () => { + const cursorY = xterm.buffer.active.cursorY; // 0-indexed + const cursorRow = cursorY + 1; // 1-indexed for ANSI + if (cursorRow > 0) { + const cursorCol = xterm.buffer.active.cursorX + 1; // 1-indexed + postHistoryCursorRef.current = { row: cursorRow, col: cursorCol, ignoreCount: 10 }; + } + }); + } }, []); const handleReady = useCallback(() => { @@ -85,6 +183,10 @@ const ConnectedTerminal: React.FC = memo(({ log.error('Terminal error', { sessionId, message }); }, [sessionId]); + const handleHistoryDims = useCallback((cols: number, rows: number) => { + historyDimsRef.current = { cols, rows }; + }, []); + const { session, isLoading, @@ -101,6 +203,7 @@ const ConnectedTerminal: React.FC = memo(({ onReady: handleReady, onExit: handleExit, onError: handleError, + onHistoryDims: handleHistoryDims, }); const handleData = useCallback((data: string) => { @@ -118,6 +221,18 @@ const ConnectedTerminal: React.FC = memo(({ } lastSentSizeRef.current = { cols, rows }; + // If post-history cursor mode is active, update the saved cursor position + // ONLY when the terminal is growing (wider cols). Shrinking resizes may place + // the cursor at a damaged/truncated position, so we ignore those updates. + // Growing resizes simply add columns on the right; the cursor row/col stays valid. + if (postHistoryCursorRef.current) { + const xterm = terminalRef.current?.getTerminal?.(); + if (xterm && cols >= xterm.cols) { + postHistoryCursorRef.current.row = xterm.buffer.active.cursorY + 1; + postHistoryCursorRef.current.col = xterm.buffer.active.cursorX + 1; + } + } + resize(cols, rows).then(() => { }).catch(err => { log.error('Resize failed', { sessionId, cols, rows, error: err }); @@ -295,6 +410,7 @@ const ConnectedTerminal: React.FC = memo(({ onTitleChange={handleTitleChange} onReady={handleTerminalReady} onPaste={handlePaste} + preventShrinkBelowColsRef={preventShrinkBelowColsRef} /> {showStatusBar && session && ( diff --git a/src/web-ui/src/tools/terminal/components/Terminal.tsx b/src/web-ui/src/tools/terminal/components/Terminal.tsx index 994af45d..f6edb1c8 100644 --- a/src/web-ui/src/tools/terminal/components/Terminal.tsx +++ b/src/web-ui/src/tools/terminal/components/Terminal.tsx @@ -96,6 +96,13 @@ export interface TerminalProps { * Uses the default multi-line confirmation when omitted. */ onPaste?: (text: string) => Promise | boolean; + /** + * When set to a positive value, doXtermResize skips any resize that would + * shrink the terminal below this column count. Used during history replay to + * prevent CSS-animation intermediate sizes from permanently truncating buffered + * content. Set back to 0 (or leave unset) to restore normal resize behaviour. + */ + preventShrinkBelowColsRef?: React.MutableRefObject; } export interface TerminalRef { @@ -157,6 +164,7 @@ const Terminal = forwardRef(({ onResize, onReady, onPaste, + preventShrinkBelowColsRef, }, ref) => { const containerRef = useRef(null); const terminalRef = useRef(null); @@ -195,12 +203,21 @@ const Terminal = forwardRef(({ if (terminal.cols === cols && terminal.rows === rows) { return; } - + + // While the caller has set a minimum column guard (e.g., during history + // replay), skip any resize that would shrink below that value. This + // prevents CSS open-animation intermediate widths from permanently + // truncating buffered content that was written at a wider column count. + const minCols = preventShrinkBelowColsRef?.current ?? 0; + if (minCols > 0 && cols < minCols) { + return; + } + terminal.resize(cols, rows); } catch (error) { log.warn('Xterm resize error', { cols, rows, error }); } - }, []); + }, [preventShrinkBelowColsRef]); // Notify backend PTY with deduping. const doBackendResize = useCallback((cols: number, rows: number) => { @@ -243,6 +260,16 @@ const Terminal = forwardRef(({ return; } + // Skip tiny intermediate dimensions that occur when a panel CSS-animates + // from zero width to its final size. xterm.js permanently truncates buffer + // lines to the current column count on resize, so we must avoid resizing + // to columns fewer than any content already in the buffer. + // 40 cols is the minimum usable terminal width (below this, most shells + // are unusable anyway and content would be permanently damaged). + if (dims.cols < 40 || dims.rows < 3) { + return; + } + if (resizeDebouncerRef.current) { resizeDebouncerRef.current.resize(dims.cols, dims.rows, immediate); } else { diff --git a/src/web-ui/src/tools/terminal/hooks/useTerminal.ts b/src/web-ui/src/tools/terminal/hooks/useTerminal.ts index 09718012..cdea9578 100644 --- a/src/web-ui/src/tools/terminal/hooks/useTerminal.ts +++ b/src/web-ui/src/tools/terminal/hooks/useTerminal.ts @@ -21,6 +21,8 @@ export interface UseTerminalOptions { onReady?: () => void; onExit?: (exitCode?: number) => void; onError?: (message: string) => void; + /** Called with the PTY dimensions stored alongside history, before onOutput. */ + onHistoryDims?: (cols: number, rows: number) => void; } export interface UseTerminalReturn { @@ -45,6 +47,7 @@ export function useTerminal(options: UseTerminalOptions): UseTerminalReturn { onReady, onExit, onError, + onHistoryDims, } = options; const [session, setSession] = useState(null); @@ -61,6 +64,7 @@ export function useTerminal(options: UseTerminalOptions): UseTerminalReturn { const onReadyRef = useRef(onReady); const onExitRef = useRef(onExit); const onErrorRef = useRef(onError); + const onHistoryDimsRef = useRef(onHistoryDims); // Keep refs updated useEffect(() => { @@ -69,6 +73,7 @@ export function useTerminal(options: UseTerminalOptions): UseTerminalReturn { onReadyRef.current = onReady; onExitRef.current = onExit; onErrorRef.current = onError; + onHistoryDimsRef.current = onHistoryDims; }); // Stable event handler that uses refs @@ -114,7 +119,32 @@ export function useTerminal(options: UseTerminalOptions): UseTerminalReturn { setIsConnected(service.isConnected()); - // Subscribe to events + // Get session info + const sessionInfo = await service.getSession(sessionId); + if (cancelled) return; + + setSession(sessionInfo); + + // Replay output history BEFORE subscribing to live events. + // This prevents overlap: history covers [past → now], events cover [now → future]. + // If we subscribed first, events arriving between subscribe and getHistory would + // appear in both the event stream and the history buffer, causing duplicate output. + try { + const historyResponse = await service.getHistory(sessionId); + if (!cancelled && historyResponse.data) { + // Notify before queuing data so the terminal can resize to the correct + // dimensions before history is written to the buffer. + onHistoryDimsRef.current?.(historyResponse.cols, historyResponse.rows); + onOutputRef.current?.(historyResponse.data); + } + } catch (histErr) { + // History replay is optional — a failed fetch must not break the terminal. + log.warn('Failed to fetch terminal history', { sessionId, error: histErr }); + } + + if (cancelled) return; + + // Subscribe to live events AFTER history replay to eliminate overlap. const unsubscribe = service.onSessionEvent(sessionId, handleEvent); if (cancelled) { unsubscribe(); @@ -122,12 +152,6 @@ export function useTerminal(options: UseTerminalOptions): UseTerminalReturn { } unsubscribeRef.current = unsubscribe; - // Get session info - const sessionInfo = await service.getSession(sessionId); - if (cancelled) return; - - setSession(sessionInfo); - setIsLoading(false); } catch (err) { if (cancelled) return; diff --git a/src/web-ui/src/tools/terminal/services/TerminalService.ts b/src/web-ui/src/tools/terminal/services/TerminalService.ts index 5a0f8e48..c9c8c786 100644 --- a/src/web-ui/src/tools/terminal/services/TerminalService.ts +++ b/src/web-ui/src/tools/terminal/services/TerminalService.ts @@ -96,6 +96,7 @@ export class TerminalService { const eventType = rawEvent.type; const payload = rawEvent.payload || {}; const sessionId = payload.session_id; + const isSessionDestroyed = eventType === 'SessionDestroyed'; let event: TerminalEvent; switch (eventType) { @@ -112,8 +113,9 @@ export class TerminalService { event = { type: 'exit', sessionId, exitCode: payload.exit_code }; break; case 'SessionDestroyed': - // Map backend-initiated session removal to exit. - event = { type: 'exit', sessionId, exitCode: null }; + // Backend-initiated terminal removal should both mark the terminal exited + // and notify other UI surfaces to close tabs/scenes bound to this session. + event = { type: 'exit', sessionId, exitCode: undefined }; break; case 'Error': event = { type: 'error', sessionId, message: payload.message || payload.error }; @@ -153,6 +155,15 @@ export class TerminalService { log.error('Global callback error', error); } }); + + if (isSessionDestroyed && sessionId) { + if (typeof window !== 'undefined') { + window.dispatchEvent( + new CustomEvent('terminal-session-destroyed', { detail: { sessionId } }) + ); + } + this.eventListeners.delete(sessionId); + } } onSessionEvent(sessionId: string, callback: TerminalEventCallback): UnsubscribeFunction { diff --git a/src/web-ui/src/tools/terminal/types/session.ts b/src/web-ui/src/tools/terminal/types/session.ts index 1343004b..f377cba6 100644 --- a/src/web-ui/src/tools/terminal/types/session.ts +++ b/src/web-ui/src/tools/terminal/types/session.ts @@ -79,6 +79,7 @@ export interface ExecuteCommandResponse { commandId: string; output: string; exitCode?: number; + completionReason: 'completed' | 'timedOut'; } export interface SendCommandRequest { @@ -91,6 +92,10 @@ export interface GetHistoryResponse { data: string; /** Current history size in bytes. */ historySize: number; + /** PTY column count when history was captured. */ + cols: number; + /** PTY row count when history was captured. */ + rows: number; } export type TerminalEventType =