diff --git a/resources/flashgrep/README.md b/resources/flashgrep/README.md index 0f0e876c7..a7206fee7 100644 --- a/resources/flashgrep/README.md +++ b/resources/flashgrep/README.md @@ -1,12 +1,18 @@ Place the prebuilt `flashgrep` daemon binary in this directory. +Pinned release: + +- `v0.2.6` from `wgqqqqq/flashgrep` + Expected filenames: - macOS x86_64: `flashgrep-x86_64-apple-darwin` - macOS arm64: `flashgrep-aarch64-apple-darwin` -- Linux x86_64: `flashgrep-x86_64-unknown-linux-gnu` -- Linux arm64: `flashgrep-aarch64-unknown-linux-gnu` +- Linux x86_64: `flashgrep-x86_64-unknown-linux-musl` +- Linux arm64: `flashgrep-aarch64-unknown-linux-musl` - Windows x86_64: `flashgrep-x86_64-pc-windows-msvc.exe` - Windows arm64: `flashgrep-aarch64-pc-windows-msvc.exe` +macOS binaries are ad-hoc signed after download so local development can execute them directly. + BitFun dev/build scripts load the daemon from this repository-relative path. diff --git a/resources/flashgrep/VERSION.json b/resources/flashgrep/VERSION.json new file mode 100644 index 000000000..d58b4129a --- /dev/null +++ b/resources/flashgrep/VERSION.json @@ -0,0 +1,5 @@ +{ + "repo": "wgqqqqq/flashgrep", + "tag": "v0.2.6", + "published_at": "2026-05-11T06:49:11Z" +} diff --git a/resources/flashgrep/flashgrep-aarch64-apple-darwin b/resources/flashgrep/flashgrep-aarch64-apple-darwin index fc6b2e52d..1e890dde2 100755 Binary files a/resources/flashgrep/flashgrep-aarch64-apple-darwin and b/resources/flashgrep/flashgrep-aarch64-apple-darwin differ diff --git a/resources/flashgrep/flashgrep-aarch64-pc-windows-msvc.exe b/resources/flashgrep/flashgrep-aarch64-pc-windows-msvc.exe index 3c4771c66..9589b5721 100644 Binary files a/resources/flashgrep/flashgrep-aarch64-pc-windows-msvc.exe and b/resources/flashgrep/flashgrep-aarch64-pc-windows-msvc.exe differ diff --git a/resources/flashgrep/flashgrep-aarch64-unknown-linux-gnu b/resources/flashgrep/flashgrep-aarch64-unknown-linux-musl similarity index 57% rename from resources/flashgrep/flashgrep-aarch64-unknown-linux-gnu rename to resources/flashgrep/flashgrep-aarch64-unknown-linux-musl index 0a6c14ee5..9b16b7397 100755 Binary files a/resources/flashgrep/flashgrep-aarch64-unknown-linux-gnu and b/resources/flashgrep/flashgrep-aarch64-unknown-linux-musl differ diff --git a/resources/flashgrep/flashgrep-x86_64-apple-darwin b/resources/flashgrep/flashgrep-x86_64-apple-darwin index 28baf781c..17ff14ef2 100755 Binary files a/resources/flashgrep/flashgrep-x86_64-apple-darwin and b/resources/flashgrep/flashgrep-x86_64-apple-darwin differ diff --git a/resources/flashgrep/flashgrep-x86_64-pc-windows-msvc.exe b/resources/flashgrep/flashgrep-x86_64-pc-windows-msvc.exe index 0143f13fa..4f7ae41f3 100644 Binary files a/resources/flashgrep/flashgrep-x86_64-pc-windows-msvc.exe and b/resources/flashgrep/flashgrep-x86_64-pc-windows-msvc.exe differ diff --git a/resources/flashgrep/flashgrep-x86_64-unknown-linux-gnu b/resources/flashgrep/flashgrep-x86_64-unknown-linux-musl similarity index 57% rename from resources/flashgrep/flashgrep-x86_64-unknown-linux-gnu rename to resources/flashgrep/flashgrep-x86_64-unknown-linux-musl index ba206a6fb..66226ecfe 100755 Binary files a/resources/flashgrep/flashgrep-x86_64-unknown-linux-gnu and b/resources/flashgrep/flashgrep-x86_64-unknown-linux-musl differ diff --git a/scripts/prepare-flashgrep-resource.mjs b/scripts/prepare-flashgrep-resource.mjs index 0d8876585..cb3b1b9d8 100644 --- a/scripts/prepare-flashgrep-resource.mjs +++ b/scripts/prepare-flashgrep-resource.mjs @@ -20,10 +20,16 @@ export function flashgrepBinaryNames() { return ['flashgrep-aarch64-apple-darwin']; } if (process.platform === 'linux' && process.arch === 'x64') { - return ['flashgrep-x86_64-unknown-linux-gnu']; + return [ + 'flashgrep-x86_64-unknown-linux-musl', + 'flashgrep-x86_64-unknown-linux-gnu', + ]; } if (process.platform === 'linux' && process.arch === 'arm64') { - return ['flashgrep-aarch64-unknown-linux-gnu']; + return [ + 'flashgrep-aarch64-unknown-linux-musl', + 'flashgrep-aarch64-unknown-linux-gnu', + ]; } return [process.platform === 'win32' ? 'flashgrep.exe' : 'flashgrep']; } @@ -33,7 +39,10 @@ export function flashgrepBinaryName() { } export function flashgrepBinaryPath() { - return join(RESOURCE_DIR, flashgrepBinaryName()); + const availableBinaryName = + flashgrepBinaryNames().find((binaryName) => existsSync(join(RESOURCE_DIR, binaryName))) ?? + flashgrepBinaryName(); + return join(RESOURCE_DIR, availableBinaryName); } export function ensureFlashgrepBinary() { diff --git a/src/apps/desktop/src/api/commands.rs b/src/apps/desktop/src/api/commands.rs index 7fd1bbea2..13ac0daf6 100644 --- a/src/apps/desktop/src/api/commands.rs +++ b/src/apps/desktop/src/api/commands.rs @@ -9,8 +9,9 @@ use crate::api::path_target::{ write_text_file, DesktopPathTarget, }; use crate::api::search_api::{ - group_search_results, search_file_contents_via_workspace_search, - search_metadata_from_content_result, should_use_workspace_search, SearchMetadataResponse, + build_content_search_request, group_search_results, prepare_content_search_runner, + search_file_contents_via_workspace_search, search_metadata_from_content_result, + should_use_workspace_search, SearchMetadataResponse, }; use crate::api::workspace_activation::spawn_workspace_background_warmup; use bitfun_core::infrastructure::{ @@ -2551,28 +2552,12 @@ pub async fn search_files( }; let use_workspace_search = - request.search_content && should_use_workspace_search(&request.root_path).await; + request.search_content && should_use_workspace_search(&state, &request.root_path).await; let result = if request.search_content { - let filename_outcome = state - .filesystem_service - .search_file_names( - &request.root_path, - &request.pattern, - FileSearchOptions { - include_content: false, - include_directories: request.include_directories, - ..options.clone() - }, - cancel_flag.clone(), - ) - .await?; - let mut filename_results = filename_outcome.results; - - if filename_results.len() >= max_results { - Ok(filename_results) - } else { - let remaining = max_results - filename_results.len(); - let mut content_outcome = if use_workspace_search { + if is_remote_path(request.root_path.trim()).await { + if !use_workspace_search { + Err("Remote content search requires workspace search support".to_string()) + } else { search_file_contents_via_workspace_search( &state, &request.root_path, @@ -2580,37 +2565,71 @@ pub async fn search_files( request.case_sensitive, request.use_regex, request.whole_word, - remaining, + max_results, ) .await - .map(|result| result.outcome)? + .map(|result| result.outcome.results) + } + } else { + let filename_outcome = state + .filesystem_service + .search_file_names( + &request.root_path, + &request.pattern, + FileSearchOptions { + include_content: false, + include_directories: request.include_directories, + ..options.clone() + }, + cancel_flag.clone(), + ) + .await?; + let mut filename_results = filename_outcome.results; + + if filename_results.len() >= max_results { + Ok(filename_results) } else { - state - .filesystem_service - .search_file_contents( + let remaining = max_results - filename_results.len(); + let mut content_outcome = if use_workspace_search { + search_file_contents_via_workspace_search( + &state, &request.root_path, &request.pattern, - FileSearchOptions { - include_content: true, - include_directories: false, - max_results: Some(remaining), - ..options - }, - cancel_flag, + request.case_sensitive, + request.use_regex, + request.whole_word, + remaining, ) - .await? - }; - if filename_outcome.truncated || content_outcome.truncated { - debug!( - "Legacy search truncated: root_path={}, pattern={}, search_content={}, limit={}", - request.root_path, - request.pattern, - request.search_content, - max_results - ); + .await + .map(|result| result.outcome)? + } else { + state + .filesystem_service + .search_file_contents( + &request.root_path, + &request.pattern, + FileSearchOptions { + include_content: true, + include_directories: false, + max_results: Some(remaining), + ..options + }, + cancel_flag, + ) + .await? + }; + if filename_outcome.truncated || content_outcome.truncated { + debug!( + "Legacy search truncated: root_path={}, pattern={}, search_content={}, limit={}", + request.root_path, + request.pattern, + request.search_content, + max_results + ); + } + filename_results.append(&mut content_outcome.results); + Ok(filename_results) } - filename_results.append(&mut content_outcome.results); - Ok(filename_results) } } else { state @@ -2618,6 +2637,7 @@ pub async fn search_files( .search_file_names(&request.root_path, &request.pattern, options, cancel_flag) .await .map(|outcome| outcome.results) + .map_err(|error| format!("Failed to search filenames: {}", error)) }; unregister_search(&state, search_id.as_deref()); @@ -2710,7 +2730,7 @@ pub async fn search_file_contents( include_directories: false, }; - let result = if should_use_workspace_search(&request.root_path).await { + let result = if should_use_workspace_search(&state, &request.root_path).await { search_file_contents_via_workspace_search( &state, &request.root_path, @@ -2876,14 +2896,22 @@ pub async fn start_search_file_contents_stream( }; let filesystem_service = state.filesystem_service.clone(); - let workspace_search_service = state.workspace_search_service.clone(); let active_searches = state.active_searches.clone(); let root_path = request.root_path.clone(); let pattern = request.pattern.clone(); let case_sensitive = request.case_sensitive; let use_regex = request.use_regex; let whole_word = request.whole_word; - let use_workspace_search = should_use_workspace_search(&root_path).await; + let use_workspace_search = should_use_workspace_search(&state, &root_path).await; + let workspace_search_runner = if use_workspace_search { + Some( + prepare_content_search_runner(&state, &root_path) + .await + .map_err(|error| format!("Failed to prepare workspace search: {}", error))?, + ) + } else { + None + }; let response_search_id = search_id.clone(); let progress_search_id = search_id.clone(); let progress_app_handle = app_handle.clone(); @@ -2902,23 +2930,17 @@ pub async fn start_search_file_contents_stream( tokio::spawn(async move { let result = if use_workspace_search { - let result = workspace_search_service - .search_content(bitfun_core::service::search::ContentSearchRequest { - repo_root: root_path.clone().into(), - search_path: None, - pattern: pattern.clone(), - output_mode: bitfun_core::service::search::ContentSearchOutputMode::Content, + let result = workspace_search_runner + .as_ref() + .expect("workspace search runner should exist when enabled") + .search_content(build_content_search_request( + &root_path, + &pattern, case_sensitive, use_regex, whole_word, - multiline: false, - before_context: 0, - after_context: 0, - max_results: Some(limit), - globs: Vec::new(), - file_types: Vec::new(), - exclude_file_types: Vec::new(), - }) + limit, + )) .await .map(|result| { let search_metadata = search_metadata_from_content_result(&result); @@ -2941,7 +2963,6 @@ pub async fn start_search_file_contents_stream( ); } } - result.map_err(|error| { bitfun_core::util::errors::BitFunError::service(format!( "Failed to search file contents via workspace search: {}", diff --git a/src/apps/desktop/src/api/search_api.rs b/src/apps/desktop/src/api/search_api.rs index 36e84fc05..7598ed440 100644 --- a/src/apps/desktop/src/api/search_api.rs +++ b/src/apps/desktop/src/api/search_api.rs @@ -1,11 +1,13 @@ use crate::api::app_state::AppState; use bitfun_core::infrastructure::{FileSearchResult, FileSearchResultGroup, SearchMatchType}; -use bitfun_core::service::remote_ssh::workspace_state::is_remote_path; +use bitfun_core::service::remote_ssh::workspace_state::{is_remote_path, lookup_remote_connection}; use bitfun_core::service::search::{ - workspace_search_daemon_available, workspace_search_feature_enabled, ContentSearchResult, - WorkspaceSearchBackend, WorkspaceSearchRepoPhase, + remote_workspace_search_service_for_path, workspace_search_daemon_available, + workspace_search_feature_enabled, ContentSearchRequest, ContentSearchResult, + RemoteWorkspaceSearchService, WorkspaceSearchBackend, WorkspaceSearchRepoPhase, }; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tauri::State; #[derive(Debug, Deserialize)] @@ -25,19 +27,74 @@ pub struct SearchMetadataResponse { pub matched_occurrences: usize, } -async fn workspace_search_unavailable_message(root_path: &str) -> Option { - if is_remote_path(root_path.trim()).await { - return Some( - "Remote workspace search status is not managed by BitFun workspace search".to_string(), - ); +#[derive(Clone)] +pub(crate) enum WorkspaceContentSearchRunner { + Local(Arc), + Remote(RemoteWorkspaceSearchService), +} + +impl WorkspaceContentSearchRunner { + pub(crate) async fn search_content( + &self, + request: ContentSearchRequest, + ) -> Result { + match self { + Self::Local(service) => service.search_content(request).await.map_err(|error| { + format!( + "Failed to search file contents via workspace search: {}", + error + ) + }), + Self::Remote(service) => service.search_content(request).await, + } } +} + +pub(crate) async fn remote_workspace_search_service( + state: &State<'_, AppState>, + root_path: &str, +) -> Result { + let preferred_connection_id = state + .get_remote_workspace_async() + .await + .and_then(|workspace| { + let remote_root = bitfun_core::service::remote_ssh::normalize_remote_workspace_path( + &workspace.remote_path, + ); + let root_path = + bitfun_core::service::remote_ssh::normalize_remote_workspace_path(root_path); + if root_path == remote_root || root_path.starts_with(&format!("{remote_root}/")) { + Some(workspace.connection_id) + } else { + None + } + }); + + remote_workspace_search_service_for_path(root_path, preferred_connection_id).await +} +async fn workspace_search_unavailable_message( + state: &State<'_, AppState>, + root_path: &str, +) -> Option { if !workspace_search_feature_enabled().await { return Some( "Workspace search is disabled. Enable it in Settings > Session Config to use accelerated workspace search.".to_string(), ); } + if is_remote_path(root_path.trim()).await { + if lookup_remote_connection(root_path.trim()).await.is_none() { + return Some("Remote workspace is not registered with BitFun SSH state".to_string()); + } + if state.get_ssh_manager_async().await.is_err() + || state.get_remote_file_service_async().await.is_err() + { + return Some("Remote workspace search services are unavailable".to_string()); + } + return None; + } + if !workspace_search_daemon_available() { return Some( "Workspace search daemon is unavailable. BitFun will continue using legacy search." @@ -48,12 +105,30 @@ async fn workspace_search_unavailable_message(root_path: &str) -> Option None } -pub(crate) async fn should_use_workspace_search(root_path: &str) -> bool { - workspace_search_unavailable_message(root_path) +pub(crate) async fn should_use_workspace_search( + state: &State<'_, AppState>, + root_path: &str, +) -> bool { + workspace_search_unavailable_message(state, root_path) .await .is_none() } +pub(crate) async fn prepare_content_search_runner( + state: &State<'_, AppState>, + root_path: &str, +) -> Result { + if is_remote_path(root_path.trim()).await { + Ok(WorkspaceContentSearchRunner::Remote( + remote_workspace_search_service(state, root_path).await?, + )) + } else { + Ok(WorkspaceContentSearchRunner::Local( + state.workspace_search_service.clone(), + )) + } +} + pub(crate) async fn search_file_contents_via_workspace_search( state: &State<'_, AppState>, root_path: &str, @@ -63,31 +138,55 @@ pub(crate) async fn search_file_contents_via_workspace_search( whole_word: bool, max_results: usize, ) -> Result { - state - .workspace_search_service - .search_content(bitfun_core::service::search::ContentSearchRequest { - repo_root: root_path.into(), - search_path: None, - pattern: pattern.to_string(), - output_mode: bitfun_core::service::search::ContentSearchOutputMode::Content, + search_content_request_via_workspace_search( + state, + build_content_search_request( + root_path, + pattern, case_sensitive, use_regex, whole_word, - multiline: false, - before_context: 0, - after_context: 0, - max_results: Some(max_results), - globs: Vec::new(), - file_types: Vec::new(), - exclude_file_types: Vec::new(), - }) + max_results, + ), + ) + .await +} + +pub(crate) fn build_content_search_request( + root_path: &str, + pattern: &str, + case_sensitive: bool, + use_regex: bool, + whole_word: bool, + max_results: usize, +) -> ContentSearchRequest { + ContentSearchRequest { + repo_root: root_path.into(), + search_path: None, + pattern: pattern.to_string(), + output_mode: bitfun_core::service::search::ContentSearchOutputMode::Content, + case_sensitive, + use_regex, + whole_word, + multiline: false, + before_context: 0, + after_context: 0, + max_results: Some(max_results), + globs: Vec::new(), + file_types: Vec::new(), + exclude_file_types: Vec::new(), + } +} + +pub(crate) async fn search_content_request_via_workspace_search( + state: &State<'_, AppState>, + request: ContentSearchRequest, +) -> Result { + let repo_root = request.repo_root.to_string_lossy().to_string(); + prepare_content_search_runner(state, &repo_root) + .await? + .search_content(request) .await - .map_err(|error| { - format!( - "Failed to search file contents via workspace search: {}", - error - ) - }) } pub(crate) fn group_search_results(results: Vec) -> Vec { @@ -139,10 +238,19 @@ pub async fn search_get_repo_status( state: State<'_, AppState>, request: SearchRepoIndexRequest, ) -> Result { - if let Some(message) = workspace_search_unavailable_message(&request.root_path).await { + if let Some(message) = workspace_search_unavailable_message(&state, &request.root_path).await { return Err(message); } + if is_remote_path(request.root_path.trim()).await { + return remote_workspace_search_service(&state, &request.root_path) + .await? + .get_index_status(&request.root_path) + .await + .map(|status| serde_json::to_value(status).unwrap_or_else(|_| serde_json::json!({}))) + .map_err(|error| format!("Failed to get search repository status: {}", error)); + } + state .workspace_search_service .get_index_status(&request.root_path) @@ -156,10 +264,19 @@ pub async fn search_build_index( state: State<'_, AppState>, request: SearchRepoIndexRequest, ) -> Result { - if let Some(message) = workspace_search_unavailable_message(&request.root_path).await { + if let Some(message) = workspace_search_unavailable_message(&state, &request.root_path).await { return Err(message); } + if is_remote_path(request.root_path.trim()).await { + return remote_workspace_search_service(&state, &request.root_path) + .await? + .build_index(&request.root_path) + .await + .map(|task| serde_json::to_value(task).unwrap_or_else(|_| serde_json::json!({}))) + .map_err(|error| format!("Failed to build workspace index: {}", error)); + } + state .workspace_search_service .build_index(&request.root_path) @@ -173,10 +290,19 @@ pub async fn search_rebuild_index( state: State<'_, AppState>, request: SearchRepoIndexRequest, ) -> Result { - if let Some(message) = workspace_search_unavailable_message(&request.root_path).await { + if let Some(message) = workspace_search_unavailable_message(&state, &request.root_path).await { return Err(message); } + if is_remote_path(request.root_path.trim()).await { + return remote_workspace_search_service(&state, &request.root_path) + .await? + .rebuild_index(&request.root_path) + .await + .map(|task| serde_json::to_value(task).unwrap_or_else(|_| serde_json::json!({}))) + .map_err(|error| format!("Failed to rebuild workspace index: {}", error)); + } + state .workspace_search_service .rebuild_index(&request.root_path) diff --git a/src/crates/core/src/agentic/tools/implementations/glob_tool.rs b/src/crates/core/src/agentic/tools/implementations/glob_tool.rs index 7e193e990..67a0adb9f 100644 --- a/src/crates/core/src/agentic/tools/implementations/glob_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/glob_tool.rs @@ -1,6 +1,7 @@ use crate::agentic::tools::framework::{Tool, ToolResult, ToolUseContext}; use crate::service::search::{ - get_global_workspace_search_service, workspace_search_runtime_available, GlobSearchRequest, + get_global_workspace_search_service, remote_workspace_search_service_for_path, + workspace_search_feature_enabled, workspace_search_runtime_available, GlobSearchRequest, }; use crate::util::errors::{BitFunError, BitFunResult}; use crate::util::process_manager; @@ -501,8 +502,75 @@ impl Tool for GlobTool { .map(|v| v as usize) .unwrap_or(100); - // Remote workspace: prefer `rg --files --glob`, but fall back to `find` if resolved.uses_remote_workspace_backend() { + if workspace_search_feature_enabled().await { + let remote_workspace_glob_result = async { + let workspace_root = context + .workspace + .as_ref() + .map(|workspace| PathBuf::from(workspace.root_path_string())) + .ok_or_else(|| { + BitFunError::tool( + "workspace_path is required when Glob path is omitted".to_string(), + ) + })?; + let resolved_path = PathBuf::from(&resolved.resolved_path); + let repo_root = workspace_root.to_string_lossy().to_string(); + let preferred_connection_id = context + .workspace + .as_ref() + .and_then(|workspace| workspace.connection_id()) + .map(str::to_string); + let search_service = remote_workspace_search_service_for_path( + &repo_root, + preferred_connection_id, + ) + .await + .map_err(BitFunError::tool)?; + let glob_result = search_service + .glob(GlobSearchRequest { + repo_root: workspace_root.clone(), + search_path: (resolved_path != workspace_root).then_some(resolved_path), + pattern: pattern.to_string(), + limit, + }) + .await + .map_err(BitFunError::tool)?; + + let match_count = glob_result.paths.len(); + let result_text = if glob_result.paths.is_empty() { + format!("No files found matching pattern '{}'", pattern) + } else { + glob_result.paths.join("\n") + }; + + Ok::, BitFunError>(vec![ToolResult::Result { + data: json!({ + "pattern": pattern, + "path": resolved.logical_path, + "matches": glob_result.paths, + "match_count": match_count, + "repo_phase": glob_result.repo_status.phase, + "rebuild_recommended": glob_result.repo_status.rebuild_recommended + }), + result_for_assistant: Some(result_text), + image_attachments: None, + }]) + } + .await; + + match remote_workspace_glob_result { + Ok(results) => return Ok(results), + Err(error) => { + warn!( + "Glob tool remote workspace-search failed; falling back to shell glob: {}", + error + ); + } + } + } + + // Remote workspace fallback: prefer `rg --files --glob`, but fall back to `find`. let ws_shell = context .ws_shell() .ok_or_else(|| BitFunError::tool("Workspace shell not available".to_string()))?; diff --git a/src/crates/core/src/agentic/tools/implementations/grep_tool.rs b/src/crates/core/src/agentic/tools/implementations/grep_tool.rs index 16d0a63c5..cc6c16046 100644 --- a/src/crates/core/src/agentic/tools/implementations/grep_tool.rs +++ b/src/crates/core/src/agentic/tools/implementations/grep_tool.rs @@ -1,7 +1,8 @@ use crate::agentic::tools::framework::{Tool, ToolResult, ToolUseContext}; use crate::service::search::{ - get_global_workspace_search_service, workspace_search_runtime_available, - ContentSearchOutputMode, ContentSearchRequest, WorkspaceSearchHit, WorkspaceSearchLine, + get_global_workspace_search_service, remote_workspace_search_service_for_path, + workspace_search_feature_enabled, workspace_search_runtime_available, ContentSearchOutputMode, + ContentSearchRequest, WorkspaceSearchHit, WorkspaceSearchLine, }; use crate::util::errors::{BitFunError, BitFunResult}; use async_trait::async_trait; @@ -774,7 +775,7 @@ Usage: } fn is_concurrency_safe(&self, _input: Option<&Value>) -> bool { - false + true } fn needs_permissions(&self, _input: Option<&Value>) -> bool { @@ -829,6 +830,94 @@ Usage: let resolved = context.resolve_tool_path(search_path)?; if resolved.uses_remote_workspace_backend() { + if workspace_search_feature_enabled().await { + let remote_workspace_search_result = async { + let (request, output_mode, show_line_numbers, offset, head_limit) = + self.build_workspace_search_request(input, context)?; + let pattern = request.pattern.clone(); + let search_mode = request.output_mode.search_mode(); + let path = request + .search_path + .as_ref() + .map(|path| path.to_string_lossy().to_string()) + .unwrap_or_else(|| request.repo_root.to_string_lossy().to_string()); + let repo_root = request.repo_root.to_string_lossy().to_string(); + let preferred_connection_id = context + .workspace + .as_ref() + .and_then(|workspace| workspace.connection_id()) + .map(str::to_string); + let search_service = + remote_workspace_search_service_for_path(&repo_root, preferred_connection_id) + .await + .map_err(BitFunError::tool)?; + let search_started_at = Instant::now(); + let search_result = search_service + .search_content(request) + .await + .map_err(BitFunError::tool)?; + let display_base = Self::display_base(context); + let (result_text, file_count, total_matches) = + self.format_workspace_search_output( + &output_mode, + show_line_numbers, + offset, + head_limit, + &search_result, + display_base.as_deref(), + ); + let workspace_search_elapsed_ms = search_started_at.elapsed().as_millis(); + + log::info!( + "Grep tool remote workspace-search result: pattern={}, path={}, output_mode={}, search_mode={:?}, file_count={}, total_matches={}, backend={:?}, repo_phase={:?}, rebuild_recommended={}, dirty_modified={}, dirty_deleted={}, dirty_new={}, candidate_docs={}, matched_lines={}, matched_occurrences={}, workspace_search_ms={}", + pattern, + path, + output_mode, + search_mode, + file_count, + total_matches, + search_result.backend, + search_result.repo_status.phase, + search_result.repo_status.rebuild_recommended, + search_result.repo_status.dirty_files.modified, + search_result.repo_status.dirty_files.deleted, + search_result.repo_status.dirty_files.new, + search_result.candidate_docs, + search_result.matched_lines, + search_result.matched_occurrences, + workspace_search_elapsed_ms, + ); + + Ok::, BitFunError>(vec![ToolResult::Result { + data: json!({ + "pattern": pattern, + "path": path, + "output_mode": output_mode, + "file_count": file_count, + "total_matches": total_matches, + "backend": search_result.backend, + "repo_phase": search_result.repo_status.phase, + "rebuild_recommended": search_result.repo_status.rebuild_recommended, + "applied_limit": head_limit, + "applied_offset": if offset > 0 { Some(offset) } else { None:: }, + "result": result_text, + }), + result_for_assistant: Some(result_text), + image_attachments: None, + }]) + } + .await; + + match remote_workspace_search_result { + Ok(results) => return Ok(results), + Err(error) => { + log::warn!( + "Grep tool remote workspace-search failed; falling back to shell grep: {}", + error + ); + } + } + } return self.call_remote(input, context).await; } diff --git a/src/crates/core/src/service/remote_ssh/manager.rs b/src/crates/core/src/service/remote_ssh/manager.rs index e9f910e85..4a12ca6b8 100644 --- a/src/crates/core/src/service/remote_ssh/manager.rs +++ b/src/crates/core/src/service/remote_ssh/manager.rs @@ -1465,6 +1465,49 @@ impl SSHConnectionManager { .unwrap_or(false) } + async fn load_connection_config_from_saved( + &self, + connection_id: &str, + ) -> anyhow::Result> { + let saved = { + let guard = self.saved_connections.read().await; + guard.iter().find(|conn| conn.id == connection_id).cloned() + }; + + let Some(saved) = saved else { + return Ok(None); + }; + + let auth = match saved.auth_type { + crate::service::remote_ssh::types::SavedAuthType::Password => { + let password = + self.password_vault.load(connection_id).await?.ok_or_else(|| { + anyhow!( + "Saved SSH connection {} requires a password, but no stored vault entry is available", + connection_id + ) + })?; + SSHAuthMethod::Password { password } + } + crate::service::remote_ssh::types::SavedAuthType::PrivateKey { key_path } => { + SSHAuthMethod::PrivateKey { + key_path, + passphrase: None, + } + } + }; + + Ok(Some(SSHConnectionConfig { + id: saved.id, + name: saved.name, + host: saved.host, + port: saved.port, + username: saved.username, + auth, + default_workspace: saved.default_workspace, + })) + } + /// Ensure the connection is alive; if it was torn down (network blip, /// server-side timeout), transparently reconnect using the saved config /// and (for password auth) the encrypted password vault. @@ -1473,16 +1516,31 @@ impl SSHConnectionManager { /// concurrent SFTP/exec calls hit a dead session at the same time. /// Idempotent: returns Ok(()) immediately when the session is already alive. async fn ensure_alive_or_reconnect(&self, connection_id: &str) -> anyhow::Result<()> { + let missing_config = self + .load_connection_config_from_saved(connection_id) + .await?; + let (alive_flag, reconnect_lock, mut config) = { let guard = self.connections.read().await; - let conn = guard - .get(connection_id) - .ok_or_else(|| anyhow!("Connection {} not found", connection_id))?; - ( - conn.alive.clone(), - conn.reconnect_lock.clone(), - conn.config.clone(), - ) + if let Some(conn) = guard.get(connection_id) { + ( + conn.alive.clone(), + conn.reconnect_lock.clone(), + conn.config.clone(), + ) + } else { + let config = missing_config.ok_or_else(|| { + anyhow!( + "Connection {} not found and no saved SSH profile is available", + connection_id + ) + })?; + ( + Arc::new(AtomicBool::new(false)), + Arc::new(tokio::sync::Mutex::new(())), + config, + ) + } }; if alive_flag.load(Ordering::SeqCst) { @@ -1496,10 +1554,21 @@ impl SSHConnectionManager { return Ok(()); } - log::warn!( - "SSH session {} is dead; attempting transparent reconnect", - connection_id - ); + let is_existing_connection = { + let guard = self.connections.read().await; + guard.contains_key(connection_id) + }; + if is_existing_connection { + log::warn!( + "SSH session {} is dead; attempting transparent reconnect", + connection_id + ); + } else { + log::info!( + "SSH session {} is not active; attempting to connect using saved SSH profile", + connection_id + ); + } // Refresh the password from the encrypted vault if password auth was // configured but the in-memory copy is empty (defensive — covers cases @@ -1532,18 +1601,24 @@ impl SSHConnectionManager { if let Some(conn) = guard.get_mut(connection_id) { conn.handle = Arc::new(handle); conn.alive = alive; - if let Some(si) = server_info { - conn.server_info = Some(si); + if let Some(si) = server_info.as_ref() { + conn.server_info = Some(si.clone()); } let mut sftp_guard = conn.sftp_session.write().await; *sftp_guard = None; } else { - // Entry was removed concurrently (e.g. user-triggered disconnect); - // nothing to restore. - return Err(anyhow!( - "Connection {} was removed during reconnect", - connection_id - )); + guard.insert( + connection_id.to_string(), + ActiveConnection { + handle: Arc::new(handle), + config, + server_info, + sftp_session: Arc::new(tokio::sync::RwLock::new(None)), + server_key: None, + alive, + reconnect_lock: Arc::new(tokio::sync::Mutex::new(())), + }, + ); } } @@ -1593,6 +1668,33 @@ impl SSHConnectionManager { .map_err(|e| anyhow!("Command execution failed: {}", e)) } + /// Open a long-lived non-PTY exec channel for streaming stdin/stdout protocols. + pub async fn open_exec_channel( + &self, + connection_id: &str, + command: &str, + ) -> anyhow::Result> { + self.ensure_alive_or_reconnect(connection_id).await?; + let handle = { + let guard = self.connections.read().await; + guard + .get(connection_id) + .ok_or_else(|| anyhow!("Connection {} not found", connection_id))? + .handle + .clone() + }; + + let channel = handle + .channel_open_session() + .await + .map_err(|e| anyhow!("Failed to open SSH exec channel: {}", e))?; + channel + .exec(true, command) + .await + .map_err(|e| anyhow!("Failed to start remote command: {}", e))?; + Ok(channel) + } + /// Get server info for a connection pub async fn get_server_info(&self, connection_id: &str) -> Option { let guard = self.connections.read().await; @@ -1860,11 +1962,22 @@ impl SSHConnectionManager { Err(_) => {} } - // Try to create - sftp.as_ref() - .create_dir(&path) - .await - .map_err(|e| anyhow!("Failed to create directory '{}': {}", path, e))?; + for dir in sftp_mkdir_all_prefixes(&path) { + match sftp.as_ref().try_exists(&dir).await { + Ok(true) => continue, + Ok(false) | Err(_) => {} + } + + if let Err(error) = sftp.as_ref().create_dir(&dir).await { + match sftp.as_ref().try_exists(&dir).await { + Ok(true) => continue, + Ok(false) | Err(_) => { + return Err(anyhow!("Failed to create directory '{}': {}", dir, error)); + } + } + } + } + Ok(()) } @@ -2247,6 +2360,27 @@ impl Default for PortForwardManager { } } +fn sftp_mkdir_all_prefixes(path: &str) -> Vec { + let is_absolute = path.starts_with('/'); + let mut current = String::new(); + let mut prefixes = Vec::new(); + + for component in path.split('/').filter(|component| !component.is_empty()) { + if current.is_empty() { + if is_absolute { + current.push('/'); + } + current.push_str(component); + } else { + current.push('/'); + current.push_str(component); + } + prefixes.push(current.clone()); + } + + prefixes +} + #[cfg(test)] mod tests { use super::*; @@ -2320,6 +2454,44 @@ mod tests { let _ = tokio::fs::remove_dir_all(&dir).await; } + #[tokio::test] + async fn restores_connection_config_from_saved_password_profile() { + let dir = test_data_dir("restore-password-config"); + tokio::fs::create_dir_all(&dir).await.unwrap(); + let manager = SSHConnectionManager::new(dir.clone()); + + manager + .save_connection(&SSHConnectionConfig { + id: "ssh-root@example.com:22".to_string(), + name: "root@example.com".to_string(), + host: "example.com".to_string(), + port: 22, + username: "root".to_string(), + auth: SSHAuthMethod::Password { + password: "secret".to_string(), + }, + default_workspace: Some("/root/project".to_string()), + }) + .await + .unwrap(); + + let restored = manager + .load_connection_config_from_saved("ssh-root@example.com:22") + .await + .unwrap() + .expect("expected saved config"); + + assert_eq!(restored.host, "example.com"); + assert_eq!(restored.username, "root"); + assert_eq!(restored.default_workspace.as_deref(), Some("/root/project")); + match restored.auth { + SSHAuthMethod::Password { password } => assert_eq!(password, "secret"), + other => panic!("expected password auth, got {:?}", other), + } + + let _ = tokio::fs::remove_dir_all(&dir).await; + } + #[tokio::test] async fn prunes_remote_workspaces_without_saved_connection() { let dir = test_data_dir("missing-saved"); @@ -2349,4 +2521,31 @@ mod tests { assert!(manager.get_remote_workspaces().await.is_empty()); let _ = tokio::fs::remove_dir_all(&dir).await; } + + #[test] + fn mkdir_all_prefixes_expand_absolute_posix_path() { + assert_eq!( + sftp_mkdir_all_prefixes("/home/wgq/workspace/bot_detection/.bitfun/bin"), + vec![ + "/home".to_string(), + "/home/wgq".to_string(), + "/home/wgq/workspace".to_string(), + "/home/wgq/workspace/bot_detection".to_string(), + "/home/wgq/workspace/bot_detection/.bitfun".to_string(), + "/home/wgq/workspace/bot_detection/.bitfun/bin".to_string(), + ] + ); + } + + #[test] + fn mkdir_all_prefixes_collapse_redundant_separators() { + assert_eq!( + sftp_mkdir_all_prefixes("/home//wgq///project/"), + vec![ + "/home".to_string(), + "/home/wgq".to_string(), + "/home/wgq/project".to_string(), + ] + ); + } } diff --git a/src/crates/core/src/service/search/flashgrep/client.rs b/src/crates/core/src/service/search/flashgrep/client.rs index e49ff8952..a4ac0807e 100644 --- a/src/crates/core/src/service/search/flashgrep/client.rs +++ b/src/crates/core/src/service/search/flashgrep/client.rs @@ -1,45 +1,42 @@ use std::{ - collections::HashMap, ffi::OsString, process::Stdio, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, Mutex as StdMutex, MutexGuard as StdMutexGuard, }, time::{Duration, Instant}, }; use crate::util::process_manager; -use serde::Serialize; +use async_trait::async_trait; use tokio::{ - io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{Child, ChildStderr, ChildStdin, ChildStdout}, - sync::{oneshot, Mutex}, + sync::{mpsc, Mutex}, time::{sleep, timeout}, }; use super::{ error::{AppError, Result}, protocol::{ - ClientCapabilities, ClientInfo, GlobParams, InitializeParams, RepoRef, Request, - RequestEnvelope, Response, ResponseEnvelope, SearchParams, ServerMessage, TaskRef, + ClientCapabilities, ClientInfo, GlobParams, InitializeParams, RepoRef, Request, Response, + SearchParams, TaskRef, }, + repo_session::FlashgrepRepoSession, + rpc_client::{read_content_length_message, ProtocolClient}, types::{ GlobOutcome, GlobRequest, OpenRepoParams, RepoStatus, SearchOutcome, SearchRequest, TaskStatus, }, }; -const JSONRPC_VERSION: &str = "2.0"; const CLIENT_NAME: &str = "bitfun-workspace-search"; const REPO_CLOSE_TIMEOUT: Duration = Duration::from_secs(2); const SHUTDOWN_REQUEST_TIMEOUT: Duration = Duration::from_secs(2); const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2); const DROP_CLEANUP_TIMEOUT: Duration = Duration::from_millis(150); -type PendingResponseSender = oneshot::Sender>; -type PendingResponses = HashMap; - #[derive(Debug, Clone)] pub(crate) struct ManagedClient { daemon_program: Option, @@ -64,19 +61,12 @@ struct ManagedClientState { #[derive(Debug)] struct AsyncDaemonClient { child: StdMutex>, - writer: Mutex>, - shared: Arc, - next_id: AtomicU64, + protocol: ProtocolClient, + writer_task: StdMutex>>, reader_task: StdMutex>>, stderr_task: StdMutex>>, } -#[derive(Debug, Default)] -struct DaemonShared { - pending: Mutex, - closed: AtomicBool, -} - fn lock_std_mutex(mutex: &StdMutex) -> StdMutexGuard<'_, T> { match mutex.lock() { Ok(guard) => guard, @@ -403,6 +393,37 @@ impl RepoSession { } } +#[async_trait] +impl FlashgrepRepoSession for RepoSession { + async fn status(&self) -> Result { + RepoSession::status(self).await + } + + async fn task_status(&self, task_id: String) -> Result { + RepoSession::task_status(self, task_id).await + } + + async fn build_index(&self) -> Result { + RepoSession::index_build(self).await + } + + async fn rebuild_index(&self) -> Result { + RepoSession::index_rebuild(self).await + } + + async fn search(&self, request: SearchRequest) -> Result { + RepoSession::search(self, request).await + } + + async fn glob(&self, request: GlobRequest) -> Result { + RepoSession::glob(self, request).await + } + + async fn close(&self) -> Result<()> { + RepoSession::close(self).await + } +} + impl AsyncDaemonClient { async fn spawn(daemon_program: Option) -> Result { let program = daemon_program @@ -428,15 +449,17 @@ impl AsyncDaemonClient { })?; let stderr = child.stderr.take(); + let (protocol, write_rx) = ProtocolClient::channel("flashgrep stdio backend"); + let client = Self { child: StdMutex::new(Some(child)), - writer: Mutex::new(BufWriter::new(stdin)), - shared: Arc::new(DaemonShared::default()), - next_id: AtomicU64::new(1), + protocol, + writer_task: StdMutex::new(None), reader_task: StdMutex::new(None), stderr_task: StdMutex::new(None), }; + client.spawn_writer_task(stdin, write_rx).await; client.spawn_reader_task(stdout).await; client.spawn_stderr_task(stderr).await; if let Err(error) = client.initialize().await { @@ -457,11 +480,12 @@ impl AsyncDaemonClient { } fn is_closed(&self) -> bool { - self.shared.closed.load(Ordering::Relaxed) + self.protocol.is_closed() } async fn initialize(&self) -> Result<()> { match self + .protocol .send_request_with_timeout( Request::Initialize { params: InitializeParams { @@ -476,7 +500,9 @@ impl AsyncDaemonClient { ) .await? { - Response::InitializeResult { .. } => self.send_notification(Request::Initialized).await, + Response::InitializeResult { .. } => { + self.protocol.send_notification(Request::Initialized).await + } other => unexpected_response("initialize", other), } } @@ -486,62 +512,9 @@ impl AsyncDaemonClient { request: Request, request_timeout: Option, ) -> Result { - if self.is_closed() { - return Err(AppError::Protocol( - "flashgrep stdio backend is not running".into(), - )); - } - - let request_name = request_name(&request); - let request_id = self.next_id.fetch_add(1, Ordering::Relaxed); - let envelope = RequestEnvelope { - jsonrpc: JSONRPC_VERSION.to_string(), - id: Some(request_id), - request, - }; - let (sender, receiver) = oneshot::channel(); - self.shared.pending.lock().await.insert(request_id, sender); - - if let Err(error) = self.write_envelope(&envelope).await { - self.shared.pending.lock().await.remove(&request_id); - return Err(error); - } - - let response = match request_timeout { - Some(duration) => match timeout(duration, receiver).await { - Ok(result) => result.map_err(|_| { - AppError::Protocol( - "flashgrep stdio backend closed without sending a response".into(), - ) - })??, - Err(_) => { - self.shared.pending.lock().await.remove(&request_id); - return Err(AppError::Protocol(format!( - "flashgrep stdio backend request timed out: {request_name}" - ))); - } - }, - None => receiver.await.map_err(|_| { - AppError::Protocol( - "flashgrep stdio backend closed without sending a response".into(), - ) - })??, - }; - decode_response(request_id, response) - } - - async fn send_notification(&self, request: Request) -> Result<()> { - let envelope = RequestEnvelope { - jsonrpc: JSONRPC_VERSION.to_string(), - id: None, - request, - }; - self.write_envelope(&envelope).await - } - - async fn write_envelope(&self, envelope: &RequestEnvelope) -> Result<()> { - let mut writer = self.writer.lock().await; - write_content_length_message(&mut writer, envelope).await + self.protocol + .send_request_with_timeout(request, request_timeout) + .await } async fn shutdown(&self) -> Result<()> { @@ -565,7 +538,7 @@ impl AsyncDaemonClient { } fn mark_closed(&self) { - self.shared.closed.store(true, Ordering::Relaxed); + self.protocol.mark_closed(); } async fn wait_for_child_exit(&self) -> Result<()> { @@ -588,6 +561,11 @@ impl AsyncDaemonClient { } async fn stop_background_tasks(&self) { + let writer_handle = take_std_option(&self.writer_task); + if let Some(handle) = writer_handle { + handle.abort(); + let _ = handle.await; + } let reader_handle = take_std_option(&self.reader_task); if let Some(handle) = reader_handle { handle.abort(); @@ -600,26 +578,48 @@ impl AsyncDaemonClient { } } + async fn spawn_writer_task(&self, stdin: ChildStdin, mut write_rx: mpsc::Receiver>) { + let protocol = self.protocol.clone(); + let handle = tokio::spawn(async move { + let mut writer = BufWriter::new(stdin); + while let Some(outbound) = write_rx.recv().await { + if let Err(error) = writer.write_all(&outbound).await { + log::debug!("flashgrep stdio daemon stdin write failed: {}", error); + protocol + .close_with_message("flashgrep stdio backend stdin write failed") + .await; + return; + } + if let Err(error) = writer.flush().await { + log::debug!("flashgrep stdio daemon stdin flush failed: {}", error); + protocol + .close_with_message("flashgrep stdio backend stdin flush failed") + .await; + return; + } + } + }); + + *lock_std_mutex(&self.writer_task) = Some(handle); + } + async fn spawn_reader_task(&self, stdout: ChildStdout) { - let shared = self.shared.clone(); + let protocol = self.protocol.clone(); let handle = tokio::spawn(async move { let mut reader = BufReader::new(stdout); - let result = reader_loop(&mut reader, &shared).await; - shared.closed.store(true, Ordering::Relaxed); + let result = reader_loop(&mut reader, &protocol).await; match result { Ok(()) => { - reject_pending_requests( - &shared.pending, - "flashgrep stdio backend closed its stdout pipe", - ) - .await; + protocol + .close_with_message("flashgrep stdio backend closed its stdout pipe") + .await; } Err(error) => { - reject_pending_requests( - &shared.pending, - format!("flashgrep stdio backend reader failed: {error}"), - ) - .await; + protocol + .close_with_message(format!( + "flashgrep stdio backend reader failed: {error}" + )) + .await; } } }); @@ -652,7 +652,7 @@ impl AsyncDaemonClient { } async fn reject_pending(&self, message: impl Into) { - reject_pending_requests(&self.shared.pending, message.into()).await; + self.protocol.reject_pending(message.into()).await; } fn take_child_for_drop(&self) -> Option { @@ -660,6 +660,9 @@ impl AsyncDaemonClient { } fn abort_background_tasks_for_drop(&self) { + if let Some(handle) = take_std_option(&self.writer_task) { + handle.abort(); + } if let Some(handle) = take_std_option(&self.reader_task) { handle.abort(); } @@ -679,128 +682,13 @@ impl Drop for AsyncDaemonClient { } } -async fn reader_loop( - reader: &mut BufReader, - shared: &Arc, -) -> Result<()> { +async fn reader_loop(reader: &mut BufReader, protocol: &ProtocolClient) -> Result<()> { while let Some(message) = read_content_length_message(reader).await? { - match message { - ServerMessage::Response(response) => { - let Some(request_id) = response.id else { - continue; - }; - if let Some(sender) = shared.pending.lock().await.remove(&request_id) { - let _ = sender.send(Ok(response)); - } - } - ServerMessage::Notification(_) => {} - } - } - Ok(()) -} - -async fn reject_pending_requests(pending: &Mutex, message: impl Into) { - let message = message.into(); - let mut pending = pending.lock().await; - if pending.is_empty() { - return; - } - - for (_, sender) in pending.drain() { - let _ = sender.send(Err(AppError::Protocol(message.clone()))); - } -} - -async fn read_content_length_message( - reader: &mut BufReader, -) -> Result> { - let mut content_length = None; - - loop { - let mut line = String::new(); - let read = reader.read_line(&mut line).await?; - if read == 0 { - return Ok(None); - } - if line == "\r\n" || line == "\n" { - break; - } - - let trimmed = line.trim_end_matches(['\r', '\n']); - let Some((name, value)) = trimmed.split_once(':') else { - continue; - }; - if name.trim().eq_ignore_ascii_case("Content-Length") { - let length = value.trim().parse::().map_err(|error| { - AppError::Protocol(format!("invalid Content-Length header: {error}")) - })?; - content_length = Some(length); - } + protocol.handle_server_message(message).await; } - - let content_length = - content_length.ok_or_else(|| AppError::Protocol("missing Content-Length header".into()))?; - let mut body = vec![0u8; content_length]; - reader.read_exact(&mut body).await?; - serde_json::from_slice(&body) - .map_err(|error| AppError::Protocol(format!("failed to decode daemon message: {error}"))) -} - -async fn write_content_length_message( - writer: &mut BufWriter, - message: &impl Serialize, -) -> Result<()> { - let body = serde_json::to_vec(message) - .map_err(|error| AppError::Protocol(format!("failed to encode request: {error}")))?; - writer - .write_all(format!("Content-Length: {}\r\n\r\n", body.len()).as_bytes()) - .await?; - writer.write_all(&body).await?; - writer.flush().await?; Ok(()) } -fn request_name(request: &Request) -> &'static str { - match request { - Request::Initialize { .. } => "initialize", - Request::Initialized => "initialized", - Request::Ping => "ping", - Request::BaseSnapshotBuild { .. } => "base_snapshot/build", - Request::BaseSnapshotRebuild { .. } => "base_snapshot/rebuild", - Request::TaskStatus { .. } => "task/status", - Request::OpenRepo { .. } => "open_repo", - Request::GetRepoStatus { .. } => "get_repo_status", - Request::Search { .. } => "search", - Request::Glob { .. } => "glob", - Request::CloseRepo { .. } => "close_repo", - Request::Shutdown => "shutdown", - } -} - -fn decode_response(request_id: u64, response: ResponseEnvelope) -> Result { - if response.id != Some(request_id) { - return Err(AppError::Protocol(format!( - "daemon response id mismatch: expected {request_id:?}, got {:?}", - response.id - ))); - } - - if response.jsonrpc != JSONRPC_VERSION { - return Err(AppError::Protocol(format!( - "unsupported daemon jsonrpc version: {}", - response.jsonrpc - ))); - } - - if let Some(error) = response.error { - return Err(AppError::Protocol(error.message)); - } - - response - .result - .ok_or_else(|| AppError::Protocol("daemon response missing result".into())) -} - fn should_restart_daemon(error: &AppError, daemon: &AsyncDaemonClient) -> bool { daemon.is_closed() || matches!(error, AppError::Io(_)) } diff --git a/src/crates/core/src/service/search/flashgrep/mod.rs b/src/crates/core/src/service/search/flashgrep/mod.rs index a506a56e1..bcf852c28 100644 --- a/src/crates/core/src/service/search/flashgrep/mod.rs +++ b/src/crates/core/src/service/search/flashgrep/mod.rs @@ -1,13 +1,20 @@ mod client; pub mod error; mod protocol; +mod repo_session; +mod rpc_client; mod types; pub(crate) use client::{ManagedClient, RepoSession}; -pub(crate) use protocol::{FileMatch, MatchLocation, SearchHit, SearchLine}; +pub(crate) use protocol::{ + ClientCapabilities, ClientInfo, FileMatch, GlobParams, InitializeParams, MatchLocation, + RepoRef, Request, Response, SearchHit, SearchLine, SearchParams, TaskRef, +}; +pub(crate) use repo_session::FlashgrepRepoSession; +pub(crate) use rpc_client::{drain_content_length_messages, ProtocolClient}; pub(crate) use types::{ - ConsistencyMode, DirtyFileStats, FileCount, GlobRequest, OpenRepoParams, PathScope, QuerySpec, - RefreshPolicyConfig, RepoConfig, RepoPhase, RepoStatus, SearchBackend, SearchModeConfig, - SearchRequest, SearchResults, TaskKind, TaskPhase, TaskState, TaskStatus, - WorkspaceOverlayStatus, + ConsistencyMode, DirtyFileStats, FileCount, GlobOutcome, GlobRequest, OpenRepoParams, + PathScope, QuerySpec, RefreshPolicyConfig, RepoConfig, RepoPhase, RepoStatus, SearchBackend, + SearchModeConfig, SearchOutcome, SearchRequest, SearchResults, TaskKind, TaskPhase, TaskState, + TaskStatus, WorkspaceOverlayStatus, }; diff --git a/src/crates/core/src/service/search/flashgrep/protocol.rs b/src/crates/core/src/service/search/flashgrep/protocol.rs index 8aa398b8b..31772b667 100644 --- a/src/crates/core/src/service/search/flashgrep/protocol.rs +++ b/src/crates/core/src/service/search/flashgrep/protocol.rs @@ -243,9 +243,10 @@ pub(crate) enum CorpusModeConfig { pub(crate) enum SearchModeConfig { CountOnly, CountMatches, - FirstHitOnly, #[default] MaterializeMatches, + FilesWithMatches, + LineMatches, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] @@ -482,11 +483,19 @@ pub(crate) enum SearchBackend { #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct SearchResults { pub candidate_docs: usize, + #[serde(default)] + pub searches_with_match: usize, + #[serde(default)] + pub bytes_searched: u64, pub matched_lines: usize, pub matched_occurrences: usize, #[serde(default)] + pub matched_paths: Vec, + #[serde(default)] pub file_counts: Vec, #[serde(default)] + pub file_match_counts: Vec, + #[serde(default)] pub hits: Vec, } @@ -496,6 +505,12 @@ pub(crate) struct FileCount { pub matched_lines: usize, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct FileMatchCount { + pub path: String, + pub matched_occurrences: usize, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct SearchHit { pub path: String, diff --git a/src/crates/core/src/service/search/flashgrep/repo_session.rs b/src/crates/core/src/service/search/flashgrep/repo_session.rs new file mode 100644 index 000000000..1353e5b9b --- /dev/null +++ b/src/crates/core/src/service/search/flashgrep/repo_session.rs @@ -0,0 +1,17 @@ +use async_trait::async_trait; + +use super::error::Result; +use super::types::{ + GlobOutcome, GlobRequest, RepoStatus, SearchOutcome, SearchRequest, TaskStatus, +}; + +#[async_trait] +pub(crate) trait FlashgrepRepoSession: Send + Sync { + async fn status(&self) -> Result; + async fn task_status(&self, task_id: String) -> Result; + async fn build_index(&self) -> Result; + async fn rebuild_index(&self) -> Result; + async fn search(&self, request: SearchRequest) -> Result; + async fn glob(&self, request: GlobRequest) -> Result; + async fn close(&self) -> Result<()>; +} diff --git a/src/crates/core/src/service/search/flashgrep/rpc_client.rs b/src/crates/core/src/service/search/flashgrep/rpc_client.rs new file mode 100644 index 000000000..4b72f247e --- /dev/null +++ b/src/crates/core/src/service/search/flashgrep/rpc_client.rs @@ -0,0 +1,322 @@ +use std::collections::HashMap; +use std::fmt; +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; +use std::time::Duration; + +use serde::Serialize; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt}; +use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::time::timeout; + +use super::error::{AppError, Result}; +use super::protocol::{Request, RequestEnvelope, Response, ResponseEnvelope, ServerMessage}; + +const JSONRPC_VERSION: &str = "2.0"; + +type PendingResponseSender = oneshot::Sender>; +type PendingResponses = HashMap; + +#[derive(Clone)] +pub(crate) struct ProtocolClient { + inner: Arc, +} + +struct ProtocolClientInner { + write_tx: mpsc::Sender>, + pending: Mutex, + closed: AtomicBool, + next_id: AtomicU64, + backend_name: String, +} + +impl ProtocolClient { + pub(crate) fn channel(backend_name: impl Into) -> (Self, mpsc::Receiver>) { + let (write_tx, write_rx) = mpsc::channel::>(128); + ( + Self { + inner: Arc::new(ProtocolClientInner { + write_tx, + pending: Mutex::new(HashMap::new()), + closed: AtomicBool::new(false), + next_id: AtomicU64::new(1), + backend_name: backend_name.into(), + }), + }, + write_rx, + ) + } + + pub(crate) fn is_closed(&self) -> bool { + self.inner.closed.load(Ordering::Relaxed) + } + + pub(crate) fn mark_closed(&self) { + self.inner.closed.store(true, Ordering::Relaxed); + } + + pub(crate) async fn send_request_with_timeout( + &self, + request: Request, + request_timeout: Option, + ) -> Result { + if self.is_closed() { + return Err(AppError::Protocol(format!( + "{} is not running", + self.inner.backend_name + ))); + } + + let request_name = request_name(&request); + let request_id = self.inner.next_id.fetch_add(1, Ordering::Relaxed); + let envelope = RequestEnvelope { + jsonrpc: JSONRPC_VERSION.to_string(), + id: Some(request_id), + request, + }; + let bytes = encode_content_length_message(&envelope)?; + let (sender, receiver) = oneshot::channel(); + self.inner.pending.lock().await.insert(request_id, sender); + + if self.inner.write_tx.send(bytes).await.is_err() { + self.inner.pending.lock().await.remove(&request_id); + return Err(AppError::Protocol(format!( + "{} write channel is closed", + self.inner.backend_name + ))); + } + + let response = match request_timeout { + Some(duration) => match timeout(duration, receiver).await { + Ok(result) => result.map_err(|_| { + AppError::Protocol(format!( + "{} closed without sending a response", + self.inner.backend_name + )) + })??, + Err(_) => { + self.inner.pending.lock().await.remove(&request_id); + return Err(AppError::Protocol(format!( + "{} request timed out: {request_name}", + self.inner.backend_name + ))); + } + }, + None => receiver.await.map_err(|_| { + AppError::Protocol(format!( + "{} closed without sending a response", + self.inner.backend_name + )) + })??, + }; + + decode_response(request_id, response) + } + + pub(crate) async fn send_notification(&self, request: Request) -> Result<()> { + if self.is_closed() { + return Err(AppError::Protocol(format!( + "{} is not running", + self.inner.backend_name + ))); + } + + let envelope = RequestEnvelope { + jsonrpc: JSONRPC_VERSION.to_string(), + id: None, + request, + }; + let bytes = encode_content_length_message(&envelope)?; + self.inner.write_tx.send(bytes).await.map_err(|_| { + AppError::Protocol(format!( + "{} write channel is closed", + self.inner.backend_name + )) + }) + } + + pub(crate) async fn handle_server_message(&self, message: ServerMessage) { + match message { + ServerMessage::Response(response) => { + let Some(request_id) = response.id else { + return; + }; + if let Some(sender) = self.inner.pending.lock().await.remove(&request_id) { + let _ = sender.send(Ok(response)); + } + } + ServerMessage::Notification(notification) => { + log::debug!( + "Flashgrep protocol notification: backend={}, method={}", + self.inner.backend_name, + notification.method + ); + } + } + } + + pub(crate) async fn close_with_message(&self, message: impl Into) { + self.mark_closed(); + self.reject_pending(message).await; + } + + pub(crate) async fn reject_pending(&self, message: impl Into) { + let message = message.into(); + let mut pending = self.inner.pending.lock().await; + for (_, sender) in pending.drain() { + let _ = sender.send(Err(AppError::Protocol(message.clone()))); + } + } +} + +impl fmt::Debug for ProtocolClient { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("ProtocolClient") + .field("backend_name", &self.inner.backend_name) + .field("closed", &self.is_closed()) + .finish_non_exhaustive() + } +} + +pub(crate) async fn read_content_length_message(reader: &mut R) -> Result> +where + R: AsyncBufRead + AsyncRead + Unpin, +{ + let mut content_length = None; + + loop { + let mut line = String::new(); + let read = reader.read_line(&mut line).await?; + if read == 0 { + return Ok(None); + } + if line == "\r\n" || line == "\n" { + break; + } + + let trimmed = line.trim_end_matches(['\r', '\n']); + let Some((name, value)) = trimmed.split_once(':') else { + continue; + }; + if name.trim().eq_ignore_ascii_case("Content-Length") { + let length = value.trim().parse::().map_err(|error| { + AppError::Protocol(format!("invalid Content-Length header: {error}")) + })?; + content_length = Some(length); + } + } + + let content_length = + content_length.ok_or_else(|| AppError::Protocol("missing Content-Length header".into()))?; + let mut body = vec![0u8; content_length]; + reader.read_exact(&mut body).await?; + serde_json::from_slice(&body) + .map_err(|error| AppError::Protocol(format!("failed to decode daemon message: {error}"))) +} + +pub(crate) fn drain_content_length_messages(buffer: &mut Vec) -> Result> { + let mut messages = Vec::new(); + + loop { + let Some(header_end) = find_header_end(buffer) else { + break; + }; + let header = String::from_utf8_lossy(&buffer[..header_end]); + let mut content_length = None; + for line in header.lines() { + let Some((name, value)) = line.split_once(':') else { + continue; + }; + if name.trim().eq_ignore_ascii_case("Content-Length") { + content_length = Some(value.trim().parse::().map_err(|error| { + AppError::Protocol(format!("invalid Content-Length header: {error}")) + })?); + } + } + let content_length = content_length + .ok_or_else(|| AppError::Protocol("missing Content-Length header".into()))?; + let body_start = header_end + header_delimiter_len(buffer, header_end); + let body_end = body_start + content_length; + if buffer.len() < body_end { + break; + } + let message = serde_json::from_slice::(&buffer[body_start..body_end]) + .map_err(|error| { + AppError::Protocol(format!("failed to decode daemon message: {error}")) + })?; + buffer.drain(..body_end); + messages.push(message); + } + + Ok(messages) +} + +fn encode_content_length_message(message: &impl Serialize) -> Result> { + let body = serde_json::to_vec(message) + .map_err(|error| AppError::Protocol(format!("failed to encode request: {error}")))?; + let mut framed = format!("Content-Length: {}\r\n\r\n", body.len()).into_bytes(); + framed.extend_from_slice(&body); + Ok(framed) +} + +fn find_header_end(buffer: &[u8]) -> Option { + buffer + .windows(4) + .position(|window| window == b"\r\n\r\n") + .or_else(|| buffer.windows(2).position(|window| window == b"\n\n")) +} + +fn header_delimiter_len(buffer: &[u8], header_end: usize) -> usize { + if buffer + .get(header_end..header_end + 4) + .is_some_and(|delimiter| delimiter == b"\r\n\r\n") + { + 4 + } else { + 2 + } +} + +fn decode_response(request_id: u64, response: ResponseEnvelope) -> Result { + if response.id != Some(request_id) { + return Err(AppError::Protocol(format!( + "daemon response id mismatch: expected {request_id:?}, got {:?}", + response.id + ))); + } + + if response.jsonrpc != JSONRPC_VERSION { + return Err(AppError::Protocol(format!( + "unsupported daemon jsonrpc version: {}", + response.jsonrpc + ))); + } + + if let Some(error) = response.error { + return Err(AppError::Protocol(error.message)); + } + + response + .result + .ok_or_else(|| AppError::Protocol("daemon response missing result".into())) +} + +fn request_name(request: &Request) -> &'static str { + match request { + Request::Initialize { .. } => "initialize", + Request::Initialized => "initialized", + Request::Ping => "ping", + Request::BaseSnapshotBuild { .. } => "base_snapshot/build", + Request::BaseSnapshotRebuild { .. } => "base_snapshot/rebuild", + Request::TaskStatus { .. } => "task/status", + Request::OpenRepo { .. } => "open_repo", + Request::GetRepoStatus { .. } => "get_repo_status", + Request::Search { .. } => "search", + Request::Glob { .. } => "glob", + Request::CloseRepo { .. } => "close_repo", + Request::Shutdown => "shutdown", + } +} diff --git a/src/crates/core/src/service/search/mod.rs b/src/crates/core/src/service/search/mod.rs index a467f21a8..faff550df 100644 --- a/src/crates/core/src/service/search/mod.rs +++ b/src/crates/core/src/service/search/mod.rs @@ -1,7 +1,9 @@ pub(crate) mod flashgrep; +mod remote; pub mod service; pub mod types; +pub use remote::{remote_workspace_search_service_for_path, RemoteWorkspaceSearchService}; pub use service::{ get_global_workspace_search_service, resolve_workspace_search_daemon_program_path, set_global_workspace_search_service, workspace_search_daemon_available, diff --git a/src/crates/core/src/service/search/remote.rs b/src/crates/core/src/service/search/remote.rs new file mode 100644 index 000000000..3ba6748dc --- /dev/null +++ b/src/crates/core/src/service/search/remote.rs @@ -0,0 +1,1534 @@ +use crate::infrastructure::{FileSearchOutcome, FileSearchResult, SearchMatchType}; +use crate::service::config::{get_global_config_service, types::WorkspaceConfig, ConfigService}; +use crate::service::remote_ssh::workspace_state::{ + get_remote_workspace_manager, lookup_remote_connection, lookup_remote_connection_with_hint, + RemoteWorkspaceEntry, +}; +use crate::service::remote_ssh::{ + normalize_remote_workspace_path, RemoteFileService, SSHConnectionManager, +}; +use crate::service::search::flashgrep::{ + drain_content_length_messages, ClientCapabilities, ClientInfo, ConsistencyMode, GlobOutcome, + GlobParams, GlobRequest, InitializeParams, OpenRepoParams, PathScope, ProtocolClient, + QuerySpec, RefreshPolicyConfig, RepoConfig, RepoRef, RepoStatus, Request, Response, + SearchModeConfig, SearchOutcome, SearchParams, SearchRequest, SearchResults, TaskRef, + TaskStatus, +}; +use crate::service::search::flashgrep::{error::AppError, FlashgrepRepoSession}; +use crate::service::search::{ + ContentSearchOutputMode, ContentSearchRequest, ContentSearchResult, GlobSearchRequest, + GlobSearchResult, IndexTaskHandle, WorkspaceIndexStatus, WorkspaceSearchFileCount, + WorkspaceSearchHit, WorkspaceSearchRepoStatus, +}; +use async_trait::async_trait; +use sha2::{Digest, Sha256}; +use std::collections::{BTreeMap, HashMap}; +use std::ops::Deref; +use std::path::{Path, PathBuf}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, LazyLock, +}; +use std::time::Duration; +use tokio::io::AsyncWriteExt; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::time::{sleep, timeout}; + +const REMOTE_FLASHGREP_INSTALL_DIR: &str = ".bitfun/bin"; +const REMOTE_STDIO_REQUEST_TIMEOUT: Duration = Duration::from_secs(120); +const REMOTE_STDIO_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(2); +const REMOTE_STDIO_SESSION_IDLE_GRACE: Duration = Duration::from_secs(45); +const CLIENT_NAME: &str = "bitfun-remote-workspace-search"; +const REMOTE_OS_PROBES: &[&str] = &["uname -s", "sh -c 'uname -s 2>/dev/null'"]; +const REMOTE_ARCHITECTURE_PROBES: &[&str] = &[ + "uname -m", + "arch", + "sh -c 'uname -m 2>/dev/null || arch 2>/dev/null'", +]; +const LINUX_X86_64_FLASHGREP_BUNDLES: &[&str] = &[ + "flashgrep-x86_64-unknown-linux-musl", + "flashgrep-x86_64-unknown-linux-gnu", +]; +const LINUX_AARCH64_FLASHGREP_BUNDLES: &[&str] = &[ + "flashgrep-aarch64-unknown-linux-musl", + "flashgrep-aarch64-unknown-linux-gnu", +]; + +static REMOTE_STDIO_SESSIONS: LazyLock>> = + LazyLock::new(|| RwLock::new(HashMap::new())); +static REMOTE_STDIO_OPEN_GUARDS: LazyLock>>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); +static REMOTE_SEARCH_CONTEXTS: LazyLock>> = + LazyLock::new(|| RwLock::new(HashMap::new())); + +#[derive(Clone)] +struct RemoteStdioSessionEntry { + session: Arc, + activity_epoch: Arc, +} + +struct RemoteStdioRepoSession { + repo_id: String, + client: Arc, + activity_epoch: Arc, + active_operations: Arc, +} + +struct RemoteStdioDaemonClient { + protocol: ProtocolClient, +} + +struct RemoteStdioOperationLease { + activity_epoch: Arc, + active_operations: Arc, +} + +struct RemoteStdioSessionLease { + session: Arc, + _operation: RemoteStdioOperationLease, +} + +impl Drop for RemoteStdioOperationLease { + fn drop(&mut self) { + self.active_operations.fetch_sub(1, Ordering::Relaxed); + self.activity_epoch.fetch_add(1, Ordering::Relaxed); + } +} + +impl RemoteStdioSessionLease { + fn new(session: Arc) -> Self { + let operation = session.acquire_operation(); + Self { + session, + _operation: operation, + } + } +} + +impl Deref for RemoteStdioSessionLease { + type Target = RemoteStdioRepoSession; + + fn deref(&self) -> &Self::Target { + &self.session + } +} + +impl RemoteStdioDaemonClient { + async fn spawn( + ssh_manager: SSHConnectionManager, + connection_id: String, + binary_path: String, + ) -> Result, String> { + let command = format!("{} serve --stdio", shell_escape(&binary_path)); + let channel = ssh_manager + .open_exec_channel(&connection_id, &command) + .await + .map_err(|error| format!("Failed to start remote flashgrep stdio daemon: {error}"))?; + + let (protocol, write_rx) = ProtocolClient::channel("remote flashgrep stdio daemon"); + spawn_remote_stdio_owner(connection_id, channel, write_rx, protocol.clone()); + + let client = Arc::new(Self { protocol }); + client.initialize().await?; + Ok(client) + } + + async fn initialize(&self) -> Result<(), String> { + match self + .protocol + .send_request_with_timeout( + Request::Initialize { + params: InitializeParams { + client_info: Some(ClientInfo { + name: CLIENT_NAME.to_string(), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + }), + capabilities: ClientCapabilities::default(), + }, + }, + Some(REMOTE_STDIO_REQUEST_TIMEOUT), + ) + .await + .map_err(|error| error.to_string())? + { + Response::InitializeResult { .. } => { + self.protocol + .send_notification(Request::Initialized) + .await + .map_err(|error| error.to_string())?; + Ok(()) + } + other => Err(format!( + "Unexpected remote flashgrep initialize response: {other:?}" + )), + } + } + + async fn open_repo( + self: &Arc, + params: OpenRepoParams, + ) -> Result { + match self.send_request(Request::OpenRepo { params }).await? { + Response::RepoOpened { repo_id, .. } => Ok(RemoteStdioRepoSession { + repo_id, + client: self.clone(), + activity_epoch: Arc::new(AtomicU64::new(1)), + active_operations: Arc::new(AtomicU64::new(0)), + }), + other => Err(format!( + "Unexpected remote flashgrep open_repo response: {other:?}" + )), + } + } + + async fn send_request(&self, request: Request) -> Result { + self.protocol + .send_request_with_timeout(request, Some(REMOTE_STDIO_REQUEST_TIMEOUT)) + .await + .map_err(|error| error.to_string()) + } + + async fn shutdown(&self) { + let _ = timeout( + REMOTE_STDIO_SHUTDOWN_TIMEOUT, + self.send_request(Request::Shutdown), + ) + .await; + self.protocol + .close_with_message("remote flashgrep stdio daemon is shutting down") + .await; + } + + fn is_closed(&self) -> bool { + self.protocol.is_closed() + } +} + +impl RemoteStdioRepoSession { + fn acquire_operation(&self) -> RemoteStdioOperationLease { + self.active_operations.fetch_add(1, Ordering::Relaxed); + self.activity_epoch.fetch_add(1, Ordering::Relaxed); + RemoteStdioOperationLease { + activity_epoch: self.activity_epoch.clone(), + active_operations: self.active_operations.clone(), + } + } + + async fn status(&self) -> Result { + let _lease = self.acquire_operation(); + self.status_without_activity_lease().await + } + + async fn status_without_activity_lease(&self) -> Result { + match self + .client + .send_request(Request::GetRepoStatus { + params: self.repo_ref(), + }) + .await? + { + Response::RepoStatus { status } => Ok(status), + other => Err(format!( + "Unexpected remote flashgrep get_repo_status response: {other:?}" + )), + } + } + + async fn task_status(&self, task_id: impl Into) -> Result { + let _lease = self.acquire_operation(); + match self + .client + .send_request(Request::TaskStatus { + params: TaskRef { + task_id: task_id.into(), + }, + }) + .await? + { + Response::TaskStatus { task } => Ok(task), + other => Err(format!( + "Unexpected remote flashgrep task/status response: {other:?}" + )), + } + } + + async fn build_index(&self) -> Result { + let _lease = self.acquire_operation(); + match self + .client + .send_request(Request::BaseSnapshotBuild { + params: self.repo_ref(), + }) + .await? + { + Response::TaskStarted { task } => Ok(task), + other => Err(format!( + "Unexpected remote flashgrep build response: {other:?}" + )), + } + } + + async fn rebuild_index(&self) -> Result { + let _lease = self.acquire_operation(); + match self + .client + .send_request(Request::BaseSnapshotRebuild { + params: self.repo_ref(), + }) + .await? + { + Response::TaskStarted { task } => Ok(task), + other => Err(format!( + "Unexpected remote flashgrep rebuild response: {other:?}" + )), + } + } + + async fn search( + &self, + query: QuerySpec, + scope: PathScope, + ) -> Result< + ( + crate::service::search::flashgrep::SearchBackend, + RepoStatus, + SearchResults, + ), + String, + > { + let _lease = self.acquire_operation(); + match self + .client + .send_request(Request::Search { + params: SearchParams { + repo_id: self.repo_id.clone(), + query, + scope, + consistency: ConsistencyMode::WorkspaceEventual, + allow_scan_fallback: true, + }, + }) + .await? + { + Response::SearchCompleted { + backend, + status, + results, + .. + } => Ok((backend, status, results)), + other => Err(format!( + "Unexpected remote flashgrep search response: {other:?}" + )), + } + } + + async fn glob(&self, scope: PathScope) -> Result<(RepoStatus, Vec), String> { + let _lease = self.acquire_operation(); + match self + .client + .send_request(Request::Glob { + params: GlobParams { + repo_id: self.repo_id.clone(), + scope, + }, + }) + .await? + { + Response::GlobCompleted { status, paths, .. } => Ok((status, paths)), + other => Err(format!( + "Unexpected remote flashgrep glob response: {other:?}" + )), + } + } + + async fn close(&self) { + let _ = self + .client + .send_request(Request::CloseRepo { + params: self.repo_ref(), + }) + .await; + } + + fn repo_ref(&self) -> RepoRef { + RepoRef { + repo_id: self.repo_id.clone(), + } + } +} + +#[async_trait] +impl FlashgrepRepoSession for RemoteStdioRepoSession { + async fn status(&self) -> crate::service::search::flashgrep::error::Result { + RemoteStdioRepoSession::status(self) + .await + .map_err(AppError::Protocol) + } + + async fn task_status( + &self, + task_id: String, + ) -> crate::service::search::flashgrep::error::Result { + RemoteStdioRepoSession::task_status(self, task_id) + .await + .map_err(AppError::Protocol) + } + + async fn build_index(&self) -> crate::service::search::flashgrep::error::Result { + RemoteStdioRepoSession::build_index(self) + .await + .map_err(AppError::Protocol) + } + + async fn rebuild_index(&self) -> crate::service::search::flashgrep::error::Result { + RemoteStdioRepoSession::rebuild_index(self) + .await + .map_err(AppError::Protocol) + } + + async fn search( + &self, + request: SearchRequest, + ) -> crate::service::search::flashgrep::error::Result { + let (backend, status, results) = + RemoteStdioRepoSession::search(self, request.query, request.scope) + .await + .map_err(AppError::Protocol)?; + Ok(SearchOutcome { + backend, + status, + results, + }) + } + + async fn glob( + &self, + request: GlobRequest, + ) -> crate::service::search::flashgrep::error::Result { + let (status, paths) = RemoteStdioRepoSession::glob(self, request.scope) + .await + .map_err(AppError::Protocol)?; + Ok(GlobOutcome { status, paths }) + } + + async fn close(&self) -> crate::service::search::flashgrep::error::Result<()> { + RemoteStdioRepoSession::close(self).await; + Ok(()) + } +} + +fn spawn_remote_stdio_owner( + connection_id: String, + mut channel: russh::Channel, + mut write_rx: mpsc::Receiver>, + protocol: ProtocolClient, +) { + tokio::spawn(async move { + let mut writer = channel.make_writer(); + let mut read_buffer = Vec::::new(); + + loop { + tokio::select! { + outbound = write_rx.recv() => { + let Some(outbound) = outbound else { + let _ = channel.eof().await; + let _ = channel.close().await; + break; + }; + if let Err(error) = writer.write_all(&outbound).await { + log::warn!( + "Failed to write remote flashgrep stdio request: connection_id={}, error={}", + connection_id, + error + ); + protocol + .close_with_message("remote flashgrep stdio daemon write failed") + .await; + break; + } + if let Err(error) = writer.flush().await { + log::warn!( + "Failed to flush remote flashgrep stdio request: connection_id={}, error={}", + connection_id, + error + ); + protocol + .close_with_message("remote flashgrep stdio daemon flush failed") + .await; + break; + } + } + + message = channel.wait() => { + match message { + Some(russh::ChannelMsg::Data { data }) => { + read_buffer.extend_from_slice(&data); + match drain_content_length_messages(&mut read_buffer) { + Ok(messages) => { + for message in messages { + protocol.handle_server_message(message).await; + } + } + Err(error) => { + log::warn!( + "Failed to decode remote flashgrep stdio message: connection_id={}, error={}", + connection_id, + error + ); + protocol + .close_with_message(format!( + "remote flashgrep stdio daemon decode failed: {error}" + )) + .await; + break; + } + } + } + Some(russh::ChannelMsg::ExtendedData { data, .. }) => { + let text = String::from_utf8_lossy(&data); + for line in text.lines() { + log::debug!( + "remote flashgrep stdio daemon stderr: connection_id={}, line={}", + connection_id, + line + ); + } + } + Some(russh::ChannelMsg::ExitStatus { exit_status }) => { + log::debug!( + "Remote flashgrep stdio daemon exited: connection_id={}, exit_status={}", + connection_id, + exit_status + ); + break; + } + Some(russh::ChannelMsg::Eof) | Some(russh::ChannelMsg::Close) | None => { + break; + } + Some(_) => {} + } + } + } + } + + protocol + .close_with_message("remote flashgrep stdio daemon closed before sending a response") + .await; + }); +} + +#[derive(Clone)] +pub struct RemoteWorkspaceSearchService { + ssh_manager: SSHConnectionManager, + remote_file_service: RemoteFileService, + config_service: Arc, + preferred_connection_id: Option, +} + +#[derive(Debug, Clone)] +struct RemoteSearchContext { + connection: RemoteWorkspaceEntry, + binary_path: String, + repo_root: String, + storage_root: String, + remote_arch: String, + local_binary_sha256: String, +} + +struct LocalFlashgrepBundle { + binary_name: String, + path: PathBuf, + bytes: Vec, + sha256: String, +} + +impl RemoteWorkspaceSearchService { + pub fn new( + ssh_manager: SSHConnectionManager, + remote_file_service: RemoteFileService, + config_service: Arc, + ) -> Self { + Self { + ssh_manager, + remote_file_service, + config_service, + preferred_connection_id: None, + } + } + + pub fn with_preferred_connection_id(mut self, preferred_connection_id: Option) -> Self { + self.preferred_connection_id = preferred_connection_id; + self + } + + pub async fn get_index_status(&self, root_path: &str) -> Result { + let session = self.get_or_open_stdio_session(root_path).await?; + let repo_status: WorkspaceSearchRepoStatus = session.status().await?.into(); + let active_task = match repo_status.active_task_id.clone() { + Some(task_id) => match session.task_status(task_id).await { + Ok(task) => Some(task.into()), + Err(error) => { + log::warn!( + "Failed to fetch active remote flashgrep task status: {}", + error + ); + None + } + }, + None => None, + }; + Ok(WorkspaceIndexStatus { + active_task, + repo_status, + }) + } + + pub async fn build_index(&self, root_path: &str) -> Result { + let session = self.get_or_open_stdio_session(root_path).await?; + let task = session.build_index().await?; + let repo_status = session.status().await?; + Ok(IndexTaskHandle { + task: task.into(), + repo_status: repo_status.into(), + }) + } + + pub async fn rebuild_index(&self, root_path: &str) -> Result { + let session = self.get_or_open_stdio_session(root_path).await?; + let task = session.rebuild_index().await?; + let repo_status = session.status().await?; + Ok(IndexTaskHandle { + task: task.into(), + repo_status: repo_status.into(), + }) + } + + pub async fn search_content( + &self, + request: ContentSearchRequest, + ) -> Result { + let repo_root = normalize_remote_workspace_path(&request.repo_root.to_string_lossy()); + let session = self.get_or_open_stdio_session(&repo_root).await?; + let scope = build_remote_scope( + &repo_root, + request.search_path.as_deref(), + request.globs, + request.file_types, + request.exclude_file_types, + )?; + let max_results = request.max_results.filter(|limit| *limit > 0); + let query = QuerySpec { + pattern: request.pattern, + patterns: Vec::new(), + case_insensitive: !request.case_sensitive, + multiline: request.multiline, + dot_matches_new_line: request.multiline, + fixed_strings: !request.use_regex, + word_regexp: request.whole_word, + line_regexp: false, + before_context: request.before_context, + after_context: request.after_context, + top_k_tokens: 6, + max_count: None, + global_max_results: max_results, + search_mode: remote_stdio_search_mode(request.output_mode), + }; + + let output_mode = request.output_mode; + let (backend, repo_status, raw_results) = session.search(query, scope).await?; + let mut results = convert_stdio_search_results(&raw_results, output_mode); + let truncated = max_results + .map(|limit| results.len() >= limit) + .unwrap_or(false); + if let Some(limit) = max_results { + results.truncate(limit); + } + + Ok(ContentSearchResult { + outcome: FileSearchOutcome { results, truncated }, + file_counts: raw_results + .file_counts + .clone() + .into_iter() + .map(WorkspaceSearchFileCount::from) + .collect(), + hits: raw_results + .hits + .clone() + .into_iter() + .map(WorkspaceSearchHit::from) + .collect(), + backend: backend.into(), + repo_status: repo_status.into(), + candidate_docs: raw_results.candidate_docs, + matched_lines: raw_results.matched_lines, + matched_occurrences: raw_results.matched_occurrences, + }) + } + + pub async fn glob(&self, request: GlobSearchRequest) -> Result { + let repo_root = normalize_remote_workspace_path(&request.repo_root.to_string_lossy()); + let session = self.get_or_open_stdio_session(&repo_root).await?; + let scope = build_remote_scope( + &repo_root, + request.search_path.as_deref(), + vec![request.pattern], + Vec::new(), + Vec::new(), + )?; + let (repo_status, mut paths) = session.glob(scope).await?; + + paths.sort(); + if request.limit > 0 { + paths.truncate(request.limit); + } else { + paths.clear(); + } + + Ok(GlobSearchResult { + paths, + repo_status: repo_status.into(), + }) + } + + async fn get_or_open_stdio_session( + &self, + root_path: &str, + ) -> Result { + let context = self.ensure_remote_search_context(root_path).await?; + let key = remote_stdio_session_key(&context.connection.connection_id, &context.repo_root); + + if let Some(entry) = REMOTE_STDIO_SESSIONS.read().await.get(&key).cloned() { + entry.activity_epoch.fetch_add(1, Ordering::Relaxed); + if !entry.session.client.is_closed() { + return Ok(RemoteStdioSessionLease::new(entry.session.clone())); + } + log::warn!( + "Remote workspace search stdio session became unhealthy, reopening: connection_id={}, path={}", + context.connection.connection_id, + context.repo_root + ); + REMOTE_STDIO_SESSIONS.write().await.remove(&key); + entry.session.close().await; + entry.session.client.shutdown().await; + } + + let guard = { + let mut guards = REMOTE_STDIO_OPEN_GUARDS.lock().await; + guards + .entry(key.clone()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() + }; + let _guard = guard.lock().await; + + if let Some(entry) = REMOTE_STDIO_SESSIONS.read().await.get(&key).cloned() { + entry.activity_epoch.fetch_add(1, Ordering::Relaxed); + return Ok(RemoteStdioSessionLease::new(entry.session)); + } + + let client = RemoteStdioDaemonClient::spawn( + self.ssh_manager.clone(), + context.connection.connection_id.clone(), + context.binary_path.clone(), + ) + .await?; + let mut repo_config = RepoConfig::default(); + repo_config.max_file_size = self.max_file_size().await; + let session = client + .open_repo(OpenRepoParams { + repo_path: PathBuf::from(&context.repo_root), + storage_root: Some(PathBuf::from(&context.storage_root)), + config: repo_config, + refresh: RefreshPolicyConfig::default(), + }) + .await?; + let activity_epoch = session.activity_epoch.clone(); + let session = Arc::new(session); + REMOTE_STDIO_SESSIONS.write().await.insert( + key.clone(), + RemoteStdioSessionEntry { + session: session.clone(), + activity_epoch: activity_epoch.clone(), + }, + ); + schedule_remote_stdio_session_release(key, activity_epoch); + Ok(RemoteStdioSessionLease::new(session)) + } + + pub async fn resolve_remote_workspace_entry( + &self, + root_path: &str, + ) -> Result { + if let Some(entry) = + lookup_remote_connection_with_hint(root_path, self.preferred_connection_id.as_deref()) + .await + { + return Ok(entry); + } + lookup_remote_connection(root_path) + .await + .ok_or_else(|| format!("Remote workspace is not registered for path: {root_path}")) + } + + async fn ensure_remote_search_context( + &self, + root_path: &str, + ) -> Result { + let repo_root = normalize_remote_workspace_path(root_path); + let cache_key = + remote_search_context_key(self.preferred_connection_id.as_deref(), &repo_root); + if let Some(context) = REMOTE_SEARCH_CONTEXTS.read().await.get(&cache_key).cloned() { + let local_bundle = local_flashgrep_bundle_for_arch(&context.remote_arch).await?; + if local_bundle.sha256 == context.local_binary_sha256 { + return Ok(context); + } + + log::info!( + "Bundled remote flashgrep binary changed; reopening remote search session: connection_id={}, path={}, old_sha256={}, new_sha256={}", + context.connection.connection_id, + context.repo_root, + context.local_binary_sha256, + local_bundle.sha256 + ); + REMOTE_SEARCH_CONTEXTS.write().await.remove(&cache_key); + let session_key = + remote_stdio_session_key(&context.connection.connection_id, &context.repo_root); + if let Some(entry) = REMOTE_STDIO_SESSIONS.write().await.remove(&session_key) { + entry.session.close().await; + entry.session.client.shutdown().await; + } + } + + let connection = self.resolve_remote_workspace_entry(&repo_root).await?; + let cached_server_info = self + .ssh_manager + .get_server_info(&connection.connection_id) + .await; + let remote_os = if let Some(server_info) = cached_server_info { + if server_info.os_type.eq_ignore_ascii_case("unknown") { + self.detect_remote_os_type(&connection.connection_id) + .await + .unwrap_or_else(|| server_info.os_type.clone()) + } else { + server_info.os_type + } + } else { + self.detect_remote_os_type(&connection.connection_id) + .await + .unwrap_or_else(|| "unknown".to_string()) + }; + let inferred_linux = remote_os.eq_ignore_ascii_case("unknown") + && looks_like_linux_workspace_root(&repo_root); + if !remote_os.eq_ignore_ascii_case("linux") && !inferred_linux { + return Err(format!( + "Remote workspace search currently supports Linux only, but server OS is {}", + remote_os + )); + } + + let remote_arch = self + .detect_remote_architecture(&connection.connection_id) + .await?; + let local_bundle = local_flashgrep_bundle_for_arch(&remote_arch).await?; + let binary_path = self + .ensure_remote_flashgrep_binary(&connection.connection_id, &repo_root, &local_bundle) + .await?; + let storage_root = join_remote_path(&repo_root, ".bitfun/search/flashgrep-index"); + + let context = RemoteSearchContext { + connection, + binary_path, + repo_root, + storage_root, + remote_arch, + local_binary_sha256: local_bundle.sha256, + }; + REMOTE_SEARCH_CONTEXTS + .write() + .await + .insert(cache_key, context.clone()); + Ok(context) + } + + async fn detect_remote_architecture(&self, connection_id: &str) -> Result { + let mut attempts = Vec::new(); + + for probe in REMOTE_ARCHITECTURE_PROBES { + match self.ssh_manager.execute_command(connection_id, probe).await { + Ok((stdout, stderr, exit_code)) => { + if let Some(arch) = parse_remote_architecture_output(&stdout, &stderr) { + return Ok(arch); + } + attempts.push(format!( + "probe=`{probe}` exit_code={exit_code} stdout={:?} stderr={:?}", + stdout.trim(), + stderr.trim() + )); + } + Err(error) => { + attempts.push(format!("probe=`{probe}` error={error}")); + } + } + } + + Err(format!( + "Failed to detect remote architecture from SSH output. Attempts: {}", + attempts.join("; ") + )) + } + + async fn detect_remote_os_type(&self, connection_id: &str) -> Option { + for probe in REMOTE_OS_PROBES { + let Ok((stdout, stderr, _exit_code)) = + self.ssh_manager.execute_command(connection_id, probe).await + else { + continue; + }; + if let Some(os_type) = parse_remote_os_output(&stdout, &stderr) { + return Some(os_type); + } + } + None + } + + async fn ensure_remote_flashgrep_binary( + &self, + connection_id: &str, + repo_root: &str, + local_bundle: &LocalFlashgrepBundle, + ) -> Result { + let install_dir = remote_flashgrep_install_dir(repo_root); + let remote_binary_path = join_remote_path(&install_dir, &local_bundle.binary_name); + + self.remote_file_service + .create_dir_all(connection_id, &install_dir) + .await + .map_err(|error| { + format!("Failed to create remote flashgrep install directory: {error}") + })?; + let remote_sha256 = self + .remote_flashgrep_sha256(connection_id, &remote_binary_path) + .await?; + if remote_sha256.as_deref() != Some(local_bundle.sha256.as_str()) { + log::info!( + "Uploading bundled remote flashgrep binary: connection_id={}, path={}, bundle={}, local_path={}, local_sha256={}, remote_sha256={}", + connection_id, + remote_binary_path, + local_bundle.binary_name, + local_bundle.path.display(), + local_bundle.sha256, + remote_sha256.as_deref().unwrap_or("missing") + ); + self.remote_file_service + .write_file(connection_id, &remote_binary_path, &local_bundle.bytes) + .await + .map_err(|error| format!("Failed to upload flashgrep to remote host: {error}"))?; + } + self.ssh_manager + .execute_command( + connection_id, + &format!("chmod 755 {}", shell_escape(&remote_binary_path)), + ) + .await + .map_err(|error| format!("Failed to mark remote flashgrep as executable: {error}"))?; + + Ok(remote_binary_path) + } + + async fn remote_flashgrep_sha256( + &self, + connection_id: &str, + remote_binary_path: &str, + ) -> Result, String> { + let escaped_path = shell_escape(remote_binary_path); + let command = format!( + "if [ -f {path} ]; then if command -v sha256sum >/dev/null 2>&1; then sha256sum {path} | awk '{{print $1}}'; elif command -v shasum >/dev/null 2>&1; then shasum -a 256 {path} | awk '{{print $1}}'; fi; fi", + path = escaped_path + ); + let (stdout, _stderr, exit_code) = self + .ssh_manager + .execute_command(connection_id, &command) + .await + .map_err(|error| format!("Failed to hash remote flashgrep binary: {error}"))?; + if exit_code != 0 { + return Ok(None); + } + let hash = stdout.trim(); + if hash.len() == 64 && hash.chars().all(|character| character.is_ascii_hexdigit()) { + Ok(Some(hash.to_ascii_lowercase())) + } else { + Ok(None) + } + } + + async fn max_file_size(&self) -> u64 { + match self + .config_service + .get_config::(Some("workspace")) + .await + { + Ok(workspace_config) => workspace_config.max_file_size, + Err(error) => { + log::warn!( + "Failed to read workspace config for remote flashgrep repo open, using default max_file_size: {}", + error + ); + WorkspaceConfig::default().max_file_size + } + } + } +} + +pub async fn remote_workspace_search_service_for_path( + root_path: &str, + preferred_connection_id: Option, +) -> Result { + let manager = get_remote_workspace_manager() + .ok_or_else(|| "Remote workspace manager is unavailable".to_string())?; + let preferred_connection_id = match preferred_connection_id { + Some(connection_id) => Some(connection_id), + None => lookup_remote_connection(root_path) + .await + .map(|entry| entry.connection_id), + }; + + Ok(RemoteWorkspaceSearchService::new( + manager + .get_ssh_manager() + .await + .ok_or_else(|| "SSH manager unavailable".to_string())?, + manager + .get_file_service() + .await + .ok_or_else(|| "Remote file service unavailable".to_string())?, + get_global_config_service() + .await + .map_err(|error| format!("Config service unavailable: {error}"))?, + ) + .with_preferred_connection_id(preferred_connection_id)) +} + +fn remote_stdio_session_key(connection_id: &str, repo_root: &str) -> String { + format!( + "{connection_id}\0{}", + normalize_remote_workspace_path(repo_root) + ) +} + +fn remote_search_context_key(preferred_connection_id: Option<&str>, repo_root: &str) -> String { + format!( + "{}\0{}", + preferred_connection_id.unwrap_or(""), + normalize_remote_workspace_path(repo_root) + ) +} + +fn schedule_remote_stdio_session_release(key: String, activity_epoch: Arc) { + tokio::spawn(async move { + let expected_epoch = activity_epoch.load(Ordering::Relaxed); + sleep(REMOTE_STDIO_SESSION_IDLE_GRACE).await; + let entry = { + let sessions = REMOTE_STDIO_SESSIONS.read().await; + let Some(entry) = sessions.get(&key) else { + return; + }; + if entry.session.active_operations.load(Ordering::Relaxed) > 0 { + schedule_remote_stdio_session_release(key.clone(), entry.activity_epoch.clone()); + return; + } + if entry.activity_epoch.load(Ordering::Relaxed) != expected_epoch { + schedule_remote_stdio_session_release(key.clone(), entry.activity_epoch.clone()); + return; + } + entry.clone() + }; + + match entry.session.status_without_activity_lease().await { + Ok(status) if status.active_task_id.is_some() => { + schedule_remote_stdio_session_release(key.clone(), entry.activity_epoch.clone()); + return; + } + Ok(_) => {} + Err(error) => { + log::warn!( + "Failed to check idle remote workspace search status before release: key={}, error={}", + key.replace('\0', ":"), + error + ); + } + } + + let entry = { + let mut sessions = REMOTE_STDIO_SESSIONS.write().await; + let Some(current_entry) = sessions.get(&key) else { + return; + }; + if !Arc::ptr_eq(¤t_entry.session, &entry.session) { + return; + } + if current_entry + .session + .active_operations + .load(Ordering::Relaxed) + > 0 + { + schedule_remote_stdio_session_release( + key.clone(), + current_entry.activity_epoch.clone(), + ); + return; + } + if current_entry.activity_epoch.load(Ordering::Relaxed) != expected_epoch { + schedule_remote_stdio_session_release( + key.clone(), + current_entry.activity_epoch.clone(), + ); + return; + } + sessions.remove(&key) + }; + + if let Some(entry) = entry { + log::info!( + "Releasing idle remote workspace search stdio session: key={}", + key.replace('\0', ":") + ); + entry.session.close().await; + entry.session.client.shutdown().await; + REMOTE_STDIO_OPEN_GUARDS.lock().await.remove(&key); + } + }); +} + +fn build_remote_scope( + repo_root: &str, + search_path: Option<&Path>, + globs: Vec, + file_types: Vec, + exclude_file_types: Vec, +) -> Result { + let repo_root = normalize_remote_workspace_path(repo_root); + let roots = match search_path { + Some(path) => { + let normalized = normalize_remote_scope_path(&repo_root, path)?; + if normalized == repo_root { + Vec::new() + } else { + vec![PathBuf::from(normalized)] + } + } + None => Vec::new(), + }; + + Ok(PathScope { + roots, + globs, + iglobs: Vec::new(), + type_add: Vec::new(), + type_clear: Vec::new(), + types: file_types, + type_not: exclude_file_types, + }) +} + +fn normalize_remote_scope_path(repo_root: &str, search_path: &Path) -> Result { + let raw_path = search_path.to_string_lossy(); + let normalized = if raw_path.starts_with('/') { + normalize_remote_workspace_path(&raw_path) + } else { + join_remote_path(repo_root, &raw_path) + }; + let repo_root_with_slash = format!("{}/", repo_root.trim_end_matches('/')); + if normalized != repo_root && !normalized.starts_with(&repo_root_with_slash) { + return Err(format!( + "Remote search path is outside workspace root: {normalized}" + )); + } + Ok(normalized) +} + +fn remote_flashgrep_install_dir(repo_root: &str) -> String { + join_remote_path( + &normalize_remote_workspace_path(repo_root), + REMOTE_FLASHGREP_INSTALL_DIR, + ) +} + +fn looks_like_linux_workspace_root(path: &str) -> bool { + path.starts_with('/') && !path.contains(':') +} + +fn parse_remote_architecture_output(stdout: &str, stderr: &str) -> Option { + for stream in [stdout, stderr] { + for line in stream.lines() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let normalized = trimmed.to_ascii_lowercase(); + if normalized.contains("x86_64") || normalized.contains("amd64") { + return Some("x86_64".to_string()); + } + if normalized.contains("aarch64") + || normalized.contains("arm64") + || normalized.contains("armv8") + { + return Some("aarch64".to_string()); + } + } + } + + None +} + +fn parse_remote_os_output(stdout: &str, stderr: &str) -> Option { + for stream in [stdout, stderr] { + for line in stream.lines() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let normalized = trimmed.to_ascii_lowercase(); + if normalized.contains("linux") { + return Some("Linux".to_string()); + } + if normalized.contains("darwin") || normalized.contains("macos") { + return Some("Darwin".to_string()); + } + if normalized.contains("windows") + || normalized.contains("mingw") + || normalized.contains("msys") + || normalized.contains("cygwin") + { + return Some("Windows".to_string()); + } + } + } + + None +} + +fn resolve_local_flashgrep_bundle(binary_name: &str) -> Option { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let workspace_root = manifest_dir.join("../../.."); + let mut candidates = vec![workspace_root.join("resources/flashgrep").join(binary_name)]; + + if let Ok(current_exe) = std::env::current_exe() { + if let Some(parent) = current_exe.parent() { + candidates.push(parent.join("resources/flashgrep").join(binary_name)); + candidates.push(parent.join("flashgrep").join(binary_name)); + candidates.push(parent.join("../Resources/flashgrep").join(binary_name)); + candidates.push(parent.join("../share/bitfun/flashgrep").join(binary_name)); + candidates.push( + parent + .join("../share/com.bitfun.desktop/flashgrep") + .join(binary_name), + ); + } + } + + candidates + .into_iter() + .find(|candidate| candidate.exists()) + .map(|candidate| candidate.canonicalize().unwrap_or(candidate)) +} + +async fn local_flashgrep_bundle_for_arch( + remote_arch: &str, +) -> Result { + let bundled_binary_names = match remote_arch { + "x86_64" | "amd64" => LINUX_X86_64_FLASHGREP_BUNDLES, + "aarch64" | "arm64" => LINUX_AARCH64_FLASHGREP_BUNDLES, + arch => { + return Err(format!( + "Remote workspace search does not support Linux architecture: {arch}" + )); + } + }; + + let (binary_name, path) = bundled_binary_names + .iter() + .find_map(|binary_name| { + resolve_local_flashgrep_bundle(binary_name) + .map(|path| ((*binary_name).to_string(), path)) + }) + .ok_or_else(|| { + format!( + "Bundled Linux flashgrep binary is missing. Expected one of: {}", + bundled_binary_names + .iter() + .map(|name| format!("resources/flashgrep/{name}")) + .collect::>() + .join(", ") + ) + })?; + let bytes = tokio::fs::read(&path).await.map_err(|error| { + format!( + "Failed to read bundled flashgrep binary {}: {error}", + path.display() + ) + })?; + let sha256 = hex::encode(Sha256::digest(&bytes)); + + Ok(LocalFlashgrepBundle { + binary_name, + path, + bytes, + sha256, + }) +} + +fn convert_stdio_search_results( + search_results: &SearchResults, + output_mode: ContentSearchOutputMode, +) -> Vec { + match output_mode { + ContentSearchOutputMode::Content => { + convert_stdio_hits_to_file_search_results(search_results) + } + ContentSearchOutputMode::Count => { + convert_stdio_file_counts_to_search_results(search_results) + } + ContentSearchOutputMode::FilesWithMatches => { + convert_stdio_matched_paths_to_file_only_results(search_results) + } + } +} + +fn remote_stdio_search_mode(output_mode: ContentSearchOutputMode) -> SearchModeConfig { + match output_mode { + ContentSearchOutputMode::Content => SearchModeConfig::LineMatches, + ContentSearchOutputMode::Count => SearchModeConfig::CountOnly, + ContentSearchOutputMode::FilesWithMatches => SearchModeConfig::FilesWithMatches, + } +} + +fn convert_stdio_file_counts_to_search_results( + search_results: &SearchResults, +) -> Vec { + search_results + .file_counts + .iter() + .map(|count| FileSearchResult { + path: count.path.clone(), + name: Path::new(&count.path) + .file_name() + .and_then(|file_name| file_name.to_str()) + .unwrap_or(&count.path) + .to_string(), + is_directory: false, + match_type: SearchMatchType::Content, + line_number: None, + matched_content: Some(count.matched_lines.to_string()), + preview_before: None, + preview_inside: None, + preview_after: None, + }) + .collect() +} + +fn convert_stdio_hits_to_file_search_results( + search_results: &SearchResults, +) -> Vec { + let mut file_results = Vec::new(); + for hit in &search_results.hits { + let name = Path::new(&hit.path) + .file_name() + .and_then(|file_name| file_name.to_str()) + .unwrap_or(&hit.path) + .to_string(); + + let mut lines = BTreeMap::new(); + for file_match in &hit.matches { + lines + .entry(file_match.location.line) + .or_insert_with(|| file_match.clone()); + } + + for (_, file_match) in lines { + let (preview_before, preview_inside, preview_after) = + split_preview(&file_match.snippet, &file_match.matched_text); + file_results.push(FileSearchResult { + path: hit.path.clone(), + name: name.clone(), + is_directory: false, + match_type: SearchMatchType::Content, + line_number: Some(file_match.location.line), + matched_content: Some(file_match.snippet), + preview_before, + preview_inside, + preview_after, + }); + } + } + file_results +} + +fn convert_stdio_matched_paths_to_file_only_results( + search_results: &SearchResults, +) -> Vec { + search_results + .matched_paths + .iter() + .map(|path| FileSearchResult { + path: path.clone(), + name: Path::new(path) + .file_name() + .and_then(|file_name| file_name.to_str()) + .unwrap_or(path) + .to_string(), + is_directory: false, + match_type: SearchMatchType::Content, + line_number: None, + matched_content: None, + preview_before: None, + preview_inside: None, + preview_after: None, + }) + .collect() +} + +fn split_preview( + snippet: &str, + matched_text: &str, +) -> (Option, Option, Option) { + if matched_text.is_empty() { + return (None, Some(snippet.to_string()), None); + } + + if let Some(offset) = snippet.find(matched_text) { + let before = snippet[..offset].to_string(); + let inside = matched_text.to_string(); + let after = snippet[offset + matched_text.len()..].to_string(); + return ( + (!before.is_empty()).then_some(before), + Some(inside), + (!after.is_empty()).then_some(after), + ); + } + + (None, Some(snippet.to_string()), None) +} + +fn join_remote_path(base: &str, child: &str) -> String { + let base = normalize_remote_workspace_path(base); + let child = child.trim_start_matches('/'); + if base == "/" { + format!("/{child}") + } else { + format!("{base}/{child}") + } +} + +fn shell_escape(value: &str) -> String { + if value + .chars() + .all(|c| c.is_ascii_alphanumeric() || matches!(c, '/' | '.' | '-' | '_' | ':' | '=')) + { + value.to_string() + } else { + format!("'{}'", value.replace('\'', "'\\''")) + } +} + +#[cfg(test)] +mod tests { + use super::{ + looks_like_linux_workspace_root, parse_remote_architecture_output, parse_remote_os_output, + remote_flashgrep_install_dir, + }; + use crate::service::search::flashgrep::drain_content_length_messages; + + #[test] + fn parses_plain_uname_architecture_output() { + assert_eq!( + parse_remote_architecture_output("x86_64\n", ""), + Some("x86_64".to_string()) + ); + assert_eq!( + parse_remote_architecture_output("aarch64\n", ""), + Some("aarch64".to_string()) + ); + } + + #[test] + fn parses_architecture_from_banner_prefixed_output() { + let stdout = "Welcome to Ubuntu 24.04 LTS\nLast login: today\nArchitecture: amd64\n"; + assert_eq!( + parse_remote_architecture_output(stdout, ""), + Some("x86_64".to_string()) + ); + } + + #[test] + fn parses_architecture_from_stderr_when_needed() { + assert_eq!( + parse_remote_architecture_output("", "machine: arm64\n"), + Some("aarch64".to_string()) + ); + } + + #[test] + fn installs_remote_flashgrep_under_workspace_root() { + assert_eq!( + remote_flashgrep_install_dir("/home/wgq/workspace/bot_detection"), + "/home/wgq/workspace/bot_detection/.bitfun/bin" + ); + } + + #[test] + fn parses_remote_os_from_uname_output() { + assert_eq!( + parse_remote_os_output("Linux\n", ""), + Some("Linux".to_string()) + ); + assert_eq!( + parse_remote_os_output("Darwin Kernel Version\n", ""), + Some("Darwin".to_string()) + ); + } + + #[test] + fn parses_remote_os_from_banner_prefixed_output() { + assert_eq!( + parse_remote_os_output("Welcome\nOperating system: linux\n", ""), + Some("Linux".to_string()) + ); + } + + #[test] + fn infers_linux_from_posix_workspace_root() { + assert!(looks_like_linux_workspace_root( + "/home/wgq/workspace/bot_detection" + )); + assert!(!looks_like_linux_workspace_root( + "C:/Users/wgq/workspace/bot_detection" + )); + } + + #[test] + fn drains_remote_stdio_content_length_messages() { + let body = r#"{"jsonrpc":"2.0","id":7,"result":{"kind":"pong","now_unix_secs":1}}"#; + let mut buffer = format!("Content-Length: {}\r\n\r\n{}", body.len(), body).into_bytes(); + let messages = drain_content_length_messages(&mut buffer) + .expect("expected content-length message to decode"); + + assert_eq!(messages.len(), 1); + assert!(buffer.is_empty()); + } + + #[test] + fn drains_remote_stdio_initialize_response_with_legacy_search_modes() { + let body = r#"{"jsonrpc":"2.0","id":1,"result":{"kind":"initialize_result","protocol_version":1,"server_info":{"name":"flashgrep","version":"0.1.0"},"capabilities":{"workspace_open":true,"workspace_ensure":true,"workspace_list":false,"workspace_refresh":true,"base_snapshot_build":true,"base_snapshot_rebuild":true,"task_status":true,"task_cancel":true,"search_query":true,"glob_query":true,"progress_notifications":true,"status_notifications":true},"search":{"search_modes":["files_with_matches","line_matches","count_only","count_matches"]}}}"#; + let mut buffer = format!("Content-Length: {}\r\n\r\n{}", body.len(), body).into_bytes(); + let messages = drain_content_length_messages(&mut buffer) + .expect("expected initialize response to decode"); + + assert_eq!(messages.len(), 1); + let debug = format!("{:?}", messages[0]); + assert!(debug.contains("InitializeResult")); + } +} diff --git a/src/crates/core/src/service/search/service.rs b/src/crates/core/src/service/search/service.rs index c5d620892..ae158bcd0 100644 --- a/src/crates/core/src/service/search/service.rs +++ b/src/crates/core/src/service/search/service.rs @@ -2,8 +2,8 @@ use crate::infrastructure::{FileSearchOutcome, FileSearchResult, SearchMatchType use crate::service::bootstrap::ensure_workspace_gitignore_ignores_bitfun; use crate::service::config::{get_global_config_service, types::WorkspaceConfig}; use crate::service::search::flashgrep::{ - ConsistencyMode, GlobRequest, ManagedClient, OpenRepoParams, PathScope, QuerySpec, - RefreshPolicyConfig, RepoConfig, RepoSession, SearchRequest, SearchResults, + ConsistencyMode, FlashgrepRepoSession, GlobRequest, ManagedClient, OpenRepoParams, PathScope, + QuerySpec, RefreshPolicyConfig, RepoConfig, RepoSession, SearchRequest, SearchResults, }; use crate::util::errors::{BitFunError, BitFunResult}; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -83,8 +83,7 @@ impl WorkspaceSearchService { pub async fn build_index(&self, repo_root: impl AsRef) -> BitFunResult { let session = self.get_or_open_session(repo_root.as_ref()).await?; - let task = session - .index_build() + let task = FlashgrepRepoSession::build_index(session.as_ref()) .await .map_err(map_flashgrep_error("Failed to start index build"))?; let repo_status = session @@ -102,8 +101,7 @@ impl WorkspaceSearchService { repo_root: impl AsRef, ) -> BitFunResult { let session = self.get_or_open_session(repo_root.as_ref()).await?; - let task = session - .index_rebuild() + let task = FlashgrepRepoSession::rebuild_index(session.as_ref()) .await .map_err(map_flashgrep_error("Failed to start index rebuild"))?; let repo_status = session @@ -155,15 +153,15 @@ impl WorkspaceSearchService { let session = self.get_or_open_session(&repo_root).await?; let session_ready_at = Instant::now(); - let search = session - .search( - SearchRequest::new(query) - .with_scope(scope) - .with_consistency(ConsistencyMode::WorkspaceEventual) - .with_scan_fallback(true), - ) - .await - .map_err(map_flashgrep_error("Content search failed"))?; + let search = FlashgrepRepoSession::search( + session.as_ref(), + SearchRequest::new(query) + .with_scope(scope) + .with_consistency(ConsistencyMode::WorkspaceEventual) + .with_scan_fallback(true), + ) + .await + .map_err(map_flashgrep_error("Content search failed"))?; let search_completed_at = Instant::now(); let mut results = convert_search_results(&search.results, request.output_mode); @@ -240,10 +238,10 @@ impl WorkspaceSearchService { vec![], )?; let session = self.get_or_open_session(&repo_root).await?; - let mut outcome = session - .glob(GlobRequest::new().with_scope(scope)) - .await - .map_err(map_flashgrep_error("Glob search failed"))?; + let mut outcome = + FlashgrepRepoSession::glob(session.as_ref(), GlobRequest::new().with_scope(scope)) + .await + .map_err(map_flashgrep_error("Glob search failed"))?; outcome.paths.sort(); if request.limit > 0 { outcome.paths.truncate(request.limit); @@ -399,10 +397,13 @@ impl WorkspaceSearchService { .clone()) } - async fn index_status_for_session( + async fn index_status_for_session( &self, - session: Arc, - ) -> BitFunResult { + session: Arc, + ) -> BitFunResult + where + S: FlashgrepRepoSession + ?Sized, + { let repo_status = session .status() .await @@ -451,7 +452,7 @@ impl WorkspaceSearchService { "Releasing idle workspace search repository session: path={}", repo_root.display() ); - if let Err(error) = entry.session.close().await { + if let Err(error) = FlashgrepRepoSession::close(entry.session.as_ref()).await { log::warn!( "Failed to release idle workspace search repository session: path={}, error={}", repo_root.display(), @@ -495,9 +496,15 @@ pub fn workspace_search_daemon_binary_names() -> &'static [&'static str] { } else if cfg!(all(target_os = "macos", target_arch = "aarch64")) { &["flashgrep-aarch64-apple-darwin"] } else if cfg!(all(target_os = "linux", target_arch = "x86_64")) { - &["flashgrep-x86_64-unknown-linux-gnu"] + &[ + "flashgrep-x86_64-unknown-linux-musl", + "flashgrep-x86_64-unknown-linux-gnu", + ] } else if cfg!(all(target_os = "linux", target_arch = "aarch64")) { - &["flashgrep-aarch64-unknown-linux-gnu"] + &[ + "flashgrep-aarch64-unknown-linux-musl", + "flashgrep-aarch64-unknown-linux-gnu", + ] } else if cfg!(windows) { &["flashgrep.exe"] } else { @@ -763,7 +770,7 @@ fn convert_search_results( ContentSearchOutputMode::Content => convert_hits_to_file_search_results(search_results), ContentSearchOutputMode::Count => convert_file_counts_to_search_results(search_results), ContentSearchOutputMode::FilesWithMatches => { - convert_hits_to_file_only_results(search_results) + convert_matched_paths_to_file_only_results(search_results) } } } @@ -825,16 +832,18 @@ fn convert_hits_to_file_search_results(search_results: &SearchResults) -> Vec Vec { +fn convert_matched_paths_to_file_only_results( + search_results: &SearchResults, +) -> Vec { search_results - .hits + .matched_paths .iter() - .map(|hit| FileSearchResult { - path: hit.path.clone(), - name: Path::new(&hit.path) + .map(|path| FileSearchResult { + path: path.clone(), + name: Path::new(path) .file_name() .and_then(|file_name| file_name.to_str()) - .unwrap_or(&hit.path) + .unwrap_or(path) .to_string(), is_directory: false, match_type: SearchMatchType::Content, diff --git a/src/crates/core/src/service/search/types.rs b/src/crates/core/src/service/search/types.rs index 77e734a66..7d6a2a870 100644 --- a/src/crates/core/src/service/search/types.rs +++ b/src/crates/core/src/service/search/types.rs @@ -23,7 +23,7 @@ impl ContentSearchOutputMode { match self { Self::Content => SearchModeConfig::MaterializeMatches, Self::Count => SearchModeConfig::CountOnly, - Self::FilesWithMatches => SearchModeConfig::FirstHitOnly, + Self::FilesWithMatches => SearchModeConfig::FilesWithMatches, } } } diff --git a/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx b/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx index 971063806..bb09f6667 100644 --- a/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx +++ b/src/web-ui/src/app/components/NavPanel/sections/workspaces/WorkspaceItem.tsx @@ -106,8 +106,10 @@ const WorkspaceItem: React.FC = ({ const canShowSearchIndex = isActive && workspaceSearchEnabled - && workspace.workspaceKind === WorkspaceKind.Normal - && !isRemoteWorkspace(workspace); + && ( + workspace.workspaceKind === WorkspaceKind.Normal + || workspace.workspaceKind === WorkspaceKind.Remote + ); const workspaceSearchIndex = useWorkspaceSearchIndex({ workspacePath: canShowSearchIndex ? workspace.rootPath : undefined, enabled: canShowSearchIndex,