diff --git a/src/apps/desktop/src/api/app_state.rs b/src/apps/desktop/src/api/app_state.rs index c873d5bb..af685d48 100644 --- a/src/apps/desktop/src/api/app_state.rs +++ b/src/apps/desktop/src/api/app_state.rs @@ -115,6 +115,7 @@ impl AppState { .map(|workspace| workspace.root_path); if let Some(workspace_path) = initial_workspace_path.clone() { + bitfun_core::infrastructure::set_workspace_path(Some(workspace_path.clone())); miniapp_manager .set_workspace_path(Some(workspace_path.clone())) .await; diff --git a/src/apps/desktop/src/lib.rs b/src/apps/desktop/src/lib.rs index bda8ff6d..d1fdbc5e 100644 --- a/src/apps/desktop/src/lib.rs +++ b/src/apps/desktop/src/lib.rs @@ -766,13 +766,11 @@ fn start_event_loop_with_transport( if !batch.is_empty() { for envelope in batch { - let router = event_router.clone(); - let env_clone = envelope.clone(); - tokio::spawn(async move { - if let Err(e) = router.route(env_clone).await { - log::warn!("Internal event routing failed: {:?}", e); - } - }); + // Route to internal subscribers (e.g. RemoteSessionStateTracker) + // sequentially so that text chunks are appended in order. + if let Err(e) = event_router.route(envelope.clone()).await { + log::warn!("Internal event routing failed: {:?}", e); + } if let Err(e) = transport.emit_event("", envelope.event).await { log::error!("Failed to emit event: {:?}", e); diff --git a/src/apps/relay-server/README.md b/src/apps/relay-server/README.md index 73750750..2e1698e3 100644 --- a/src/apps/relay-server/README.md +++ b/src/apps/relay-server/README.md @@ -18,9 +18,6 @@ WebSocket relay server for BitFun Remote Connect. Bridges desktop (WebSocket) an ```bash # One-click deploy bash deploy.sh - -# With mobile web client rebuild -bash deploy.sh --build-mobile ``` ### What URL should I fill in BitFun Desktop? @@ -123,7 +120,7 @@ Only desktop clients connect via WebSocket. Mobile clients use the HTTP endpoint 1. Clone the repository 2. Navigate to `src/apps/relay-server/` -3. 
Run `bash deploy.sh --build-mobile` +3. Run `bash deploy.sh` 4. Configure DNS/firewall as needed 5. In BitFun desktop, select "Custom Server" and enter your server URL diff --git a/src/apps/relay-server/deploy.sh b/src/apps/relay-server/deploy.sh index 8786b934..48b81a30 100755 --- a/src/apps/relay-server/deploy.sh +++ b/src/apps/relay-server/deploy.sh @@ -1,15 +1,13 @@ #!/usr/bin/env bash # BitFun Relay Server — one-click deploy script. -# Usage: bash deploy.sh [--build-mobile] [--skip-build] [--skip-health-check] +# Usage: bash deploy.sh [--skip-build] [--skip-health-check] # # Prerequisites: Docker, Docker Compose set -euo pipefail SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)" -BUILD_MOBILE=false SKIP_BUILD=false SKIP_HEALTH_CHECK=false @@ -21,7 +19,6 @@ Usage: bash deploy.sh [options] Options: - --build-mobile Build mobile-web static files before deploy --skip-build Skip docker compose build, only restart services --skip-health-check Skip post-deploy health check -h, --help Show this help message @@ -46,7 +43,6 @@ check_docker_compose() { for arg in "$@"; do case "$arg" in - --build-mobile) BUILD_MOBILE=true ;; --skip-build) SKIP_BUILD=true ;; --skip-health-check) SKIP_HEALTH_CHECK=true ;; -h|--help) @@ -65,31 +61,16 @@ echo "=== BitFun Relay Server Deploy ===" check_command docker check_docker_compose -# Build mobile web static files if requested -if [ "$BUILD_MOBILE" = true ] && [ -d "$PROJECT_ROOT/src/mobile-web" ]; then - check_command npm - echo "[1/3] Building mobile web client..." 
- cd "$PROJECT_ROOT/src/mobile-web" - npm ci - npm run build - mkdir -p "$SCRIPT_DIR/static" - cp -r dist/* "$SCRIPT_DIR/static/" - cd "$SCRIPT_DIR" - echo " Mobile web built → $SCRIPT_DIR/static/" -else - echo "[1/3] Skipping mobile web build (use --build-mobile to include)" -fi - # Build and start containers cd "$SCRIPT_DIR" if [ "$SKIP_BUILD" = true ]; then - echo "[2/3] Skipping Docker build (--skip-build)" + echo "[1/2] Skipping Docker build (--skip-build)" else - echo "[2/3] Building Docker images..." + echo "[1/2] Building Docker images..." docker compose build fi -echo "[3/3] Starting services..." +echo "[2/2] Starting services..." docker compose up -d if [ "$SKIP_HEALTH_CHECK" = false ]; then diff --git a/src/apps/relay-server/src/lib.rs b/src/apps/relay-server/src/lib.rs index e133f5b5..3be16d62 100644 --- a/src/apps/relay-server/src/lib.rs +++ b/src/apps/relay-server/src/lib.rs @@ -247,7 +247,10 @@ pub fn build_relay_router( .route("/health", get(routes::api::health_check)) .route("/api/info", get(routes::api::server_info)) .route("/api/rooms/:room_id/pair", post(routes::api::pair)) - .route("/api/rooms/:room_id/command", post(routes::api::command)) + .route( + "/api/rooms/:room_id/command", + post(routes::api::command).layer(DefaultBodyLimit::max(10 * 1024 * 1024)), + ) .route( "/api/rooms/:room_id/upload-web", post(routes::api::upload_web).layer(DefaultBodyLimit::max(10 * 1024 * 1024)), diff --git a/src/apps/relay-server/src/routes/websocket.rs b/src/apps/relay-server/src/routes/websocket.rs index f323213c..ca264d16 100644 --- a/src/apps/relay-server/src/routes/websocket.rs +++ b/src/apps/relay-server/src/routes/websocket.rs @@ -70,7 +70,8 @@ pub async fn websocket_handler( ws: WebSocketUpgrade, State(state): State, ) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, state)) + ws.max_message_size(64 * 1024 * 1024) + .on_upgrade(move |socket| handle_socket(socket, state)) } async fn handle_socket(socket: WebSocket, state: AppState) { 
diff --git a/src/crates/core/src/agentic/coordination/coordinator.rs b/src/crates/core/src/agentic/coordination/coordinator.rs
index bbe6f541..639e8a48 100644
--- a/src/crates/core/src/agentic/coordination/coordinator.rs
+++ b/src/crates/core/src/agentic/coordination/coordinator.rs
@@ -269,6 +269,78 @@ impl ConversationCoordinator {
         }
     }
 
+    /// Safety-net persistence for a finished (completed / failed / cancelled)
+    /// dialog turn. If no record exists yet for this turn, create a minimal
+    /// one with the user message so the turn is never lost; if any record
+    /// already exists on disk, return without touching it.
+    ///
+    /// The frontend's PersistenceModule is the authoritative writer for turn
+    /// content (model rounds, text, tools, etc.) and final status. This
+    /// function must NOT overwrite frontend-managed data, because the
+    /// spawned task always runs before the frontend receives the
+    /// DialogTurnCompleted event via the transport layer, and the existing
+    /// disk data from debounced saves may have incomplete model rounds.
+ async fn finalize_turn_in_workspace( + session_id: &str, + turn_id: &str, + turn_index: usize, + user_input: &str, + workspace_path: &str, + status: crate::service::conversation::TurnStatus, + user_message_metadata: Option, + ) { + use crate::infrastructure::PathManager; + use crate::service::conversation::{ + ConversationPersistenceManager, DialogTurnData, UserMessageData, + }; + + let path_manager = match PathManager::new() { + Ok(pm) => std::sync::Arc::new(pm), + Err(_) => return, + }; + + let conv_mgr = match ConversationPersistenceManager::new( + path_manager, + std::path::PathBuf::from(workspace_path), + ) + .await + { + Ok(mgr) => mgr, + Err(_) => return, + }; + + if let Ok(Some(_existing)) = conv_mgr.load_dialog_turn(session_id, turn_index).await { + return; + } + + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + let mut turn_data = DialogTurnData::new( + turn_id.to_string(), + turn_index, + session_id.to_string(), + UserMessageData { + id: format!("{}-user", turn_id), + content: user_input.to_string(), + timestamp: now_ms, + metadata: user_message_metadata, + }, + ); + turn_data.status = status; + turn_data.end_time = Some(now_ms); + turn_data.duration_ms = Some(now_ms.saturating_sub(turn_data.start_time)); + + if let Err(e) = conv_mgr.save_dialog_turn(&turn_data).await { + warn!( + "Failed to finalize turn in workspace: session_id={}, turn_index={}, error={}", + session_id, turn_index, e + ); + } + } + /// Create a subagent session for internal AI execution. /// Unlike `create_session`, this does NOT emit `SessionCreated` to the transport layer, /// because subagent sessions are internal implementation details of the execution engine @@ -342,6 +414,130 @@ impl ConversationCoordinator { .await } + /// Pre-analyze images using the configured vision model. + /// + /// Strategy: + /// 1. 
Vision model configured → analyze images → enhance user message with text descriptions → clear image_contexts + /// 2. No vision model → reject with a user-friendly message + async fn pre_analyze_images_if_needed( + &self, + user_input: String, + image_contexts: Option>, + session_id: &str, + image_metadata: Option, + ) -> BitFunResult<(String, Option>)> { + let images = match &image_contexts { + Some(imgs) if !imgs.is_empty() => imgs, + _ => return Ok((user_input, image_contexts)), + }; + + use crate::agentic::image_analysis::{ + resolve_vision_model_from_global_config, AnalyzeImagesRequest, ImageAnalyzer, + MessageEnhancer, + }; + use crate::infrastructure::ai::get_global_ai_client_factory; + + let vision_model = match resolve_vision_model_from_global_config().await { + Ok(m) => m, + Err(_e) => { + let is_chinese = Self::is_chinese_locale().await; + let msg = if is_chinese { + "请先在桌面端「设置 → AI 模型」中配置图片理解模型,然后再发送图片。" + } else { + "Please configure an Image Understanding Model in Settings → AI Models on the desktop app before sending images." 
+ }; + return Err(BitFunError::service(msg)); + } + }; + + let factory = match get_global_ai_client_factory().await { + Ok(f) => f, + Err(e) => { + warn!("Failed to get AI client factory for vision: {}", e); + return Ok((user_input, image_contexts)); + } + }; + + let vision_client = match factory.get_client_by_id(&vision_model.id).await { + Ok(c) => c, + Err(e) => { + warn!("Failed to create vision AI client: {}", e); + return Ok((user_input, image_contexts)); + } + }; + + let workspace_path = crate::infrastructure::get_workspace_path(); + let analyzer = ImageAnalyzer::new(workspace_path, vision_client); + let request = AnalyzeImagesRequest { + images: images.clone(), + user_message: Some(user_input.clone()), + session_id: session_id.to_string(), + }; + + self.emit_event(AgenticEvent::ImageAnalysisStarted { + session_id: session_id.to_string(), + image_count: images.len(), + user_input: user_input.clone(), + image_metadata: image_metadata.clone(), + }) + .await; + + let analysis_start = std::time::Instant::now(); + + match analyzer.analyze_images(request, &vision_model).await { + Ok(results) => { + let duration_ms = analysis_start.elapsed().as_millis() as u64; + + self.emit_event(AgenticEvent::ImageAnalysisCompleted { + session_id: session_id.to_string(), + success: true, + duration_ms, + }) + .await; + + info!( + "Vision pre-analysis completed: session={}, images={}, results={}, duration={}ms", + session_id, + images.len(), + results.len(), + duration_ms + ); + let enhanced = + MessageEnhancer::enhance_with_image_analysis(&user_input, &results, &[]); + Ok((enhanced, None)) + } + Err(e) => { + let duration_ms = analysis_start.elapsed().as_millis() as u64; + + self.emit_event(AgenticEvent::ImageAnalysisCompleted { + session_id: session_id.to_string(), + success: false, + duration_ms, + }) + .await; + + warn!( + "Vision pre-analysis failed, falling back to multimodal: session={}, error={}", + session_id, e + ); + Ok((user_input, image_contexts)) + } + } + } + + async 
fn is_chinese_locale() -> bool { + use crate::service::config::get_global_config_service; + use crate::service::config::types::AppConfig; + let Ok(config_service) = get_global_config_service().await else { + return true; + }; + let app: AppConfig = config_service + .get_config(Some("app")) + .await + .unwrap_or_default(); + app.language.starts_with("zh") + } + async fn start_dialog_turn_internal( &self, session_id: String, @@ -472,6 +668,50 @@ impl ConversationCoordinator { } let original_user_input = user_input.clone(); + + // Build image metadata for ConversationPersistenceManager (before image_contexts is consumed) + // Also stores original_text so the UI can display the user's actual input + // instead of the vision-enhanced text. + let user_message_metadata: Option = image_contexts + .as_ref() + .filter(|imgs| !imgs.is_empty()) + .map(|imgs| { + let image_meta: Vec = imgs + .iter() + .map(|img| { + let name = img + .metadata + .as_ref() + .and_then(|m| m.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or("image.png"); + let mut meta = serde_json::json!({ + "id": &img.id, + "name": name, + "mime_type": &img.mime_type, + }); + if let Some(url) = &img.data_url { + meta["data_url"] = serde_json::json!(url); + } + if let Some(path) = &img.image_path { + meta["image_path"] = serde_json::json!(path); + } + meta + }) + .collect(); + serde_json::json!({ + "images": image_meta, + "original_text": &original_user_input, + }) + }); + + // Auto vision pre-analysis: when images are present, try to use the configured + // vision model to pre-analyze them, then enhance the user message with text descriptions. + // This is the single authoritative code path for all image handling (desktop, remote, bot). + // If no vision model is configured, the request is rejected with a user-friendly message. 
+ let (user_input, image_contexts) = + self.pre_analyze_images_if_needed(user_input, image_contexts, &session_id, user_message_metadata.clone()).await?; + let wrapped_user_input = self .wrap_user_input(&effective_agent_type, user_input) .await?; @@ -489,12 +729,20 @@ impl ConversationCoordinator { ) .await?; - // Send dialog turn started event + // Send dialog turn started event with original input and image metadata + // so all frontends (desktop, mobile, bot) can display correctly. + let has_images = user_message_metadata.is_some(); self.emit_event(AgenticEvent::DialogTurnStarted { session_id: session_id.clone(), turn_id: turn_id.clone(), turn_index, user_input: wrapped_user_input.clone(), + original_user_input: if has_images { + Some(original_user_input.clone()) + } else { + None + }, + user_message_metadata: user_message_metadata.clone(), subagent_parent_info: None, }) .await; @@ -583,7 +831,9 @@ impl ConversationCoordinator { let session_id_clone = session_id.clone(); let turn_id_clone = turn_id.clone(); let session_workspace_path = session.config.workspace_path.clone(); + let user_input_for_workspace = wrapped_user_input.clone(); let effective_agent_type_clone = effective_agent_type.clone(); + let user_message_metadata_clone = user_message_metadata; let scheduler_notify_tx = self.scheduler_notify_tx.get().cloned(); tokio::spawn(async move { @@ -591,7 +841,7 @@ impl ConversationCoordinator { // Cancel token is created in execute_dialog_turn -> execute_round // execute_dialog_turn has proper cancellation checks internally - if let Some(workspace_path) = session_workspace_path { + if let Some(ref workspace_path) = session_workspace_path { use crate::infrastructure::{get_workspace_path, set_workspace_path}; let current = get_workspace_path().map(|p| p.to_string_lossy().to_string()); @@ -614,7 +864,7 @@ impl ConversationCoordinator { ) .await; - match execution_engine + let workspace_turn_status = match execution_engine 
.execute_dialog_turn(effective_agent_type_clone, messages, execution_context) .await { @@ -649,13 +899,46 @@ impl ConversationCoordinator { if let Some(tx) = &scheduler_notify_tx { let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Completed)); } + + Some(crate::service::conversation::TurnStatus::Completed) } Err(e) => { let is_cancellation = matches!(&e, BitFunError::Cancelled(_)); if is_cancellation { - // DialogTurnCancelled already sent in execution_engine - debug!("Dialog turn cancelled: {}", e); + info!("Dialog turn cancelled: session={}, turn={}", session_id_clone, turn_id_clone); + + // The execution engine only emits DialogTurnCancelled when + // cancellation is detected between rounds. If cancellation + // interrupted streaming mid-round, no event was emitted. + // Emit it here unconditionally (duplicates are harmless). + let _ = event_queue + .enqueue( + AgenticEvent::DialogTurnCancelled { + session_id: session_id_clone.clone(), + turn_id: turn_id_clone.clone(), + subagent_parent_info: None, + }, + Some(EventPriority::Critical), + ) + .await; + + // Mark the turn as completed in persistence so its partial + // content appears in historical messages (turns_to_chat_messages + // skips InProgress turns). 
+ let _ = session_manager + .complete_dialog_turn( + &session_id_clone, + &turn_id_clone, + String::new(), + TurnStats { + total_rounds: 0, + total_tools: 0, + total_tokens: 0, + duration_ms: 0, + }, + ) + .await; let _ = session_manager .update_session_state(&session_id_clone, SessionState::Idle) @@ -664,6 +947,8 @@ impl ConversationCoordinator { if let Some(tx) = &scheduler_notify_tx { let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Cancelled)); } + + Some(crate::service::conversation::TurnStatus::Cancelled) } else { error!("Dialog turn execution failed: {}", e); @@ -682,6 +967,14 @@ impl ConversationCoordinator { ) .await; + let _ = session_manager + .fail_dialog_turn( + &session_id_clone, + &turn_id_clone, + e.to_string(), + ) + .await; + let _ = session_manager .update_session_state( &session_id_clone, @@ -695,8 +988,23 @@ impl ConversationCoordinator { if let Some(tx) = &scheduler_notify_tx { let _ = tx.try_send((session_id_clone.clone(), TurnOutcome::Failed)); } + + Some(crate::service::conversation::TurnStatus::Error) } } + }; + + if let (Some(ref wp), Some(status)) = (&session_workspace_path, workspace_turn_status) { + Self::finalize_turn_in_workspace( + &session_id_clone, + &turn_id_clone, + turn_index, + &user_input_for_workspace, + wp, + status, + user_message_metadata_clone, + ) + .await; } }); diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index 61adeb16..26d50b11 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -182,8 +182,6 @@ impl ExecutionEngine { &err, is_current_turn_message, ) { - // Degrade only for historical multimodal messages. Current-turn - // image failures should still surface to users. 
warn!( "Failed to rebuild multimodal payload, falling back to text-only message: message_id={}, provider={}, turn_id={:?}, current_turn_id={}, error={}", msg.id, provider, msg.metadata.turn_id, current_turn_id, err diff --git a/src/crates/core/src/agentic/image_analysis/enhancer.rs b/src/crates/core/src/agentic/image_analysis/enhancer.rs index 767fef3f..93b38876 100644 --- a/src/crates/core/src/agentic/image_analysis/enhancer.rs +++ b/src/crates/core/src/agentic/image_analysis/enhancer.rs @@ -57,6 +57,8 @@ impl MessageEnhancer { enhanced.push_str("\n"); } + enhanced.push_str("The above image analysis has already been performed. Do NOT suggest the user to view or re-analyze the image. Respond directly to the user's question based on the analysis.\n\n"); + // 3. Separator enhanced.push_str("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n"); diff --git a/src/crates/core/src/agentic/image_analysis/image_processing.rs b/src/crates/core/src/agentic/image_analysis/image_processing.rs index d4cd763e..d5c5bbea 100644 --- a/src/crates/core/src/agentic/image_analysis/image_processing.rs +++ b/src/crates/core/src/agentic/image_analysis/image_processing.rs @@ -152,8 +152,24 @@ pub fn optimize_image_for_provider( image_data: Vec, provider: &str, fallback_mime: Option<&str>, +) -> BitFunResult { + optimize_image_with_size_limit(image_data, provider, fallback_mime, None) +} + +/// Like `optimize_image_for_provider` but allows an explicit size cap. +/// When `max_output_size` is `Some(n)`, the effective limit is +/// `min(provider_limit, n)`. 
+pub fn optimize_image_with_size_limit( + image_data: Vec, + provider: &str, + fallback_mime: Option<&str>, + max_output_size: Option, ) -> BitFunResult { let limits = ImageLimits::for_provider(provider); + let effective_max = match max_output_size { + Some(cap) => cap.min(limits.max_size), + None => limits.max_size, + }; let guessed_format = image::guess_format(&image_data).ok(); let dynamic = image::load_from_memory(&image_data) @@ -162,7 +178,7 @@ pub fn optimize_image_for_provider( let (orig_width, orig_height) = (dynamic.width(), dynamic.height()); let needs_resize = orig_width > limits.max_width || orig_height > limits.max_height; - if !needs_resize && image_data.len() <= limits.max_size { + if !needs_resize && image_data.len() <= effective_max { let mime_type = detect_mime_type_from_bytes(&image_data, fallback_mime)?; return Ok(ProcessedImage { data: image_data, @@ -185,33 +201,33 @@ pub fn optimize_image_for_provider( let mut encoded = encode_dynamic_image(&working, preferred_format, 85)?; - if encoded.0.len() > limits.max_size { + if encoded.0.len() > effective_max { for quality in [80u8, 65, 50, 35] { encoded = encode_dynamic_image(&working, ImageFormat::Jpeg, quality)?; - if encoded.0.len() <= limits.max_size { + if encoded.0.len() <= effective_max { break; } } } - if encoded.0.len() > limits.max_size { - for _ in 0..3 { - let next_w = ((working.width() as f32) * 0.85).round().max(64.0) as u32; - let next_h = ((working.height() as f32) * 0.85).round().max(64.0) as u32; + if encoded.0.len() > effective_max { + for _ in 0..5 { + let next_w = ((working.width() as f32) * 0.75).round().max(64.0) as u32; + let next_h = ((working.height() as f32) * 0.75).round().max(64.0) as u32; if next_w == working.width() && next_h == working.height() { break; } working = working.resize(next_w, next_h, FilterType::Triangle); - for quality in [70u8, 55, 40] { + for quality in [70u8, 55, 40, 25] { encoded = encode_dynamic_image(&working, ImageFormat::Jpeg, quality)?; - if 
encoded.0.len() <= limits.max_size { + if encoded.0.len() <= effective_max { break; } } - if encoded.0.len() <= limits.max_size { + if encoded.0.len() <= effective_max { break; } } diff --git a/src/crates/core/src/agentic/image_analysis/mod.rs b/src/crates/core/src/agentic/image_analysis/mod.rs index 4ba156f8..0778eb2a 100644 --- a/src/crates/core/src/agentic/image_analysis/mod.rs +++ b/src/crates/core/src/agentic/image_analysis/mod.rs @@ -10,7 +10,8 @@ pub mod types; pub use enhancer::MessageEnhancer; pub use image_processing::{ build_multimodal_message, decode_data_url, detect_mime_type_from_bytes, load_image_from_path, - optimize_image_for_provider, process_image_contexts_for_provider, resolve_image_path, + optimize_image_for_provider, optimize_image_with_size_limit, + process_image_contexts_for_provider, resolve_image_path, resolve_vision_model_from_ai_config, resolve_vision_model_from_global_config, build_multimodal_message_with_images, ProcessedImage, }; diff --git a/src/crates/core/src/agentic/image_analysis/processor.rs b/src/crates/core/src/agentic/image_analysis/processor.rs index 2363738d..33fc6276 100644 --- a/src/crates/core/src/agentic/image_analysis/processor.rs +++ b/src/crates/core/src/agentic/image_analysis/processor.rs @@ -4,7 +4,7 @@ use super::image_processing::{ build_multimodal_message, decode_data_url, detect_mime_type_from_bytes, load_image_from_path, - optimize_image_for_provider, resolve_image_path, + optimize_image_with_size_limit, resolve_image_path, }; use super::types::{AnalyzeImagesRequest, ImageAnalysisResult, ImageContextData}; use crate::infrastructure::ai::AIClient; @@ -94,8 +94,13 @@ impl ImageAnalyzer { let (image_data, fallback_mime) = Self::load_image_from_context(&image_ctx, workspace_path.as_deref()).await?; - let processed = - optimize_image_for_provider(image_data, &model.provider, fallback_mime.as_deref())?; + const IMAGE_ANALYSIS_MAX_BYTES: usize = 1024 * 1024; + let processed = optimize_image_with_size_limit( + 
image_data, + &model.provider, + fallback_mime.as_deref(), + Some(IMAGE_ANALYSIS_MAX_BYTES), + )?; debug!( "Image processing completed: mime={}, size={}KB, dimensions={}x{}", diff --git a/src/crates/core/src/agentic/session/session_manager.rs b/src/crates/core/src/agentic/session/session_manager.rs index db282dae..373c2cbc 100644 --- a/src/crates/core/src/agentic/session/session_manager.rs +++ b/src/crates/core/src/agentic/session/session_manager.rs @@ -641,6 +641,58 @@ impl SessionManager { Ok(()) } + /// Mark a dialog turn as failed and persist it. + /// Unlike `complete_dialog_turn`, this sets the state to `Failed` with an error message. + pub async fn fail_dialog_turn( + &self, + session_id: &str, + turn_id: &str, + error: String, + ) -> BitFunResult<()> { + let mut turn = self + .persistence_manager + .load_dialog_turn(session_id, turn_id) + .await?; + + turn.state = DialogTurnState::Failed { error }; + turn.completed_at = Some(SystemTime::now()); + + if self.config.enable_persistence { + match self.get_context_messages(session_id).await { + Ok(context_messages) => { + if let Err(err) = self + .persistence_manager + .save_turn_context_snapshot( + session_id, + turn.turn_index, + &context_messages, + ) + .await + { + warn!( + "failed to save turn context snapshot on failure: session_id={}, turn_index={}, err={}", + session_id, turn.turn_index, err + ); + } + } + Err(err) => { + warn!( + "failed to build context messages for snapshot on failure: session_id={}, turn_index={}, err={}", + session_id, turn.turn_index, err + ); + } + } + self.persistence_manager.save_dialog_turn(&turn).await?; + } + + debug!( + "Dialog turn marked as failed: turn_id={}, turn_index={}", + turn_id, turn.turn_index + ); + + Ok(()) + } + // ============ Helper Methods ============ /// Get session's message history (complete) diff --git a/src/crates/core/src/agentic/tools/image_context.rs b/src/crates/core/src/agentic/tools/image_context.rs index 3b1ba885..35dda919 100644 --- 
a/src/crates/core/src/agentic/tools/image_context.rs +++ b/src/crates/core/src/agentic/tools/image_context.rs @@ -73,13 +73,13 @@ pub fn format_image_context_reference(image: &ImageContextData) -> String { if let Some(image_path) = &image.image_path { format!( - "[Image: {}{}]\nPath: {}\nTip: You can use the AnalyzeImage tool with the image_path parameter.", + "[Image: {}{}]\nPath: {}", image.image_name, size_label, image_path ) } else { format!( - "[Image: {}{} (from clipboard)]\nImage ID: {}\nTip: You can use the AnalyzeImage tool.\nParameter: image_id=\"{}\"", - image.image_name, size_label, image.id, image.id + "[Image: {}{} (from clipboard)]\nImage ID: {}", + image.image_name, size_label, image.id ) } } diff --git a/src/crates/core/src/agentic/tools/implementations/ask_user_question_tool.rs b/src/crates/core/src/agentic/tools/implementations/ask_user_question_tool.rs index a7ac71f2..75b0b286 100644 --- a/src/crates/core/src/agentic/tools/implementations/ask_user_question_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/ask_user_question_tool.rs @@ -76,8 +76,8 @@ impl AskUserQuestionTool { } // Validate options - if question.options.len() < 2 || question.options.len() > 5 { - return Err(format!("Question {} must have 2-5 options", q_num)); + if question.options.len() < 2 || question.options.len() > 10 { + return Err(format!("Question {} must have 2-10 options", q_num)); } for (opt_idx, opt) in question.options.iter().enumerate() { @@ -215,8 +215,8 @@ Usage notes: "additionalProperties": false }, "minItems": 2, - "maxItems": 5, - "description": "The available choices for this question. Must have 2-5 options. Each option should be a distinct, mutually exclusive choice (unless multiSelect is enabled). There should be no 'Other' option, that will be provided automatically." + "maxItems": 10, + "description": "The available choices for this question. Must have 2-10 options. 
Each option should be a distinct, mutually exclusive choice (unless multiSelect is enabled). There should be no 'Other' option, that will be provided automatically." }, "multiSelect": { "type": "boolean", diff --git a/src/crates/core/src/service/conversation/persistence_manager.rs b/src/crates/core/src/service/conversation/persistence_manager.rs index 9a26d89c..f234b941 100644 --- a/src/crates/core/src/service/conversation/persistence_manager.rs +++ b/src/crates/core/src/service/conversation/persistence_manager.rs @@ -184,7 +184,7 @@ impl ConversationPersistenceManager { if let Some(turn) = self.load_dialog_turn(session_id, i).await? { turns.push(turn); } else { - warn!("Missing dialog turn: session={}, turn={}", session_id, i); + debug!("Missing dialog turn: session={}, turn={}", session_id, i); } } diff --git a/src/crates/core/src/service/remote_connect/bot/command_router.rs b/src/crates/core/src/service/remote_connect/bot/command_router.rs index c9585153..c7d52d00 100644 --- a/src/crates/core/src/service/remote_connect/bot/command_router.rs +++ b/src/crates/core/src/service/remote_connect/bot/command_router.rs @@ -100,6 +100,7 @@ pub struct ForwardRequest { pub content: String, pub agent_type: String, pub turn_id: String, + pub image_contexts: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -248,7 +249,15 @@ fn cancel_task_actions(command: impl Into) -> Vec { // ── Main dispatch ─────────────────────────────────────────────────── -pub async fn handle_command(state: &mut BotChatState, cmd: BotCommand) -> HandleResult { +pub async fn handle_command( + state: &mut BotChatState, + cmd: BotCommand, + images: Vec, +) -> HandleResult { + let image_contexts: Vec = + super::super::remote_server::images_to_contexts( + if images.is_empty() { None } else { Some(&images) }, + ); match cmd { BotCommand::Start | BotCommand::Help => { if state.paired { @@ -327,7 +336,7 @@ pub async fn handle_command(state: &mut BotChatState, cmd: BotCommand) -> Handle if !state.paired 
{ return not_paired(); } - handle_chat_message(state, &msg).await + handle_chat_message(state, &msg, image_contexts).await } } } @@ -663,9 +672,9 @@ async fn handle_new_session(state: &mut BotChatState, agent_type: &str) -> Handl let workspace = ws_path.as_deref().unwrap_or("(unknown)"); HandleResult { reply: format!( - "Created new {} session: {}\nSession ID: {}\nWorkspace: {}\n\n\ + "Created new {} session: {}\nWorkspace: {}\n\n\ You can now send messages to interact with the AI agent.", - label, session_name, session_id, workspace + label, session_name, workspace ), actions: vec![], forward_to_session: None, @@ -737,7 +746,7 @@ async fn handle_number_selection(state: &mut BotChatState, n: usize) -> HandleRe ) .await } - None => handle_chat_message(state, &n.to_string()).await, + None => handle_chat_message(state, &n.to_string(), vec![]).await, } } @@ -1233,11 +1242,15 @@ async fn handle_next_page(state: &mut BotChatState) -> HandleResult { forward_to_session: None, } } - None => handle_chat_message(state, "0").await, + None => handle_chat_message(state, "0", vec![]).await, } } -async fn handle_chat_message(state: &mut BotChatState, message: &str) -> HandleResult { +async fn handle_chat_message( + state: &mut BotChatState, + message: &str, + image_contexts: Vec, +) -> HandleResult { if let Some(PendingAction::AskUserQuestion { tool_id, questions, @@ -1310,6 +1323,7 @@ async fn handle_chat_message(state: &mut BotChatState, message: &str) -> HandleR content: message.to_string(), agent_type: "agentic".to_string(), turn_id, + image_contexts, }), } } @@ -1345,7 +1359,7 @@ pub async fn execute_forwarded_turn( &forward.session_id, forward.content, Some(&forward.agent_type), - None, + forward.image_contexts, DialogTriggerSource::Bot, Some(forward.turn_id), ) @@ -1419,7 +1433,11 @@ pub async fn execute_forwarded_turn( Ok(mut text) => { const MAX_BOT_MSG_LEN: usize = 4000; if text.len() > MAX_BOT_MSG_LEN { - text.truncate(MAX_BOT_MSG_LEN); + let mut end = MAX_BOT_MSG_LEN; 
+ while !text.is_char_boundary(end) { + end -= 1; + } + text.truncate(end); text.push_str("\n\n... (truncated)"); } text diff --git a/src/crates/core/src/service/remote_connect/bot/feishu.rs b/src/crates/core/src/service/remote_connect/bot/feishu.rs index 3ad860fe..4260c8a5 100644 --- a/src/crates/core/src/service/remote_connect/bot/feishu.rs +++ b/src/crates/core/src/service/remote_connect/bot/feishu.rs @@ -228,6 +228,13 @@ struct PendingPairing { created_at: i64, } +struct ParsedMessage { + chat_id: String, + message_id: String, + text: String, + image_keys: Vec, +} + impl FeishuBot { pub fn new(config: FeishuConfig) -> Self { Self { @@ -307,6 +314,97 @@ impl FeishuBot { Ok(()) } + /// Download a user-sent image from a Feishu message using the message resources API. + /// The returned data-URL is compressed to at most 1 MB. + async fn download_image_as_data_url(&self, message_id: &str, file_key: &str) -> Result { + use base64::{engine::general_purpose::STANDARD as B64, Engine}; + + let token = match self.get_access_token().await { + Ok(t) => t, + Err(e) => { + return Err(e); + } + }; + let client = reqwest::Client::new(); + let url = format!( + "https://open.feishu.cn/open-apis/im/v1/messages/{}/resources/{}?type=image", + message_id, file_key + ); + let resp = client + .get(&url) + .bearer_auth(&token) + .send() + .await + .map_err(|e| { + anyhow!("feishu download image: {e}") + })?; + + let status = resp.status(); + if !status.is_success() { + let body = resp.text().await.unwrap_or_default(); + return Err(anyhow!("feishu image download failed: HTTP {status} — {body}")); + } + + let content_type = resp + .headers() + .get("content-type") + .and_then(|v| v.to_str().ok()) + .unwrap_or("image/png") + .to_string(); + let raw_bytes = resp.bytes().await?; + + const MAX_BYTES: usize = 1024 * 1024; + if raw_bytes.len() <= MAX_BYTES { + let b64 = B64.encode(&raw_bytes); + return Ok(format!("data:{};base64,{}", content_type, b64)); + } + + log::info!( + "Feishu image 
exceeds {}KB ({}KB), compressing", + MAX_BYTES / 1024, + raw_bytes.len() / 1024 + ); + match crate::agentic::image_analysis::optimize_image_with_size_limit( + raw_bytes.to_vec(), + "openai", + Some(&content_type), + Some(MAX_BYTES), + ) { + Ok(processed) => { + let b64 = B64.encode(&processed.data); + Ok(format!("data:{};base64,{}", processed.mime_type, b64)) + } + Err(e) => { + log::warn!("Feishu image compression failed, using original: {e}"); + let b64 = B64.encode(&raw_bytes); + Ok(format!("data:{};base64,{}", content_type, b64)) + } + } + } + + /// Download multiple images and convert to ImageAttachment list. + async fn download_images( + &self, + message_id: &str, + image_keys: &[String], + ) -> Vec { + let mut attachments = Vec::new(); + for (i, key) in image_keys.iter().enumerate() { + match self.download_image_as_data_url(message_id, key).await { + Ok(data_url) => { + attachments.push(super::super::remote_server::ImageAttachment { + name: format!("image_{}.png", i + 1), + data_url, + }); + } + Err(e) => { + warn!("Failed to download Feishu image {key}: {e}"); + } + } + } + attachments + } + pub async fn send_action_card( &self, chat_id: &str, @@ -498,8 +596,9 @@ impl FeishuBot { Ok((url, client_config)) } - /// Extract (chat_id, text) from a Feishu text message event. - fn parse_message_event(event: &serde_json::Value) -> Option<(String, String)> { + /// Parse a Feishu message event into text + image keys. + /// Supports `text`, `post` (rich text with images), and `image` message types. + fn parse_message_event_full(event: &serde_json::Value) -> Option { let event_type = event .pointer("/header/event_type") .and_then(|v| v.as_str())?; @@ -507,23 +606,105 @@ impl FeishuBot { return None; } - let msg_type = event - .pointer("/event/message/message_type") - .and_then(|v| v.as_str())?; - if msg_type != "text" { - return None; - } - let chat_id = event .pointer("/event/message/chat_id") .and_then(|v| v.as_str())? 
.to_string(); + let message_id = event + .pointer("/event/message/message_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let msg_type = event + .pointer("/event/message/message_type") + .and_then(|v| v.as_str())?; let content_str = event .pointer("/event/message/content") .and_then(|v| v.as_str())?; let content: serde_json::Value = serde_json::from_str(content_str).ok()?; - let text = content["text"].as_str()?.trim().to_string(); - Some((chat_id, text)) + + match msg_type { + "text" => { + let text = content["text"].as_str()?.trim().to_string(); + if text.is_empty() { return None; } + Some(ParsedMessage { chat_id, message_id, text, image_keys: vec![] }) + } + "post" => { + let (text, image_keys) = Self::extract_from_post(&content); + if text.is_empty() && image_keys.is_empty() { return None; } + Some(ParsedMessage { chat_id, message_id, text, image_keys }) + } + "image" => { + let image_key = content["image_key"].as_str()?.to_string(); + Some(ParsedMessage { + chat_id, + message_id, + text: String::new(), + image_keys: vec![image_key], + }) + } + _ => None, + } + } + + /// Backward-compatible wrapper: returns (chat_id, text) only for text/post with text content. + fn parse_message_event(event: &serde_json::Value) -> Option<(String, String)> { + let parsed = Self::parse_message_event_full(event)?; + if parsed.text.is_empty() { return None; } + Some((parsed.chat_id, parsed.text)) + } + + /// Extract text and image keys from a Feishu `post` (rich-text) message. 
+ fn extract_from_post(content: &serde_json::Value) -> (String, Vec) { + let root = if content["content"].is_array() { + content + } else { + content + .get("zh_cn") + .or_else(|| content.get("en_us")) + .or_else(|| content.as_object().and_then(|obj| obj.values().next())) + .unwrap_or(content) + }; + + let paragraphs = match root["content"].as_array() { + Some(p) => p, + None => return (String::new(), vec![]), + }; + + let mut text_parts: Vec = Vec::new(); + let mut image_keys: Vec = Vec::new(); + + for para in paragraphs { + if let Some(elements) = para.as_array() { + for elem in elements { + match elem["tag"].as_str().unwrap_or("") { + "text" | "a" => { + if let Some(t) = elem["text"].as_str() { + let trimmed = t.trim(); + if !trimmed.is_empty() { + text_parts.push(trimmed.to_string()); + } + } + } + "img" => { + if let Some(key) = elem["image_key"].as_str() { + if !key.is_empty() { + image_keys.push(key.to_string()); + } + } + } + _ => {} + } + } + } + } + + let title = root["title"].as_str().unwrap_or("").trim(); + if !title.is_empty() { + text_parts.insert(0, title.to_string()); + } + + (text_parts.join(" "), image_keys) } /// Extract (chat_id, command) from a Feishu card action callback. @@ -552,10 +733,23 @@ impl FeishuBot { Some((chat_id, command)) } + #[cfg(test)] fn parse_ws_event(event: &serde_json::Value) -> Option<(String, String)> { Self::parse_message_event(event).or_else(|| Self::parse_card_action_event(event)) } + /// Extract chat_id from any im.message.receive_v1 event (regardless of msg_type). + fn extract_message_chat_id(event: &serde_json::Value) -> Option { + let event_type = event.pointer("/header/event_type").and_then(|v| v.as_str())?; + if event_type != "im.message.receive_v1" { + return None; + } + event + .pointer("/event/message/chat_id") + .and_then(|v| v.as_str()) + .map(String::from) + } + /// Handle a single incoming protobuf data frame. /// Returns Some(chat_id) if pairing succeeded, None to continue waiting. 
async fn handle_data_frame_for_pairing( @@ -609,6 +803,11 @@ impl FeishuBot { self.send_message(&chat_id, "Please enter the 6-digit pairing code from BitFun Desktop.") .await.ok(); } + } else if let Some(chat_id) = Self::extract_message_chat_id(&event) { + self.send_message( + &chat_id, + "Only text messages are supported. Please send the 6-digit pairing code as text.", + ).await.ok(); } None } @@ -766,10 +965,48 @@ impl FeishuBot { let resp = pb::Frame::new_response(&frame, 200); let _ = write.write().await.send(WsMessage::Binary(pb::encode_frame(&resp))).await; - if let Some((chat_id, msg_text)) = Self::parse_ws_event(&event) { + if let Some(parsed) = Self::parse_message_event_full(&event) { + let bot = self.clone(); + tokio::spawn(async move { + const MAX_IMAGES: usize = 5; + let truncated = parsed.image_keys.len() > MAX_IMAGES; + let keys_to_use = if truncated { + &parsed.image_keys[..MAX_IMAGES] + } else { + &parsed.image_keys + }; + let images = if keys_to_use.is_empty() { + vec![] + } else { + bot.download_images(&parsed.message_id, keys_to_use).await + }; + if truncated { + let msg = format!( + "⚠️ Only the first {} images will be processed; the remaining {} were discarded.", + MAX_IMAGES, + parsed.image_keys.len() - MAX_IMAGES, + ); + bot.send_message(&parsed.chat_id, &msg).await.ok(); + } + let text = if parsed.text.is_empty() && !images.is_empty() { + "[User sent an image]".to_string() + } else { + parsed.text + }; + bot.handle_incoming_message(&parsed.chat_id, &text, images).await; + }); + } else if let Some((chat_id, cmd)) = Self::parse_card_action_event(&event) { let bot = self.clone(); tokio::spawn(async move { - bot.handle_incoming_message(&chat_id, &msg_text).await; + bot.handle_incoming_message(&chat_id, &cmd, vec![]).await; + }); + } else if let Some(chat_id) = Self::extract_message_chat_id(&event) { + let bot = self.clone(); + tokio::spawn(async move { + bot.send_message( + &chat_id, + "This message type is not supported. 
Please send text or images.", + ).await.ok(); }); } } @@ -810,7 +1047,12 @@ impl FeishuBot { } } - async fn handle_incoming_message(self: &Arc, chat_id: &str, text: &str) { + async fn handle_incoming_message( + self: &Arc, + chat_id: &str, + text: &str, + images: Vec, + ) { let mut states = self.chat_states.write().await; let state = states .entry(chat_id.to_string()) @@ -857,7 +1099,7 @@ impl FeishuBot { } let cmd = parse_command(text); - let result = handle_command(state, cmd).await; + let result = handle_command(state, cmd, images).await; self.persist_chat_state(chat_id, state).await; drop(states); diff --git a/src/crates/core/src/service/remote_connect/bot/telegram.rs b/src/crates/core/src/service/remote_connect/bot/telegram.rs index 5047acb7..fa7b7eff 100644 --- a/src/crates/core/src/service/remote_connect/bot/telegram.rs +++ b/src/crates/core/src/service/remote_connect/bot/telegram.rs @@ -292,7 +292,7 @@ impl TelegramBot { } let cmd = parse_command(text); - let result = handle_command(state, cmd).await; + let result = handle_command(state, cmd, vec![]).await; self.persist_chat_state(chat_id, state).await; drop(states); diff --git a/src/crates/core/src/service/remote_connect/mod.rs b/src/crates/core/src/service/remote_connect/mod.rs index 3cd6821a..dc5b9d09 100644 --- a/src/crates/core/src/service/remote_connect/mod.rs +++ b/src/crates/core/src/service/remote_connect/mod.rs @@ -27,7 +27,7 @@ pub use relay_client::RelayClient; pub use remote_server::RemoteServer; use anyhow::Result; -use log::{error, info}; +use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::RwLock; @@ -362,7 +362,7 @@ impl RemoteConnectService { if let Some(ref server) = *server_guard { match server.decrypt_command(&encrypted_data, &nonce) { Ok((cmd, request_id)) => { - info!("Remote command: {cmd:?}"); + debug!("Remote command: {cmd:?}"); let response = server.dispatch(&cmd).await; match server .encrypt_response(&response, 
request_id.as_deref()) @@ -385,7 +385,7 @@ impl RemoteConnectService { } } } - Err(e) => error!("Failed to decrypt command: {e}"), + Err(e) => debug!("Ignoring undecryptable command (likely stale mobile session): {e}"), } } } else { diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index a9a370f0..f1b8bedd 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -54,6 +54,7 @@ pub enum RemoteCommand { content: String, agent_type: Option, images: Option>, + image_contexts: Option>, }, CancelTask { session_id: String, @@ -181,6 +182,12 @@ pub struct SessionInfo { pub workspace_name: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChatImageAttachment { + pub name: String, + pub data_url: String, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChatMessage { pub id: String, @@ -195,6 +202,8 @@ pub struct ChatMessage { /// Ordered items preserving the interleaved display order from the desktop. #[serde(skip_serializing_if = "Option::is_none")] pub items: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub images: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -205,6 +214,8 @@ pub struct ChatMessageItem { pub content: Option, #[serde(skip_serializing_if = "Option::is_none")] pub tool: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub is_subagent: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -244,30 +255,150 @@ pub struct RemoteToolStatus { pub type EncryptedPayload = (String, String); +/// Build a slim version of tool params for mobile preview. +/// Strips large string values (file content, diffs, etc.) to keep payload small, +/// while preserving all short fields so the frontend can parse and display them. 
+fn make_slim_params(params: &serde_json::Value) -> Option { + match params { + serde_json::Value::Object(obj) => { + let slim: serde_json::Map = obj + .iter() + .filter_map(|(k, v)| match v { + serde_json::Value::String(s) if s.len() > 200 => None, + _ => Some((k.clone(), v.clone())), + }) + .collect(); + if slim.is_empty() { + return None; + } + serde_json::to_string(&serde_json::Value::Object(slim)).ok() + } + serde_json::Value::String(s) => Some(s.chars().take(200).collect()), + _ => None, + } +} + +/// Compress a base64 data-URL image to a small thumbnail for mobile display. +/// Falls back to the original if decoding/compression fails or the image is +/// already within `max_bytes`. +fn compress_data_url_for_mobile(data_url: &str, max_bytes: usize) -> String { + use base64::engine::general_purpose::STANDARD as BASE64; + use base64::Engine; + use image::imageops::FilterType; + + const MAX_THUMBNAIL_DIM: u32 = 400; + + let Some(comma_pos) = data_url.find(',') else { + return data_url.to_string(); + }; + let b64_data = &data_url[comma_pos + 1..]; + + if b64_data.len() * 3 / 4 <= max_bytes { + return data_url.to_string(); + } + + let Ok(raw_bytes) = BASE64.decode(b64_data) else { + return data_url.to_string(); + }; + + let Ok(img) = image::load_from_memory(&raw_bytes) else { + return data_url.to_string(); + }; + + let resized = if img.width() > MAX_THUMBNAIL_DIM || img.height() > MAX_THUMBNAIL_DIM { + img.resize(MAX_THUMBNAIL_DIM, MAX_THUMBNAIL_DIM, FilterType::Triangle) + } else { + img + }; + + fn encode_jpeg(img: &image::DynamicImage, quality: u8) -> Option> { + let mut buf = Vec::new(); + let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, quality); + img.write_with_encoder(encoder).ok()?; + Some(buf) + } + + for quality in [75u8, 60, 45, 30] { + if let Some(buf) = encode_jpeg(&resized, quality) { + if buf.len() <= max_bytes || quality == 30 { + let b64 = BASE64.encode(&buf); + return format!("data:image/jpeg;base64,{b64}"); + } + } + } 
+ + data_url.to_string() +} + +/// Max thumbnail size per image sent to mobile (100 KB). +const MOBILE_IMAGE_MAX_BYTES: usize = 100 * 1024; + /// Convert ConversationPersistenceManager turns into mobile ChatMessages. /// This is the same data source the desktop frontend uses. fn turns_to_chat_messages( turns: &[crate::service::conversation::DialogTurnData], ) -> Vec { + use crate::service::conversation::TurnStatus; + let mut result = Vec::new(); for turn in turns { + let images = turn + .user_message + .metadata + .as_ref() + .and_then(|m| m.get("images")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| { + let name = v.get("name")?.as_str()?.to_string(); + let raw_url = v.get("data_url")?.as_str()?; + let data_url = + compress_data_url_for_mobile(raw_url, MOBILE_IMAGE_MAX_BYTES); + Some(ChatImageAttachment { name, data_url }) + }) + .collect::>() + }) + .filter(|v| !v.is_empty()); + + // Prefer original_text from metadata (pre-enhancement) for display + let display_content = turn + .user_message + .metadata + .as_ref() + .and_then(|m| m.get("original_text")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| strip_user_input_tags(&turn.user_message.content)); + result.push(ChatMessage { id: turn.user_message.id.clone(), role: "user".to_string(), - content: strip_user_input_tags(&turn.user_message.content), + content: display_content, timestamp: (turn.user_message.timestamp / 1000).to_string(), metadata: None, tools: None, thinking: None, items: None, + images, }); + // Skip assistant message for in-progress turns. The active turn's + // content is delivered via the real-time overlay, not the historical + // list. Including an empty / partial assistant message here would + // "consume" a slot in the count-based skip cursor and prevent the + // final version from ever being delivered. 
+ if turn.status == TurnStatus::InProgress { + continue; + } + // Collect ordered items across all rounds, preserving interleaved order struct OrderedEntry { order_index: Option, timestamp: u64, sequence: usize, + round_idx: usize, item: ChatMessageItem, } let mut ordered: Vec = Vec::new(); @@ -276,40 +407,48 @@ fn turns_to_chat_messages( let mut text_parts = Vec::new(); let mut sequence = 0usize; - for round in &turn.model_rounds { - for t in &round.text_items { + for (round_idx, round) in turn.model_rounds.iter().enumerate() { + // Iterate in streaming order: thinking → text → tools. + // The model first thinks, then outputs text (which may reference + // tool calls), and finally the tools are detected and executed. + // This matches the real-time display order on the tracker. + for t in &round.thinking_items { if t.is_subagent_item.unwrap_or(false) { continue; } if !t.content.is_empty() { - text_parts.push(t.content.clone()); + thinking_parts.push(t.content.clone()); ordered.push(OrderedEntry { order_index: t.order_index, timestamp: t.timestamp, sequence, + round_idx, item: ChatMessageItem { - item_type: "text".to_string(), + item_type: "thinking".to_string(), content: Some(t.content.clone()), tool: None, + is_subagent: None, }, }); sequence += 1; } } - for t in &round.thinking_items { + for t in &round.text_items { if t.is_subagent_item.unwrap_or(false) { continue; } if !t.content.is_empty() { - thinking_parts.push(t.content.clone()); + text_parts.push(t.content.clone()); ordered.push(OrderedEntry { order_index: t.order_index, timestamp: t.timestamp, sequence, + round_idx, item: ChatMessageItem { - item_type: "thinking".to_string(), + item_type: "text".to_string(), content: Some(t.content.clone()), tool: None, + is_subagent: None, }, }); sequence += 1; @@ -332,8 +471,11 @@ fn turns_to_chat_messages( status: status_str.to_string(), duration_ms: t.duration_ms, start_ms: Some(t.start_time), - input_preview: None, - tool_input: if t.tool_name == "AskUserQuestion" { 
+ input_preview: make_slim_params(&t.tool_call.input), + tool_input: if t.tool_name == "AskUserQuestion" + || t.tool_name == "Task" + || t.tool_name == "TodoWrite" + { Some(t.tool_call.input.clone()) } else { None @@ -342,29 +484,34 @@ fn turns_to_chat_messages( tools_flat.push(tool_status.clone()); ordered.push(OrderedEntry { order_index: t.order_index, - timestamp: t.start_time, + timestamp: round.start_time, sequence, + round_idx, item: ChatMessageItem { item_type: "tool".to_string(), content: None, tool: Some(tool_status), + is_subagent: None, }, }); sequence += 1; } } - ordered.sort_by(|a, b| match (a.order_index, b.order_index) { - (Some(a_idx), Some(b_idx)) => a_idx - .cmp(&b_idx) - .then_with(|| a.timestamp.cmp(&b.timestamp)) - .then_with(|| a.sequence.cmp(&b.sequence)), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - (None, None) => a - .timestamp - .cmp(&b.timestamp) - .then_with(|| a.sequence.cmp(&b.sequence)), + // Sort by round first (rounds are strictly sequential), then by + // order_index within each round. order_index is per-round (resets + // to 0 each round), so it must NOT be compared across rounds. 
+ ordered.sort_by(|a, b| { + let round_cmp = a.round_idx.cmp(&b.round_idx); + if round_cmp != std::cmp::Ordering::Equal { + return round_cmp; + } + match (a.order_index, b.order_index) { + (Some(a_idx), Some(b_idx)) => a_idx.cmp(&b_idx), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => a.sequence.cmp(&b.sequence), + } }); let items: Vec = ordered.into_iter().map(|e| e.item).collect(); @@ -387,6 +534,7 @@ fn turns_to_chat_messages( Some(thinking_parts.join("\n\n")) }, items: if items.is_empty() { None } else { Some(items) }, + images: None, }); } @@ -428,6 +576,12 @@ fn strip_user_input_tags(content: &str) -> String { if let Some(pos) = s.find("") { return s[..pos].trim().to_string(); } + // Extract original question from enhancer-wrapped content + if s.starts_with("User uploaded") { + if let Some(pos) = s.find("User's question:\n") { + return s[pos + "User's question:\n".len()..].trim().to_string(); + } + } s.to_string() } @@ -441,6 +595,40 @@ fn resolve_agent_type(mobile_type: Option<&str>) -> &'static str { } } +/// Convert legacy `ImageAttachment` to unified `ImageContextData`. 
+pub fn images_to_contexts( + images: Option<&Vec>, +) -> Vec { + let Some(imgs) = images.filter(|v| !v.is_empty()) else { + return Vec::new(); + }; + imgs.iter() + .map(|img| { + let mime_type = img + .data_url + .split_once(',') + .and_then(|(header, _)| { + header + .strip_prefix("data:") + .and_then(|rest| rest.split(';').next()) + }) + .unwrap_or("image/png") + .to_string(); + + crate::agentic::image_analysis::ImageContextData { + id: format!("remote_img_{}", uuid::Uuid::new_v4()), + image_path: None, + data_url: Some(img.data_url.clone()), + mime_type, + metadata: Some(serde_json::json!({ + "name": img.name, + "source": "remote" + })), + } + }) + .collect() +} + fn build_message_with_remote_images(content: &str, images: &[ImageAttachment]) -> String { use crate::agentic::tools::image_context::{ format_image_context_reference, store_image_context, ImageContextData, @@ -500,6 +688,10 @@ struct TrackerState { round_index: usize, /// Ordered items preserving the interleaved arrival order for real-time display. active_items: Vec, + /// Set on structural events (turn start/complete) that change persisted + /// messages. Cleared after the poll handler loads persistence. Allows + /// skipping the expensive disk read during streaming. + persistence_dirty: bool, } /// Lightweight event broadcast by the tracker for real-time consumers (e.g. bots). @@ -544,6 +736,7 @@ impl RemoteSessionStateTracker { active_tools: Vec::new(), round_index: 0, active_items: Vec::new(), + persistence_dirty: true, }), event_tx, } @@ -564,14 +757,17 @@ impl RemoteSessionStateTracker { pub fn snapshot_active_turn(&self) -> Option { let s = self.state.read().unwrap(); + let has_items = !s.active_items.is_empty(); s.turn_id.as_ref().map(|tid| ActiveTurnSnapshot { turn_id: tid.clone(), status: s.turn_status.clone(), - text: s.accumulated_text.clone(), - thinking: s.accumulated_thinking.clone(), + // When items exist they already contain the text/thinking content. 
+ // Skip the duplicate top-level fields to halve the payload. + text: if has_items { String::new() } else { s.accumulated_text.clone() }, + thinking: if has_items { String::new() } else { s.accumulated_thinking.clone() }, tools: s.active_tools.clone(), round_index: s.round_index, - items: if s.active_items.is_empty() { None } else { Some(s.active_items.clone()) }, + items: if has_items { Some(s.active_items.clone()) } else { None }, }) } @@ -583,6 +779,67 @@ impl RemoteSessionStateTracker { self.state.read().unwrap().title.clone() } + pub fn turn_status(&self) -> String { + self.state.read().unwrap().turn_status.clone() + } + + /// Returns true if the turn has ended (completed/failed/cancelled) but + /// the tracker state hasn't been cleaned up yet (waiting for persistence). + pub fn is_turn_finished(&self) -> bool { + let s = self.state.read().unwrap(); + s.turn_id.is_some() + && matches!( + s.turn_status.as_str(), + "completed" | "failed" | "cancelled" + ) + } + + /// Clear tracker state after the persisted historical message is confirmed + /// available. Called by the poll handler to complete the atomic transition. + pub fn finalize_completed_turn(&self) { + let mut s = self.state.write().unwrap(); + if matches!(s.turn_status.as_str(), "completed" | "failed" | "cancelled") { + s.turn_id = None; + s.accumulated_text.clear(); + s.accumulated_thinking.clear(); + s.active_tools.clear(); + s.active_items.clear(); + } + } + + /// Whether the persisted message list may have changed since the last + /// poll. Structural events (turn start / complete) set this flag; + /// streaming events (text / thinking chunks) do not. 
+ pub fn is_persistence_dirty(&self) -> bool { + self.state.read().unwrap().persistence_dirty + } + + pub fn mark_persistence_clean(&self) { + self.state.write().unwrap().persistence_dirty = false; + } + + /// Find the last item of `target_type` with matching `subagent_marker` that + /// can be extended, skipping over the complementary text/thinking type. + /// Tool items act as boundaries — we never merge across tool items. + /// This mirrors the desktop's EventBatcher behaviour where text and thinking + /// accumulate independently within a single ModelRound. + fn find_mergeable_item( + items: &[ChatMessageItem], + target_type: &str, + subagent_marker: &Option, + ) -> Option { + for i in (0..items.len()).rev() { + let item = &items[i]; + if item.item_type == "tool" { + return None; + } + if item.item_type == target_type && &item.is_subagent == subagent_marker { + return Some(i); + } + } + None + } + fn upsert_active_tool( state: &mut TrackerState, tool_id: &str, @@ -590,6 +847,7 @@ impl RemoteSessionStateTracker { status: &str, input_preview: Option, tool_input: Option, + is_subagent: bool, ) { let resolved_id = if tool_id.is_empty() { format!("{}-{}", tool_name, state.active_tools.len()) @@ -597,6 +855,7 @@ impl RemoteSessionStateTracker { tool_id.to_string() }; let allow_name_fallback = tool_id.is_empty() && !tool_name.is_empty(); + let subagent_marker = if is_subagent { Some(true) } else { None }; if let Some(tool) = state .active_tools @@ -631,6 +890,7 @@ impl RemoteSessionStateTracker { item_type: "tool".to_string(), content: None, tool: Some(tool_status), + is_subagent: subagent_marker, }); return; } @@ -678,24 +938,22 @@ impl RemoteSessionStateTracker { match event { AE::TextChunk { text, .. 
} => { + let subagent_marker = if is_subagent { Some(true) } else { None }; let mut s = self.state.write().unwrap(); - s.accumulated_text.push_str(text); - if let Some(last) = s.active_items.last_mut() { - if last.item_type == "text" { - let c = last.content.get_or_insert_with(String::new); - c.push_str(text); - } else { - s.active_items.push(ChatMessageItem { - item_type: "text".to_string(), - content: Some(text.clone()), - tool: None, - }); - } + if !is_subagent { + s.accumulated_text.push_str(text); + } + let extend_idx = Self::find_mergeable_item(&s.active_items, "text", &subagent_marker); + if let Some(idx) = extend_idx { + let item = &mut s.active_items[idx]; + let c = item.content.get_or_insert_with(String::new); + c.push_str(text); } else { s.active_items.push(ChatMessageItem { item_type: "text".to_string(), content: Some(text.clone()), tool: None, + is_subagent: subagent_marker, }); } drop(s); @@ -707,24 +965,22 @@ impl RemoteSessionStateTracker { .replace("", "") .replace("", "") .replace("", ""); + let subagent_marker = if is_subagent { Some(true) } else { None }; let mut s = self.state.write().unwrap(); - s.accumulated_thinking.push_str(&clean); - if let Some(last) = s.active_items.last_mut() { - if last.item_type == "thinking" { - let c = last.content.get_or_insert_with(String::new); - c.push_str(&clean); - } else { - s.active_items.push(ChatMessageItem { - item_type: "thinking".to_string(), - content: Some(clean), - tool: None, - }); - } + if !is_subagent { + s.accumulated_thinking.push_str(&clean); + } + let extend_idx = Self::find_mergeable_item(&s.active_items, "thinking", &subagent_marker); + if let Some(idx) = extend_idx { + let item = &mut s.active_items[idx]; + let c = item.content.get_or_insert_with(String::new); + c.push_str(&clean); } else { s.active_items.push(ChatMessageItem { item_type: "thinking".to_string(), content: Some(clean), tool: None, + is_subagent: subagent_marker, }); } drop(s); @@ -763,18 +1019,14 @@ impl 
RemoteSessionStateTracker { "preparing", None, None, + is_subagent, ); } "ConfirmationNeeded" => { let params = val.get("params").cloned(); - let input_preview = params.as_ref().map(|v| { - let text = if v.is_string() { - v.as_str().unwrap_or_default().to_string() - } else { - serde_json::to_string(v).unwrap_or_default() - }; - text.chars().take(160).collect() - }); + let input_preview = params + .as_ref() + .and_then(|v| make_slim_params(v)); Self::upsert_active_tool( &mut s, &tool_id, @@ -782,19 +1034,18 @@ impl RemoteSessionStateTracker { "pending_confirmation", input_preview, params, + is_subagent, ); } "Started" => { let params = val.get("params").cloned(); - let input_preview = params.as_ref().map(|v| { - let text = if v.is_string() { - v.as_str().unwrap_or_default().to_string() - } else { - serde_json::to_string(v).unwrap_or_default() - }; - text.chars().take(160).collect() - }); - let tool_input = if tool_name == "AskUserQuestion" { + let input_preview = params + .as_ref() + .and_then(|v| make_slim_params(v)); + let tool_input = if tool_name == "AskUserQuestion" + || tool_name == "Task" + || tool_name == "TodoWrite" + { params.clone() } else { None @@ -806,6 +1057,7 @@ impl RemoteSessionStateTracker { "running", input_preview, tool_input, + is_subagent, ); let _ = self.event_tx.send(TrackerEvent::ToolStarted { tool_id: tool_id.clone(), @@ -821,6 +1073,7 @@ impl RemoteSessionStateTracker { "confirmed", None, None, + is_subagent, ); } "Rejected" => { @@ -831,6 +1084,7 @@ impl RemoteSessionStateTracker { "rejected", None, None, + is_subagent, ); } "Completed" | "Succeeded" => { @@ -923,18 +1177,15 @@ impl RemoteSessionStateTracker { s.active_items.clear(); s.round_index = 0; s.session_state = "running".to_string(); + s.persistence_dirty = true; drop(s); self.bump_version(); } AE::DialogTurnCompleted { .. 
} if is_direct => { let mut s = self.state.write().unwrap(); s.turn_status = "completed".to_string(); - s.turn_id = None; - s.accumulated_text.clear(); - s.accumulated_thinking.clear(); - s.active_tools.clear(); - s.active_items.clear(); s.session_state = "idle".to_string(); + s.persistence_dirty = true; drop(s); self.bump_version(); let _ = self.event_tx.send(TrackerEvent::TurnCompleted); @@ -942,8 +1193,8 @@ impl RemoteSessionStateTracker { AE::DialogTurnFailed { error, .. } if is_direct => { let mut s = self.state.write().unwrap(); s.turn_status = "failed".to_string(); - s.turn_id = None; s.session_state = "idle".to_string(); + s.persistence_dirty = true; drop(s); self.bump_version(); let _ = self.event_tx.send(TrackerEvent::TurnFailed(error.clone())); @@ -951,8 +1202,8 @@ impl RemoteSessionStateTracker { AE::DialogTurnCancelled { .. } if is_direct => { let mut s = self.state.write().unwrap(); s.turn_status = "cancelled".to_string(); - s.turn_id = None; s.session_state = "idle".to_string(); + s.persistence_dirty = true; drop(s); self.bump_version(); let _ = self.event_tx.send(TrackerEvent::TurnCancelled); @@ -1052,12 +1303,14 @@ impl RemoteExecutionDispatcher { /// Dispatch a SendMessage command: ensure tracker, restore session, start dialog turn. /// Returns `(session_id, turn_id)` on success. /// If `turn_id` is `None`, one is auto-generated. + /// + /// All platforms (desktop, mobile, bot) use the same `ImageContextData` format. 
pub async fn send_message( &self, session_id: &str, content: String, agent_type: Option<&str>, - images: Option<&Vec>, + image_contexts: Vec, trigger_source: crate::agentic::coordination::DialogTriggerSource, turn_id: Option, ) -> std::result::Result<(String, String), String> { @@ -1078,22 +1331,33 @@ impl RemoteExecutionDispatcher { .map(|t| resolve_agent_type(Some(t)).to_string()) .unwrap_or_else(|| "agentic".to_string()); - let full_content = images - .map(|imgs| build_message_with_remote_images(&content, imgs)) - .unwrap_or_else(|| content.clone()); - let turn_id = turn_id.unwrap_or_else(|| format!("turn_{}", chrono::Utc::now().timestamp_millis())); - coordinator - .start_dialog_turn( - session_id.to_string(), - full_content, - Some(turn_id.clone()), - resolved_agent_type, - trigger_source, - ) - .await - .map_err(|e| e.to_string())?; + + if image_contexts.is_empty() { + coordinator + .start_dialog_turn( + session_id.to_string(), + content.clone(), + Some(turn_id.clone()), + resolved_agent_type, + trigger_source, + ) + .await + .map_err(|e| e.to_string())?; + } else { + coordinator + .start_dialog_turn_with_image_contexts( + session_id.to_string(), + content.clone(), + image_contexts, + Some(turn_id.clone()), + resolved_agent_type, + trigger_source, + ) + .await + .map_err(|e| e.to_string())?; + } Ok((session_id.to_string(), turn_id)) } @@ -1314,6 +1578,26 @@ impl RemoteServer { }; } + // Fast path: during active streaming, only the real-time snapshot + // changes — persisted messages stay the same. Skip the expensive + // disk read and return just the snapshot. 
+ let needs_persistence = *since_version == 0 || tracker.is_persistence_dirty(); + + if !needs_persistence { + let active_turn = tracker.snapshot_active_turn(); + let sess_state = tracker.session_state(); + let title = tracker.title(); + return RemoteResponse::SessionPoll { + version: current_version, + changed: true, + session_state: Some(sess_state), + title: if title.is_empty() { None } else { Some(title) }, + new_messages: None, + total_msg_count: None, + active_turn, + }; + } + let (all_chat_msgs, _) = load_chat_messages_from_conversation_persistence(session_id).await; let total_msg_count = all_chat_msgs.len(); @@ -1321,35 +1605,48 @@ impl RemoteServer { let new_messages: Vec = all_chat_msgs.into_iter().skip(skip).collect(); - let active_turn = tracker.snapshot_active_turn(); + let turn_finished = tracker.is_turn_finished(); + let has_assistant_msg = new_messages.iter().any(|m| m.role == "assistant"); + + let active_turn = if turn_finished && has_assistant_msg { + tracker.finalize_completed_turn(); + None + } else if turn_finished { + let ts = tracker.turn_status(); + if ts == "completed" { + tracker.snapshot_active_turn() + } else { + tracker.finalize_completed_turn(); + tracker.mark_persistence_clean(); + None + } + } else { + tracker.snapshot_active_turn() + }; + + let (send_msgs, send_total) = if turn_finished && !has_assistant_msg { + // Turn is finished but disk doesn't have the completed assistant + // message yet — the frontend's immediateSaveDialogTurn hasn't + // landed. Don't send partial data; the snapshot overlay keeps the + // user informed. Next poll will re-read from disk. 
+ (None, None) + } else { + if !new_messages.is_empty() { + tracker.mark_persistence_clean(); + } + (Some(new_messages), Some(total_msg_count)) + }; + let sess_state = tracker.session_state(); let title = tracker.title(); - let active_turn_ask_tool_ids = active_turn - .as_ref() - .map(|turn| { - turn.tools - .iter() - .filter(|tool| tool.name == "AskUserQuestion") - .map(|tool| tool.id.clone()) - .collect::>() - }) - .unwrap_or_default(); - let new_message_ask_tool_ids = new_messages - .iter() - .flat_map(|message| message.items.iter().flatten()) - .filter_map(|item| item.tool.as_ref()) - .filter(|tool| tool.name == "AskUserQuestion") - .map(|tool| tool.id.clone()) - .collect::>(); - RemoteResponse::SessionPoll { version: current_version, changed: true, session_state: Some(sess_state), title: if title.is_empty() { None } else { Some(title) }, - new_messages: Some(new_messages), - total_msg_count: Some(total_msg_count), + new_messages: send_msgs, + total_msg_count: send_total, active_turn, } } @@ -1664,18 +1961,23 @@ impl RemoteServer { content, agent_type: requested_agent_type, images, + image_contexts, } => { + // Unified: prefer image_contexts (new format), fall back to legacy images + let resolved_contexts = image_contexts.clone().unwrap_or_else(|| { + images_to_contexts(images.as_ref()) + }); info!( - "Remote send_message: session={session_id}, agent_type={}, images={}", + "Remote send_message: session={session_id}, agent_type={}, image_contexts={}", requested_agent_type.as_deref().unwrap_or("agentic"), - images.as_ref().map_or(0, |v| v.len()) + resolved_contexts.len() ); match dispatcher .send_message( session_id, content.clone(), requested_agent_type.as_deref(), - images.as_ref(), + resolved_contexts, DialogTriggerSource::RemoteRelay, None, ) diff --git a/src/crates/events/src/agentic.rs b/src/crates/events/src/agentic.rs index fda73528..0bbf88c3 100644 --- a/src/crates/events/src/agentic.rs +++ b/src/crates/events/src/agentic.rs @@ -46,11 +46,29 @@ pub enum 
AgenticEvent { title: String, method: String, }, + ImageAnalysisStarted { + session_id: String, + image_count: usize, + user_input: String, + /// Image metadata JSON for UI rendering (same as DialogTurnStarted) + image_metadata: Option, + }, + + ImageAnalysisCompleted { + session_id: String, + success: bool, + duration_ms: u64, + }, + DialogTurnStarted { session_id: String, turn_id: String, turn_index: usize, user_input: String, + /// Original user input before vision enhancement (for display on all clients) + original_user_input: Option, + /// Image metadata JSON for UI rendering (id, name, data_url, mime_type, image_path) + user_message_metadata: Option, subagent_parent_info: Option, }, @@ -287,6 +305,8 @@ impl AgenticEvent { | Self::SessionStateChanged { session_id, .. } | Self::SessionDeleted { session_id } | Self::SessionTitleGenerated { session_id, .. } + | Self::ImageAnalysisStarted { session_id, .. } + | Self::ImageAnalysisCompleted { session_id, .. } | Self::DialogTurnStarted { session_id, .. } | Self::DialogTurnCompleted { session_id, .. } | Self::TokenUsageUpdated { session_id, .. } @@ -316,7 +336,9 @@ impl AgenticEvent { | Self::DialogTurnCompleted { .. } | Self::ContextCompressionFailed { .. } => AgenticEventPriority::High, - Self::TextChunk { .. } + Self::ImageAnalysisStarted { .. } + | Self::ImageAnalysisCompleted { .. } + | Self::TextChunk { .. } | Self::ThinkingChunk { .. } | Self::ToolEvent { .. } | Self::ModelRoundStarted { .. 
} diff --git a/src/crates/transport/src/adapters/tauri.rs b/src/crates/transport/src/adapters/tauri.rs index d297032f..d742f68e 100644 --- a/src/crates/transport/src/adapters/tauri.rs +++ b/src/crates/transport/src/adapters/tauri.rs @@ -54,12 +54,29 @@ impl TransportAdapter for TauriTransportAdapter { "sessionId": session_id, }))?; } - AgenticEvent::DialogTurnStarted { session_id, turn_id, turn_index, user_input, subagent_parent_info } => { + AgenticEvent::ImageAnalysisStarted { session_id, image_count, user_input, image_metadata } => { + self.app_handle.emit("agentic://image-analysis-started", json!({ + "sessionId": session_id, + "imageCount": image_count, + "userInput": user_input, + "imageMetadata": image_metadata, + }))?; + } + AgenticEvent::ImageAnalysisCompleted { session_id, success, duration_ms } => { + self.app_handle.emit("agentic://image-analysis-completed", json!({ + "sessionId": session_id, + "success": success, + "durationMs": duration_ms, + }))?; + } + AgenticEvent::DialogTurnStarted { session_id, turn_id, turn_index, user_input, original_user_input, user_message_metadata, subagent_parent_info } => { self.app_handle.emit("agentic://dialog-turn-started", json!({ "sessionId": session_id, "turnId": turn_id, "turnIndex": turn_index, "userInput": user_input, + "originalUserInput": original_user_input, + "userMessageMetadata": user_message_metadata, "subagentParentInfo": subagent_parent_info, }))?; } diff --git a/src/crates/transport/src/adapters/websocket.rs b/src/crates/transport/src/adapters/websocket.rs index a3723dc5..892afb0c 100644 --- a/src/crates/transport/src/adapters/websocket.rs +++ b/src/crates/transport/src/adapters/websocket.rs @@ -51,12 +51,31 @@ impl fmt::Debug for WebSocketTransportAdapter { impl TransportAdapter for WebSocketTransportAdapter { async fn emit_event(&self, _session_id: &str, event: AgenticEvent) -> anyhow::Result<()> { let message = match event { - AgenticEvent::DialogTurnStarted { session_id, turn_id, turn_index, .. 
} => { + AgenticEvent::ImageAnalysisStarted { session_id, image_count, user_input, image_metadata } => { + json!({ + "type": "image-analysis-started", + "sessionId": session_id, + "imageCount": image_count, + "userInput": user_input, + "imageMetadata": image_metadata, + }) + } + AgenticEvent::ImageAnalysisCompleted { session_id, success, duration_ms } => { + json!({ + "type": "image-analysis-completed", + "sessionId": session_id, + "success": success, + "durationMs": duration_ms, + }) + } + AgenticEvent::DialogTurnStarted { session_id, turn_id, turn_index, original_user_input, user_message_metadata, .. } => { json!({ "type": "dialog-turn-started", "sessionId": session_id, "turnId": turn_id, "turnIndex": turn_index, + "originalUserInput": original_user_input, + "userMessageMetadata": user_message_metadata, }) } AgenticEvent::ModelRoundStarted { session_id, turn_id, round_id, .. } => { diff --git a/src/mobile-web/index.html b/src/mobile-web/index.html index ca443af8..c3d9099e 100644 --- a/src/mobile-web/index.html +++ b/src/mobile-web/index.html @@ -4,10 +4,16 @@ + BitFun Remote diff --git a/src/mobile-web/src/App.tsx b/src/mobile-web/src/App.tsx index b4b893c1..5a596cd0 100644 --- a/src/mobile-web/src/App.tsx +++ b/src/mobile-web/src/App.tsx @@ -1,4 +1,4 @@ -import React, { useState, useCallback, useRef } from 'react'; +import React, { useState, useCallback, useRef, useEffect } from 'react'; import PairingPage from './pages/PairingPage'; import WorkspacePage from './pages/WorkspacePage'; import SessionListPage from './pages/SessionListPage'; @@ -9,14 +9,53 @@ import { ThemeProvider } from './theme'; import './styles/index.scss'; type Page = 'pairing' | 'workspace' | 'sessions' | 'chat'; +type NavDirection = 'push' | 'pop' | null; + +const NAV_PUSH_DURATION = 350; +const NAV_POP_DURATION = 250; + +function getNavClass( + targetPage: Page, + currentPage: Page, + navDir: NavDirection, + isAnimating: boolean, +): string { + if (!isAnimating) return ''; + const 
isEntering = currentPage === targetPage; + if (isEntering) { + return navDir === 'push' ? 'nav-push-enter' : 'nav-pop-enter'; + } + return navDir === 'push' ? 'nav-push-exit' : 'nav-pop-exit'; +} const AppContent: React.FC = () => { const [page, setPage] = useState('pairing'); const [activeSessionId, setActiveSessionId] = useState(null); const [activeSessionName, setActiveSessionName] = useState('Session'); + const [chatAutoFocus, setChatAutoFocus] = useState(false); const clientRef = useRef(null); const sessionMgrRef = useRef(null); + const [navDir, setNavDir] = useState(null); + const [prevPage, setPrevPage] = useState(null); + const timerRef = useRef>(); + + const navigateTo = useCallback((target: Page, direction: NavDirection) => { + setPage(prev => { + setPrevPage(prev); + return target; + }); + setNavDir(direction); + clearTimeout(timerRef.current); + const duration = direction === 'pop' ? NAV_POP_DURATION : NAV_PUSH_DURATION; + timerRef.current = setTimeout(() => { + setPrevPage(null); + setNavDir(null); + }, duration); + }, []); + + useEffect(() => () => clearTimeout(timerRef.current), []); + const handlePaired = useCallback( (client: RelayHttpClient, sessionMgr: RemoteSessionManager) => { clientRef.current = client; @@ -27,47 +66,60 @@ const AppContent: React.FC = () => { ); const handleOpenWorkspace = useCallback(() => { - setPage('workspace'); - }, []); + navigateTo('workspace', 'push'); + }, [navigateTo]); const handleWorkspaceReady = useCallback(() => { - setPage('sessions'); - }, []); + navigateTo('sessions', 'pop'); + }, [navigateTo]); - const handleSelectSession = useCallback((sessionId: string, sessionName?: string) => { + const handleSelectSession = useCallback((sessionId: string, sessionName?: string, isNew?: boolean) => { setActiveSessionId(sessionId); setActiveSessionName(sessionName || 'Session'); - setPage('chat'); - }, []); + setChatAutoFocus(!!isNew); + navigateTo('chat', 'push'); + }, [navigateTo]); const handleBackToSessions = 
useCallback(() => { - setActiveSessionId(null); - setPage('sessions'); - }, []); + navigateTo('sessions', 'pop'); + setTimeout(() => setActiveSessionId(null), NAV_POP_DURATION); + }, [navigateTo]); + + const isAnimating = navDir !== null; + const currentPage: Page = page; + + const shouldShow = (p: Page) => currentPage === p || (isAnimating && prevPage === p); return (
{page === 'pairing' && } - {page === 'workspace' && sessionMgrRef.current && ( - + {shouldShow('workspace') && sessionMgrRef.current && ( +
+ +
)} - {page === 'sessions' && sessionMgrRef.current && ( - + {shouldShow('sessions') && sessionMgrRef.current && ( +
+ +
)} - {page === 'chat' && sessionMgrRef.current && activeSessionId && ( - + {shouldShow('chat') && sessionMgrRef.current && activeSessionId && ( +
+ +
)}
); diff --git a/src/mobile-web/src/assets/Logo-ICON.png b/src/mobile-web/src/assets/Logo-ICON.png new file mode 100644 index 00000000..e250ea97 Binary files /dev/null and b/src/mobile-web/src/assets/Logo-ICON.png differ diff --git a/src/mobile-web/src/pages/ChatPage.tsx b/src/mobile-web/src/pages/ChatPage.tsx index bfd1824d..eaae6b63 100644 --- a/src/mobile-web/src/pages/ChatPage.tsx +++ b/src/mobile-web/src/pages/ChatPage.tsx @@ -2,7 +2,7 @@ import React, { useEffect, useRef, useState, useCallback, useMemo } from 'react' import ReactMarkdown from 'react-markdown'; import remarkGfm from 'remark-gfm'; import { Prism as SyntaxHighlighter } from 'react-syntax-highlighter'; -import { oneDark } from 'react-syntax-highlighter/dist/esm/styles/prism'; +import { vscDarkPlus, vs } from 'react-syntax-highlighter/dist/esm/styles/prism'; import { RemoteSessionManager, SessionPoller, @@ -20,32 +20,143 @@ interface ChatPageProps { sessionId: string; sessionName?: string; onBack: () => void; + autoFocus?: boolean; } // ─── Markdown ─────────────────────────────────────────────────────────────── -const MarkdownContent: React.FC<{ content: string }> = ({ content }) => ( - - {codeStr} - - ) : ( - +function formatDuration(ms: number): string { + if (ms < 1000) return `${Math.round(ms)}ms`; + return `${(ms / 1000).toFixed(1)}s`; +} + +function truncateMiddle(str: string, maxLen: number): string { + if (!str || str.length <= maxLen) return str; + const keep = maxLen - 3; + const head = Math.ceil(keep * 0.6); + const tail = keep - head; + return str.slice(0, head) + '...' 
+ str.slice(-tail); +} + +function copyToClipboard(text: string): Promise { + if (navigator.clipboard?.writeText) { + return navigator.clipboard.writeText(text); + } + // Fallback for insecure contexts (HTTP) + const ta = document.createElement('textarea'); + ta.value = text; + ta.style.cssText = 'position:fixed;left:-9999px;top:-9999px;opacity:0'; + document.body.appendChild(ta); + ta.select(); + try { + document.execCommand('copy'); + } finally { + document.body.removeChild(ta); + } + return Promise.resolve(); +} + +const CopyButton: React.FC<{ code: string }> = ({ code }) => { + const [copied, setCopied] = useState(false); + + const handleCopy = async () => { + try { + await copyToClipboard(code); + setCopied(true); + setTimeout(() => setCopied(false), 2000); + } catch { /* ignore */ } + }; + + return ( + + ); +}; + +const MarkdownContent: React.FC<{ content: string }> = ({ content }) => { + const { isDark } = useTheme(); + const syntaxTheme = isDark ? vscDarkPlus : vs; + + const components: React.ComponentProps['components'] = useMemo(() => ({ + code({ className, children, ...props }: any) { + const match = /language-(\w+)/.exec(className || ''); + const codeStr = String(children).replace(/\n$/, ''); + const hasMultipleLines = codeStr.includes('\n'); + const isCodeBlock = className?.startsWith('language-') || hasMultipleLines; + + if (!isCodeBlock) { + return ( + {children} ); - }, - }} - > - {content} - -); + } + + return ( +
+ + + {codeStr} + +
+ ); + }, + + table({ children }: any) { + return ( +
+ {children}
+
+ ); + }, + + blockquote({ children }: any) { + return
{children}
; + }, + }), [syntaxTheme, isDark]); + + return ( + + {content} + + ); +}; // ─── Thinking (ModelThinkingDisplay-style) ─────────────────────────────────── @@ -68,7 +179,7 @@ const ThinkingBlock: React.FC<{ thinking: string; streaming?: boolean }> = ({ th const charCount = thinking.length; const label = streaming && charCount === 0 ? 'Thinking...' - : `Thought for ${charCount} characters`; + : `Thought ${charCount} characters`; return (
@@ -112,12 +223,312 @@ const TOOL_TYPE_MAP: Record = { grep: 'Grep', create_file: 'Write', delete_file: 'Delete', - execute_subagent: 'Task', + Task: 'Task', search: 'Search', edit_file: 'Edit', web_search: 'Web', + TodoWrite: 'Todo', +}; + +// ─── TodoWrite card ───────────────────────────────────────────────────────── + +const TodoCard: React.FC<{ tool: RemoteToolStatus }> = ({ tool }) => { + const [expanded, setExpanded] = useState(false); + + const todos: { id?: string; content: string; status: string }[] = useMemo(() => { + const src = tool.tool_input; + if (!src) return []; + const arr = src.todos || src.result?.todos; + return Array.isArray(arr) ? arr : []; + }, [tool.tool_input]); + + if (todos.length === 0) return null; + + const completed = todos.filter(t => t.status === 'completed').length; + const allDone = completed === todos.length; + const inProgress = todos.find(t => t.status === 'in_progress'); + + const statusIcon = (s: string) => { + switch (s) { + case 'completed': + return ; + case 'in_progress': + return ; + case 'cancelled': + return ; + default: + return ; + } + }; + + return ( +
+
setExpanded(!expanded)}> + + + + + + {allDone && !expanded ? ( + All tasks completed + ) : inProgress && !expanded ? ( + {inProgress.content} + ) : null} + + + {todos.map((t, i) => ( + + ))} + + {completed}/{todos.length} + + + + +
+ {expanded && ( +
+ {todos.map((t, i) => ( +
+ {statusIcon(t.status)} + {t.content} +
+ ))} +
+ )} +
+ ); +}; + +/** + * Extract task description and agent type from execute_subagent tool data. + * Prefers tool_input (full JSON from backend), falls back to input_preview (truncated). + */ +function parseTaskInfo(tool: RemoteToolStatus): { description?: string; agentType?: string } | null { + const source = tool.tool_input ?? (() => { + try { return JSON.parse(tool.input_preview || ''); } catch { return null; } + })(); + if (!source) return null; + return { + description: source.description, + agentType: source.subagent_type, + }; +} + +/** + * Summarize a subItem for display inside a Task card. + */ +function subItemLabel(item: ChatMessageItem): string { + if (item.type === 'thinking') { + const len = (item.content || '').length; + return `Thought ${len} characters`; + } + if (item.type === 'tool' && item.tool) { + const t = item.tool; + const preview = t.input_preview ? `: ${t.input_preview}` : ''; + return `${t.name}${preview}`; + } + if (item.type === 'text') { + const len = (item.content || '').length; + return `Text ${len} characters`; + } + return ''; +} + +const TaskToolCard: React.FC<{ + tool: RemoteToolStatus; + now: number; + subItems?: ChatMessageItem[]; + onCancelTool?: (toolId: string) => void; +}> = ({ tool, now, subItems = [], onCancelTool }) => { + const scrollRef = useRef(null); + const prevCountRef = useRef(0); + const [stepsExpanded, setStepsExpanded] = useState(false); + const isRunning = tool.status === 'running'; + const isCompleted = tool.status === 'completed'; + const isError = tool.status === 'failed' || tool.status === 'error'; + const showCancel = isRunning && !!onCancelTool; + const taskInfo = parseTaskInfo(tool); + + const durationLabel = isCompleted && tool.duration_ms != null + ? formatDuration(tool.duration_ms) + : isRunning && tool.start_ms + ? formatDuration(now - tool.start_ms) + : ''; + + const statusClass = isRunning ? 'running' : isCompleted ? 'done' : isError ? 
'error' : 'pending'; + + const subTools = subItems.filter(i => i.type === 'tool' && i.tool); + const subToolsDone = subTools.filter(i => i.tool!.status === 'completed').length; + const subToolsRunning = subTools.filter(i => i.tool!.status === 'running').length; + + useEffect(() => { + if (stepsExpanded && subItems.length > prevCountRef.current && scrollRef.current) { + scrollRef.current.scrollTop = scrollRef.current.scrollHeight; + } + prevCountRef.current = subItems.length; + }, [subItems.length, stepsExpanded]); + + return ( +
+
+ + {isRunning ? ( + + ) : isCompleted ? ( + + + + ) : isError ? ( + + + + ) : ( + + )} + + + {taskInfo?.description || 'Task'} + + {taskInfo?.agentType && ( + {taskInfo.agentType} + )} + {durationLabel && ( + {durationLabel} + )} + {showCancel && ( + + )} +
+ + {subItems.length > 0 && ( + <> +
setStepsExpanded(e => !e)}> + + {subTools.length} tool call{subTools.length === 1 ? '' : 's'} + + + {subToolsDone} done + {subToolsRunning > 0 && {subToolsRunning} running} + + + + +
+ {stepsExpanded && ( +
+ {subItems.map((item, idx) => { + if (item.type === 'thinking') { + return ( +
+ + {subItemLabel(item)} +
+ ); + } + if (item.type === 'tool' && item.tool) { + const t = item.tool; + const isDone = t.status === 'completed'; + const isErr = t.status === 'failed' || t.status === 'error'; + return ( +
+ {isDone ? ( + + ) : isErr ? ( + + ) : ( + + )} + {t.name} + {(() => { + const p = getToolPreview(t); + return p ? {p} : null; + })()} + {isDone && t.duration_ms != null && ( + {formatDuration(t.duration_ms)} + )} +
+ ); + } + return null; + })} +
+ )} + + )} +
+ ); }; +/** + * Parse tool input_preview (slim JSON from backend) and extract a concise display text. + * Backend sends valid JSON with large fields stripped; frontend extracts the key field + * and truncates the resulting plain text. + */ +function getToolPreview(tool: RemoteToolStatus): string | null { + if (!tool.input_preview) return null; + try { + const params = JSON.parse(tool.input_preview); + if (!params || typeof params !== 'object') return null; + + const lastSegment = (p: string) => { + const parts = p.replace(/\\/g, '/').split('/'); + return parts[parts.length - 1] || p; + }; + + let result: string | null = null; + + const pathVal = params.file_path || params.path; + switch (tool.name) { + case 'Read': + case 'Write': + case 'Edit': + case 'LS': + case 'StrReplace': + case 'delete_file': + result = pathVal ? lastSegment(pathVal) : null; + break; + case 'Glob': + case 'Grep': + result = params.pattern || null; + break; + case 'Bash': + case 'Shell': + result = params.description || params.command || null; + break; + case 'web_search': + case 'WebSearch': + result = params.search_term || params.query || null; + break; + case 'WebFetch': + result = params.url || null; + break; + case 'SemanticSearch': + result = params.query || null; + break; + default: { + const first = Object.values(params).find( + (v): v is string => typeof v === 'string' && v.length > 0 && v.length < 80, + ); + result = first || null; + } + } + + if (!result) return null; + return result.length > 60 ? 
result.slice(0, 60) + '…' : result; + } catch { + return null; + } +} + const ToolCard: React.FC<{ tool: RemoteToolStatus; now: number; @@ -127,15 +538,17 @@ const ToolCard: React.FC<{ const typeLabel = TOOL_TYPE_MAP[toolKey] || TOOL_TYPE_MAP[tool.name] || 'Tool'; const isRunning = tool.status === 'running'; const isCompleted = tool.status === 'completed'; + const isError = tool.status === 'failed' || tool.status === 'error'; const showCancel = isRunning && !!onCancelTool; + const preview = getToolPreview(tool); const durationLabel = isCompleted && tool.duration_ms != null - ? `${(tool.duration_ms / 1000).toFixed(1)}s` + ? formatDuration(tool.duration_ms) : isRunning && tool.start_ms - ? `${((now - tool.start_ms) / 1000).toFixed(1)}s` + ? formatDuration(now - tool.start_ms) : ''; - const statusClass = isRunning ? 'running' : isCompleted ? 'done' : 'error'; + const statusClass = isRunning ? 'running' : isCompleted ? 'done' : isError ? 'error' : 'pending'; return (
@@ -147,21 +560,72 @@ const ToolCard: React.FC<{ - ) : ( + ) : isError ? ( + ) : ( + )} - {tool.name} + + {tool.name} + {preview && {preview}} + {typeLabel} {durationLabel && ( {durationLabel} )} + {showCancel && ( + + )}
- {showCancel && ( -
- +
+ ); +}; + +const READ_LIKE_TOOLS = new Set(['Read', 'Grep', 'Glob', 'SemanticSearch']); + +const ReadFilesToggle: React.FC<{ tools: RemoteToolStatus[] }> = ({ tools }) => { + const [open, setOpen] = useState(false); + if (tools.length === 0) return null; + + const doneCount = tools.filter(t => t.status === 'completed').length; + const allDone = doneCount === tools.length; + const label = allDone + ? `Read ${tools.length} file${tools.length === 1 ? '' : 's'}` + : `Reading ${tools.length} file${tools.length === 1 ? '' : 's'} (${doneCount} done)`; + + return ( +
+ + {open && ( +
+
+ {tools.map(t => { + const preview = t.input_preview || ''; + return ( +
+ {t.status === 'completed' ? '✓' : '⋯'} {t.name} {preview} +
+ ); + })} +
)}
@@ -177,13 +641,14 @@ const ToolList: React.FC<{ }> = ({ tools, now, onCancelTool }) => { const scrollRef = useRef(null); const prevCountRef = useRef(0); + const [expanded, setExpanded] = useState(false); useEffect(() => { - if (tools.length > prevCountRef.current && scrollRef.current) { + if (expanded && tools.length > prevCountRef.current && scrollRef.current) { scrollRef.current.scrollTop = scrollRef.current.scrollHeight; } prevCountRef.current = tools.length; - }, [tools.length]); + }, [tools.length, expanded]); if (!tools || tools.length === 0) return null; @@ -202,18 +667,23 @@ const ToolList: React.FC<{ return (
-
- {tools.length} tool calls +
setExpanded(e => !e)}> + {tools.length} tool call{tools.length === 1 ? '' : 's'} {doneCount > 0 && {doneCount} done} {runningCount > 0 && {runningCount} running} + + +
-
- {tools.map((tc) => ( - - ))} -
+ {expanded && ( +
+ {tools.map((tc) => ( + + ))} +
+ )}
); }; @@ -226,6 +696,75 @@ const TypingDots: React.FC = () => ( ); +// ─── Typewriter effect (pseudo-streaming) ────────────────────────────────── + +function useTypewriter(targetText: string, animate: boolean): string { + const [displayText, setDisplayText] = useState(animate ? '' : targetText); + const revealedRef = useRef(animate ? 0 : targetText.length); + const targetRef = useRef(targetText); + const timerRef = useRef | null>(null); + const speedRef = useRef(3); + + useEffect(() => { + if (!animate) { + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + revealedRef.current = targetText.length; + targetRef.current = targetText; + setDisplayText(targetText); + return; + } + + targetRef.current = targetText; + + if (targetText.length < revealedRef.current) { + revealedRef.current = 0; + } + + const delta = targetText.length - revealedRef.current; + if (delta > 0) { + const FRAME_INTERVAL = 30; + const REVEAL_DURATION = 800; + const totalFrames = REVEAL_DURATION / FRAME_INTERVAL; + speedRef.current = Math.max(Math.ceil(delta / totalFrames), 2); + + if (!timerRef.current) { + timerRef.current = setInterval(() => { + const target = targetRef.current; + const cur = revealedRef.current; + if (cur >= target.length) { + if (timerRef.current) { + clearInterval(timerRef.current); + timerRef.current = null; + } + return; + } + const next = Math.min(cur + speedRef.current, target.length); + revealedRef.current = next; + setDisplayText(target.slice(0, next)); + }, FRAME_INTERVAL); + } + } + }, [targetText, animate]); + + useEffect(() => { + return () => { + if (timerRef.current) { + clearInterval(timerRef.current); + } + }; + }, []); + + return displayText; +} + +const TypewriterText: React.FC<{ content: string }> = ({ content }) => { + const displayText = useTypewriter(content, true); + return ; +}; + // ─── AskUserQuestion Card ───────────────────────────────────────────────── interface AskQuestionCardProps { @@ -392,6 +931,43 @@ 
const AskQuestionCard: React.FC = ({ tool, onAnswer }) => ); }; +/** + * Collect subagent internal items into the Task item's subItems field. + * When a Task tool appears, all subsequent items until the next non-subagent + * item (or a completed Task) are its internal output. We attach them as + * subItems on the Task ChatMessageItem for nested rendering. + */ +function filterSubagentItems(items: ChatMessageItem[]): ChatMessageItem[] { + const result: ChatMessageItem[] = []; + let currentTaskItem: ChatMessageItem | null = null; + + for (const item of items) { + if (item.type === 'tool' && item.tool?.name === 'Task') { + const taskCopy: ChatMessageItem = { ...item, subItems: [] }; + result.push(taskCopy); + currentTaskItem = taskCopy; + continue; + } + + if (item.is_subagent && currentTaskItem) { + currentTaskItem.subItems!.push(item); + continue; + } + + if (item.is_subagent) { + continue; + } + + // Don't reset currentTaskItem — when the agent calls tools in + // parallel (e.g. Explore + 3 Reads), direct tools interleave with + // the subagent's internal tools. Keeping currentTaskItem alive + // ensures later is_subagent items still get grouped correctly. 
+ result.push(item); + } + + return result; +} + function groupChatItems(items: ChatMessageItem[]) { const groups: { type: string; entries: ChatMessageItem[] }[] = []; for (const item of items) { @@ -425,6 +1001,7 @@ function renderStandardGroups( keyPrefix: string, now: number, onCancelTool?: (toolId: string) => void, + animate?: boolean, ) { return groups.map((g, gi) => { if (g.type === 'thinking') { @@ -432,14 +1009,56 @@ function renderStandardGroups( return ; } if (g.type === 'tool') { - const tools = g.entries.map(e => e.tool!).filter(Boolean); - return ; + const rendered: React.ReactNode[] = []; + let regularBuf: RemoteToolStatus[] = []; + let readBuf: RemoteToolStatus[] = []; + + const flushRead = () => { + if (readBuf.length > 0) { + rendered.push( + , + ); + readBuf = []; + } + }; + + const flushRegular = () => { + if (regularBuf.length > 0) { + rendered.push( + , + ); + regularBuf = []; + } + }; + + const flushAll = () => { flushRead(); flushRegular(); }; + + for (const entry of g.entries) { + if (entry.tool?.name === 'Task') { + flushAll(); + rendered.push( + , + ); + } else if (entry.tool?.name === 'TodoWrite') { + flushAll(); + rendered.push(); + } else if (entry.tool && READ_LIKE_TOOLS.has(entry.tool.name)) { + flushRegular(); + readBuf.push(entry.tool); + } else if (entry.tool) { + flushRead(); + regularBuf.push(entry.tool); + } + } + flushAll(); + + return {rendered}; } if (g.type === 'text') { const text = g.entries.map(e => e.content || '').join(''); return text ? (
- + {animate ? : }
) : null; } @@ -450,11 +1069,12 @@ function renderStandardGroups( // ─── Ordered Items renderer ───────────────────────────────────────────────── function renderOrderedItems( - items: ChatMessageItem[], + rawItems: ChatMessageItem[], now: number, onCancelTool?: (toolId: string) => void, onAnswer?: (toolId: string, answers: any) => Promise, ) { + const items = filterSubagentItems(rawItems); const askEntries = items.filter(item => isPendingAskUserQuestion(item.tool)); if (askEntries.length === 0) { return renderStandardGroups(groupChatItems(items), 'ordered', now, onCancelTool); @@ -485,19 +1105,20 @@ function renderOrderedItems( // ─── Active turn items renderer (with AskUserQuestion support) ───────────── function renderActiveTurnItems( - items: ChatMessageItem[], + rawItems: ChatMessageItem[], now: number, sessionMgr: RemoteSessionManager, setError: (e: string) => void, onAnswer: (toolId: string, answers: any) => Promise, ) { + const items = filterSubagentItems(rawItems); const askEntries = items.filter(item => isPendingAskUserQuestion(item.tool)); const onCancel = (toolId: string) => { sessionMgr.cancelTool(toolId, 'User cancelled').catch(err => { setError(String(err)); }); }; if (askEntries.length === 0) { - return renderStandardGroups(groupChatItems(items), 'active', now, onCancel); + return renderStandardGroups(groupChatItems(items), 'active', now, onCancel, true); } const beforeAskItems: ChatMessageItem[] = []; @@ -515,9 +1136,9 @@ function renderActiveTurnItems( return ( <> - {renderStandardGroups(groupChatItems(beforeAskItems), 'active-before', now, onCancel)} + {renderStandardGroups(groupChatItems(beforeAskItems), 'active-before', now, onCancel, true)} {renderQuestionEntries(askEntries, 'active', onAnswer)} - {renderStandardGroups(groupChatItems(afterAskItems), 'active-after', now, onCancel)} + {renderStandardGroups(groupChatItems(afterAskItems), 'active-after', now, onCancel, true)} ); } @@ -525,7 +1146,7 @@ function renderActiveTurnItems( // ─── Theme 
toggle icon ───────────────────────────────────────────────────── const ThemeToggleIcon: React.FC<{ isDark: boolean }> = ({ isDark }) => ( - + {isDark ? ( ) : ( @@ -546,13 +1167,14 @@ const MODE_OPTIONS: { id: AgentMode; label: string }[] = [ // ─── ChatPage ─────────────────────────────────────────────────────────────── -const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, onBack }) => { +const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, onBack, autoFocus }) => { const { getMessages, setMessages, appendNewMessages, activeTurn, setActiveTurn, + error, setError, currentWorkspace, updateSessionName, @@ -564,15 +1186,21 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, const [agentMode, setAgentMode] = useState('agentic'); const [liveTitle, setLiveTitle] = useState(sessionName); const [pendingImages, setPendingImages] = useState<{ name: string; dataUrl: string }[]>([]); - const [inputFocused, setInputFocused] = useState(false); + const [imageAnalyzing, setImageAnalyzing] = useState(false); + const [optimisticMsg, setOptimisticMsg] = useState<{ + id: string; text: string; images: { name: string; data_url: string }[]; + } | null>(null); + const [inputExpanded, setInputExpanded] = useState(!!autoFocus); const inputRef = useRef(null); const fileInputRef = useRef(null); + const inputBarRef = useRef(null); const pollerRef = useRef(null); const [isLoadingMore, setIsLoadingMore] = useState(false); const [hasMore, setHasMore] = useState(true); const messagesEndRef = useRef(null); const messagesContainerRef = useRef(null); + const [expandedMsgIds, setExpandedMsgIds] = useState>(new Set()); const isStreaming = activeTurn != null && activeTurn.status === 'active'; @@ -592,6 +1220,12 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, return () => clearInterval(timer); }, [isStreaming]); + useEffect(() => { + if (!error) return; + const t = setTimeout(() => setError(null), 5000); + return () => clearTimeout(t); + }, 
[error, setError]); + const loadMessages = useCallback(async (beforeId?: string) => { if (isLoadingMore || (!hasMore && beforeId)) return; try { @@ -621,14 +1255,39 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, }, [hasMore, isLoadingMore, getMessages, sessionId, loadMessages]); // Initial load + start poller + const initialScrollDone = useRef(false); useEffect(() => { + initialScrollDone.current = false; loadMessages().then(() => { const initialMsgCount = useMobileStore.getState().getMessages(sessionId).length; + // Scroll to bottom after initial load — use rAF + setTimeout to ensure + // the DOM has finished laying out the newly rendered messages. + requestAnimationFrame(() => { + setTimeout(() => { + messagesEndRef.current?.scrollIntoView({ behavior: 'auto' }); + initialScrollDone.current = true; + prevMsgCountRef.current = useMobileStore.getState().getMessages(sessionId).length; + }, 50); + }); + const poller = new SessionPoller(sessionMgr, sessionId, (resp: PollResponse) => { if (resp.new_messages && resp.new_messages.length > 0) { appendNewMessages(sessionId, resp.new_messages); } + + // Detect count mismatch (messages inserted in the middle due to + // persistence race). When the local count doesn't match the server + // total, do a full reload to pick up all messages. 
+ if (resp.total_msg_count != null) { + const localCount = useMobileStore.getState().getMessages(sessionId).length; + if (localCount !== resp.total_msg_count) { + sessionMgr.getSessionMessages(sessionId, 200).then(fresh => { + useMobileStore.getState().setMessages(sessionId, fresh.messages); + }).catch(() => {}); + } + } + if (resp.title) { setLiveTitle(resp.title); updateSessionName(sessionId, resp.title); @@ -647,37 +1306,80 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, }; }, [sessionId, sessionMgr]); + const prevMsgCountRef = useRef(0); useEffect(() => { - if (!isLoadingMore) { - messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + if (!initialScrollDone.current) return; + if (messages.length !== prevMsgCountRef.current) { + const isNewAppend = messages.length > prevMsgCountRef.current; + prevMsgCountRef.current = messages.length; + if (isNewAppend && !isLoadingMore) { + messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + } } - }, [messages, activeTurn, isLoadingMore]); + }, [messages.length, isLoadingMore]); + + useEffect(() => { + if (!initialScrollDone.current || !isStreaming) return; + messagesEndRef.current?.scrollIntoView({ behavior: 'auto' }); + }, [activeTurn, isStreaming]); - // Reload messages when a turn completes so the messages array - // contains the final persisted content instead of stale partial data. 
- const prevActiveTurnRef = useRef(null); useEffect(() => { - const prev = prevActiveTurnRef.current; - prevActiveTurnRef.current = activeTurn; - if (prev && !activeTurn) { - loadMessages(); + if (optimisticMsg) { + messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); } - }, [activeTurn, loadMessages]); + }, [optimisticMsg]); + + useEffect(() => { + if (!initialScrollDone.current || !isStreaming) return; + const container = messagesContainerRef.current; + if (!container) return; + const tid = setInterval(() => { + const gap = container.scrollHeight - container.scrollTop - container.clientHeight; + if (gap > 10 && gap < 300) { + container.scrollTop = container.scrollHeight; + } + }, 200); + return () => clearInterval(tid); + }, [isStreaming]); const handleSend = useCallback(async () => { const text = input.trim(); const imgs = pendingImages; - if ((!text && imgs.length === 0) || isStreaming) return; + if ((!text && imgs.length === 0) || isStreaming || imageAnalyzing) return; setInput(''); setPendingImages([]); + setInputExpanded(false); + + const hasImages = imgs.length > 0; + const imageContexts = hasImages + ? imgs.map((img, idx) => { + const mimeType = img.dataUrl.split(';')[0]?.replace('data:', '') || 'image/png'; + return { + id: `mobile_img_${Date.now()}_${idx}`, + data_url: img.dataUrl, + mime_type: mimeType, + metadata: { name: img.name, source: 'remote' }, + }; + }) + : undefined; + + if (hasImages) { + setOptimisticMsg({ + id: `opt_${Date.now()}`, + text: text || '', + images: imgs.map(i => ({ name: i.name, data_url: i.dataUrl })), + }); + setImageAnalyzing(true); + } try { - const imagePayload = imgs.length > 0 - ? 
imgs.map(i => ({ name: i.name, data_url: i.dataUrl })) - : undefined; - await sessionMgr.sendMessage(sessionId, text || '(see attached images)', agentMode, imagePayload); + await sessionMgr.sendMessage(sessionId, text || '(see attached images)', agentMode, imageContexts); + pollerRef.current?.nudge(); } catch (e: any) { setError(e.message); + } finally { + setImageAnalyzing(false); + setOptimisticMsg(null); } }, [input, pendingImages, isStreaming, sessionId, sessionMgr, setError, agentMode]); @@ -685,24 +1387,33 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, fileInputRef.current?.click(); }, []); - const handleFileChange = useCallback((e: React.ChangeEvent) => { + const handleFileChange = useCallback(async (e: React.ChangeEvent) => { const files = e.target.files; if (!files) return; const maxImages = 5; const remaining = maxImages - pendingImages.length; const toProcess = Array.from(files).slice(0, remaining); - toProcess.forEach((file) => { - const reader = new FileReader(); - reader.onload = () => { - const dataUrl = reader.result as string; + const { compressImageFile } = await import('../services/imageCompressor'); + for (const file of toProcess) { + try { + const compressed = await compressImageFile(file); setPendingImages((prev) => { if (prev.length >= maxImages) return prev; - return [...prev, { name: file.name, dataUrl }]; + return [...prev, { name: compressed.name, dataUrl: compressed.dataUrl }]; }); - }; - reader.readAsDataURL(file); - }); + } catch { + const reader = new FileReader(); + reader.onload = () => { + const dataUrl = reader.result as string; + setPendingImages((prev) => { + if (prev.length >= maxImages) return prev; + return [...prev, { name: file.name, dataUrl }]; + }); + }; + reader.readAsDataURL(file); + } + } e.target.value = ''; }, [pendingImages.length]); @@ -710,6 +1421,30 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, setPendingImages((prev) => prev.filter((_, i) => i !== idx)); }, []); + 
const expandInput = useCallback(() => { + setInputExpanded(true); + requestAnimationFrame(() => inputRef.current?.focus()); + }, []); + + useEffect(() => { + if (autoFocus) { + requestAnimationFrame(() => inputRef.current?.focus()); + } + }, [autoFocus]); + + useEffect(() => { + if (!inputExpanded) return; + const handleClickOutside = (e: MouseEvent) => { + if (inputBarRef.current && !inputBarRef.current.contains(e.target as Node)) { + if (!input.trim() && pendingImages.length === 0) { + setInputExpanded(false); + } + } + }; + document.addEventListener('mousedown', handleClickOutside); + return () => document.removeEventListener('mousedown', handleClickOutside); + }, [inputExpanded, input, pendingImages.length]); + const handleKeyDown = (e: React.KeyboardEvent) => { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); @@ -730,7 +1465,7 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName, const displayName = liveTitle || sessionName || 'Session'; return ( -
+
{/* Header */}
@@ -741,6 +1476,17 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName,
{displayName} + {workspaceName && ( +
+ {workspaceName} + {gitBranch && ( + + + {truncateMiddle(gitBranch, 28)} + + )} +
+ )}
- {workspaceName && ( -
- - - - - - {workspaceName} - {gitBranch && ( - - - {gitBranch} - - )} -
- )}
{/* Messages */} @@ -772,178 +1502,337 @@ const ChatPage: React.FC = ({ sessionMgr, sessionId, sessionName,
Loading older messages…
)} - {messages.map((m, _idx) => { - if (m.role === 'system' || m.role === 'tool') return null; + {(() => { + // Find the last user message index to determine which assistant + // responses are "old" and can be collapsed. + const lastUserIdx = messages.reduceRight( + (found, m, i) => (found < 0 && m.role === 'user' ? i : found), + -1, + ); - if (m.role === 'user') { - return ( -
-
-
U
-
{m.content}
+ return messages.map((m, idx) => { + if (m.role === 'system' || m.role === 'tool') return null; + + if (m.role === 'user') { + const userText = m.content + .replace(/#img:\S+\s*/g, '') + .replace(/\[Image:.*?\]\n(?:Path:.*?\n|Image ID:.*?\n)?/g, '') + .trim(); + return ( +
+
+
U
+
+ {userText} + {m.images && m.images.length > 0 && ( +
+ {m.images.map((img, imgIdx) => ( + {img.name} + ))} +
+ )} +
+
+ ); + } + + const hasItems = m.items && m.items.length > 0; + const hasContent = m.thinking || (m.tools && m.tools.length > 0) || m.content; + if (!hasItems && !hasContent) return null; + + const isOldResponse = idx < lastUserIdx; + const isExpanded = expandedMsgIds.has(m.id); + + if (isOldResponse && !isExpanded) { + return ( +
+ +
+ ); + } + + return ( +
+ {isOldResponse && isExpanded && ( + + )} + {hasItems ? ( + renderOrderedItems(m.items!, now, undefined, handleAnswerQuestion) + ) : ( + <> + {m.thinking && } + {m.tools && m.tools.length > 0 && } + {m.content && ( +
+ +
+ )} + + )}
); - } - - return ( -
- {m.items && m.items.length > 0 ? ( - renderOrderedItems(m.items, now, undefined, handleAnswerQuestion) - ) : ( - <> - {m.thinking && } - {m.tools && m.tools.length > 0 && } - {m.content && ( -
- -
- )} - - )} -
- ); - })} + }); + })()} - {/* Active turn overlay (streaming content from poller) */} + {/* Active turn overlay (streaming or completed-pending-persist) */} {activeTurn && (() => { - if (activeTurn.items && activeTurn.items.length > 0) { + const turn = activeTurn; + const turnIsActive = turn.status === 'active'; + + if (turn.items && turn.items.length > 0) { return (
- {renderActiveTurnItems(activeTurn.items, now, sessionMgr, setError, handleAnswerQuestion)} - {activeTurn.status === 'active' && !activeTurn.thinking && !activeTurn.text && activeTurn.tools.length === 0 && ( + {turnIsActive + ? renderActiveTurnItems(turn.items, now, sessionMgr, setError, handleAnswerQuestion) + : renderOrderedItems(turn.items, now)} + {turnIsActive && !turn.thinking && !turn.text && turn.tools.length === 0 && (
)}
); } - const askTools = activeTurn.tools.filter( + const taskTools = turn.tools.filter(t => t.name === 'Task'); + const hasRunningSubagent = taskTools.some(t => t.status === 'running'); + const askTools = turn.tools.filter( t => t.name === 'AskUserQuestion' && t.status === 'running' && t.tool_input, ); const askToolIds = new Set(askTools.map(t => t.id)); - const regularTools = activeTurn.tools.filter(t => !askToolIds.has(t.id)); + const regularTools = turn.tools.filter(t => t.name !== 'Task' && !askToolIds.has(t.id)); + const subItemsForTask: ChatMessageItem[] = hasRunningSubagent + ? [ + ...(turn.thinking ? [{ type: 'thinking' as const, content: turn.thinking }] : []), + ...regularTools.map(t => ({ type: 'tool' as const, tool: t })), + ] + : []; + const onCancel = (toolId: string) => { + sessionMgr.cancelTool(toolId, 'User cancelled').catch(err => { setError(String(err)); }); + }; return (
- {(activeTurn.thinking || activeTurn.status === 'active') && ( + {!hasRunningSubagent && (turn.thinking || turnIsActive) && ( )} - { - sessionMgr.cancelTool(toolId, 'User cancelled').catch(err => { setError(String(err)); }); - }} - /> - {askTools.map(at => ( + {taskTools.map(t => ( + + ))} + {!hasRunningSubagent && regularTools.length > 0 && ( + + )} + {turnIsActive && askTools.map(at => ( ))} - {activeTurn.text ? ( + {!hasRunningSubagent && turn.text ? (
- + {turnIsActive ? : }
- ) : activeTurn.status === 'active' && !activeTurn.thinking && activeTurn.tools.length === 0 ? ( + ) : turnIsActive && !turn.thinking && turn.tools.length === 0 ? (
) : null}
); })()} -
-
- - {/* Floating Input Bar */} -
-
-
- {MODE_OPTIONS.map((opt) => ( - - ))} + {/* Optimistic user message with images (shown immediately before server responds) */} + {optimisticMsg && ( +
+
+
U
+
+ {optimisticMsg.text} + {optimisticMsg.images.length > 0 && ( +
+ {optimisticMsg.images.map((img, i) => ( + {img.name} + ))} +
+ )} +
+
-
+ )} - {pendingImages.length > 0 && ( -
- {pendingImages.map((img, idx) => ( -
- {img.name} - + {/* Image analysis indicator */} + {imageAnalyzing && ( +
+
+
+
+ + + + +
+ Analyzing image with image understanding model... +
- ))} +
)} -
- - -