diff --git a/desktop/src-tauri/src/main.rs b/desktop/src-tauri/src/main.rs index 4de4aa9..c9b6261 100644 --- a/desktop/src-tauri/src/main.rs +++ b/desktop/src-tauri/src/main.rs @@ -12,7 +12,7 @@ use std::io::{Read, Write}; use std::net::{IpAddr, TcpStream, ToSocketAddrs}; use std::path::{Path, PathBuf}; use std::process::{Child, ChildStdin, Command, Stdio}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::sync::Mutex; use std::thread::JoinHandle; @@ -34,6 +34,8 @@ static SIDECAR_EXIT_WATCHER_STOP: AtomicBool = AtomicBool::new(false); static RESUME_RECOVERY_WATCHER: Lazy>>> = Lazy::new(|| Mutex::new(None)); static RESUME_RECOVERY_WATCHER_STOP: AtomicBool = AtomicBool::new(false); +static RESUME_RECOVERY_IN_FLIGHT: AtomicBool = AtomicBool::new(false); +static RESUME_RECOVERY_LAST_STARTED_MS: AtomicU64 = AtomicU64::new(0); static DESKTOP_RUNTIME: Lazy> = Lazy::new(|| Mutex::new(DesktopRuntime::default())); static DESKTOP_RECENT_LOGS: Lazy>> = @@ -65,6 +67,7 @@ const SIDECAR_READY_POLL_INTERVAL_MS: u64 = 100; const SIDECAR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); const RESUME_RECOVERY_WATCH_INTERVAL: Duration = Duration::from_secs(5); const RESUME_RECOVERY_GAP_THRESHOLD: Duration = Duration::from_secs(15); +const RESUME_RECOVERY_COOLDOWN: Duration = Duration::from_secs(10); const DESKTOP_RECENT_LOG_CAPACITY: usize = 200; const DESKTOP_RECENT_LOG_DEFAULT_LIMIT: usize = 50; const DESKTOP_RECENT_LOG_MAX_LIMIT: usize = 50; @@ -1250,15 +1253,51 @@ fn refresh_tray_state_from_backend(app: &AppHandle) { #[cfg(target_os = "macos")] fn recover_backend_after_reopen(app: &AppHandle) { - recover_backend_after_resume(app, "reopen", None); + request_resume_recovery(app.clone(), "reopen".to_string(), None); } #[cfg(target_os = "macos")] -fn recover_backend_after_resume( - app: &AppHandle, - trigger: &str, - gap: Option, -) { +struct ResumeRecoveryLease<'a> { + running: &'a AtomicBool, +} + +#[cfg(target_os = "macos")] +impl Drop for ResumeRecoveryLease<'_> { + fn drop(&mut self) { + self.running.store(false, Ordering::SeqCst); + } +} + +#[cfg(target_os = "macos")] +fn try_acquire_resume_recovery_slot<'a>( + running: &'a AtomicBool, + last_started_ms: &AtomicU64, + now_ms: u64, + cooldown_ms: u64, +) -> Option> { + let last_started = last_started_ms.load(Ordering::SeqCst); + if running.load(Ordering::SeqCst) + || (last_started != 0 && now_ms.saturating_sub(last_started) < cooldown_ms) + { + return None; + } + running + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .ok()?; + last_started_ms.store(now_ms, Ordering::SeqCst); + Some(ResumeRecoveryLease { running }) +} + +#[cfg(target_os = "macos")] +fn current_unix_timestamp_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|duration| duration.as_millis() as u64) + .unwrap_or_default() +} + +#[cfg(target_os = "macos")] +fn probe_backend_after_resume(trigger: &str, gap: Option) { let gap_suffix = gap .map(|value| format!(" gap_ms={}", value.as_millis())) .unwrap_or_default(); @@ -1279,8 +1318,45 @@ fn recover_backend_after_resume( format!("backend probe failed trigger={trigger}{gap_suffix}: {err}"), ), } - refresh_tray_state_from_backend(app); - emit_backend_state_changed(app); +} + +#[cfg(target_os = "macos")] +fn request_resume_recovery( + app: AppHandle, + trigger: String, + gap: Option, +) { + let gap_suffix = gap + .map(|value| format!(" gap_ms={}", value.as_millis())) + .unwrap_or_default(); + let cooldown_ms = RESUME_RECOVERY_COOLDOWN.as_millis() as u64; + let now_ms = current_unix_timestamp_ms(); + let Some(_lease) = try_acquire_resume_recovery_slot( + &RESUME_RECOVERY_IN_FLIGHT, + &RESUME_RECOVERY_LAST_STARTED_MS, + now_ms, + cooldown_ms, + ) else { + log_desktop_event( + "info", + "recovery", + format!("skip coalesced recovery trigger={trigger}{gap_suffix}"), + ); + return; + }; + + let _ = std::thread::Builder::new() + .name("resume-recovery-dispatch".to_string()) + .spawn(move || { + let _lease = _lease; + probe_backend_after_resume(&trigger, gap); + + let app_handle = app.clone(); + let _ = app.run_on_main_thread(move || { + refresh_tray_state_from_backend(&app_handle); + emit_backend_state_changed(&app_handle); + }); + }); } fn apply_tray_state(app: &AppHandle, tray_state: &TrayStateSnapshot) { @@ -1839,7 +1915,7 @@ fn start_resume_recovery_watcher(app: AppHandle) -> Res let elapsed = last_tick.elapsed(); last_tick = Instant::now(); if should_trigger_resume_recovery(elapsed, RESUME_RECOVERY_GAP_THRESHOLD) { - recover_backend_after_resume(&app, "resume_gap", Some(elapsed)); + request_resume_recovery(app.clone(), "resume_gap".to_string(), Some(elapsed)); } } }) @@ -1998,22 +2074,22 @@ mod tests { use super::{ append_recent_desktop_log, build_launch_agent_plist, clamp_recent_log_limit, current_backend_addr, current_settings_cache, decode_chunked_body, format_timeout_error, - format_tray_title, load_settings_cache, map_backend_io_error, parse_account_menu_id, parse_accounts_response, - parse_proxy_status_response, proxy_menu_enabled_states, resolve_main_window_size, - request_backend, restart_sidecar_and_wait_ready, - sanitize_main_window_size, should_attempt_sidecar_recovery, + format_tray_title, load_settings_cache, map_backend_io_error, parse_account_menu_id, + parse_accounts_response, parse_proxy_status_response, persist_runtime_settings, + proxy_menu_enabled_states, request_backend, resolve_main_window_size, + restart_sidecar_and_wait_ready, sanitize_main_window_size, should_attempt_sidecar_recovery, should_refresh_tray_after_action, should_restart_sidecar_after_exit, - should_retry_sidecar_request, should_trigger_resume_recovery, sidecar_candidate_paths, - sidecar_creation_flags, sidecar_request_with_recovery, sidecar_request_with_recovery_hooks, - sidecar_resource_name, shutdown_sidecar_with_reason, spawn_sidecar, + should_retry_sidecar_request, should_trigger_resume_recovery, shutdown_sidecar_with_reason, + sidecar_candidate_paths, sidecar_creation_flags, sidecar_request_with_recovery, + sidecar_request_with_recovery_hooks, sidecar_resource_name, spawn_sidecar, tray_icon_bytes_for_platform, tray_icon_is_template_for_platform, - update_download_progress, wait_for_backend_ready, wait_for_backend_ready_with_probe, - window_close_action, AppSettingsPayload, DesktopLogEntry, DesktopRuntime, - DesktopSettingsCache, HttpResponse, UpdateInfoPayload, UpdateManagerState, - UpdateProgressPayload, UpdateStatePayload, UpdateStatus, WindowCloseAction, - WindowSizeCache, DESKTOP_RUNTIME, MAIN_WINDOW_MIN_HEIGHT, MAIN_WINDOW_MIN_WIDTH, - SIDECAR_CHILD, SIDECAR_MACOS_NAME, SIDECAR_WINDOWS_NAME, TRAY_ICON_COLOR_BYTES, - TRAY_ICON_TEMPLATE_BYTES, UPDATE_MANAGER, persist_runtime_settings, + try_acquire_resume_recovery_slot, update_download_progress, wait_for_backend_ready, + wait_for_backend_ready_with_probe, window_close_action, AppSettingsPayload, + DesktopLogEntry, DesktopRuntime, DesktopSettingsCache, HttpResponse, UpdateInfoPayload, + UpdateManagerState, UpdateProgressPayload, UpdateStatePayload, UpdateStatus, + WindowCloseAction, WindowSizeCache, DESKTOP_RUNTIME, MAIN_WINDOW_MIN_HEIGHT, + MAIN_WINDOW_MIN_WIDTH, SIDECAR_CHILD, SIDECAR_MACOS_NAME, SIDECAR_WINDOWS_NAME, + TRAY_ICON_COLOR_BYTES, TRAY_ICON_TEMPLATE_BYTES, UPDATE_MANAGER, }; use std::cell::RefCell; use std::collections::VecDeque; @@ -2021,7 +2097,7 @@ mod tests { use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::process::Command; - use std::sync::atomic::Ordering; + use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use std::time::{SystemTime, UNIX_EPOCH}; @@ -2380,9 +2456,12 @@ mod tests { spawn_sidecar().expect("spawn initial sidecar"); wait_for_backend_ready(¤t_backend_addr(), Duration::from_secs(5)) .expect("wait for initial backend"); - let initial_response = - request_backend("GET", "/ai-router/api/settings/app", "").expect("request app settings"); - assert_eq!(initial_response.status, 200, "initial backend request should succeed"); + let initial_response = request_backend("GET", "/ai-router/api/settings/app", "") + .expect("request app settings"); + assert_eq!( + initial_response.status, 200, + "initial backend request should succeed" + ); let initial_pid = SIDECAR_CHILD .lock() .expect("sidecar child lock") @@ -2395,7 +2474,10 @@ mod tests { shared_cache.lan_share_enabled = true; let restart_required = persist_runtime_settings(shared_cache).expect("persist shared runtime settings"); - assert!(restart_required, "lan toggle should require sidecar restart"); + assert!( + restart_required, + "lan toggle should require sidecar restart" + ); restart_sidecar_and_wait_ready().expect("restart sidecar after enabling lan"); let shared_pid = SIDECAR_CHILD .lock() @@ -2403,18 +2485,30 @@ mod tests { .as_ref() .map(|child| child.id()) .expect("shared child pid"); - assert_ne!(shared_pid, initial_pid, "sidecar pid should change after restart"); + assert_ne!( + shared_pid, initial_pid, + "sidecar pid should change after restart" + ); assert_eq!(current_backend_addr(), format!("127.0.0.1:{port}")); - assert_eq!(current_settings_cache().listen_addr(), format!("0.0.0.0:{port}")); - let shared_response = - request_backend("GET", "/ai-router/api/settings/app", "").expect("request shared app settings"); - assert_eq!(shared_response.status, 200, "shared backend request should succeed"); + assert_eq!( + current_settings_cache().listen_addr(), + format!("0.0.0.0:{port}") + ); + let shared_response = request_backend("GET", "/ai-router/api/settings/app", "") + .expect("request shared app settings"); + assert_eq!( + shared_response.status, 200, + "shared backend request should succeed" + ); let mut local_cache = current_settings_cache(); local_cache.lan_share_enabled = false; let restart_required = persist_runtime_settings(local_cache).expect("persist local runtime settings"); - assert!(restart_required, "lan toggle reset should require sidecar restart"); + assert!( + restart_required, + "lan toggle reset should require sidecar restart" + ); restart_sidecar_and_wait_ready().expect("restart sidecar after disabling lan"); let final_pid = SIDECAR_CHILD .lock() @@ -2422,12 +2516,21 @@ mod tests { .as_ref() .map(|child| child.id()) .expect("final child pid"); - assert_ne!(final_pid, shared_pid, "sidecar pid should change after second restart"); + assert_ne!( + final_pid, shared_pid, + "sidecar pid should change after second restart" + ); assert_eq!(current_backend_addr(), format!("127.0.0.1:{port}")); - assert_eq!(current_settings_cache().listen_addr(), format!("127.0.0.1:{port}")); - let final_response = - request_backend("GET", "/ai-router/api/settings/app", "").expect("request local app settings"); - assert_eq!(final_response.status, 200, "final backend request should succeed"); + assert_eq!( + current_settings_cache().listen_addr(), + format!("127.0.0.1:{port}") + ); + let final_response = request_backend("GET", "/ai-router/api/settings/app", "") + .expect("request local app settings"); + assert_eq!( + final_response.status, 200, + "final backend request should succeed" + ); shutdown_sidecar_with_reason("smoke-cleanup-end"); let _ = fs::remove_file(&sidecar_path); @@ -2670,6 +2773,98 @@ mod tests { )); } + #[test] + fn resume_recovery_slot_coalesces_overlapping_runs() { + let running = AtomicBool::new(false); + let last_started_ms = AtomicU64::new(0); + let active_runs = Arc::new(AtomicUsize::new(0)); + let observed_max = Arc::new(AtomicUsize::new(0)); + + std::thread::scope(|scope| { + for _ in 0..2 { + let active_runs = Arc::clone(&active_runs); + let observed_max = Arc::clone(&observed_max); + let running_ref = &running; + let last_started_ms_ref = &last_started_ms; + scope.spawn(move || { + let Some(_lease) = try_acquire_resume_recovery_slot( + running_ref, + last_started_ms_ref, + 1_000, + 10_000, + ) else { + return; + }; + let concurrent = active_runs.fetch_add(1, Ordering::SeqCst) + 1; + observed_max.fetch_max(concurrent, Ordering::SeqCst); + std::thread::sleep(Duration::from_millis(50)); + active_runs.fetch_sub(1, Ordering::SeqCst); + }); + } + }); + + assert_eq!(observed_max.load(Ordering::SeqCst), 1); + assert!(!running.load(Ordering::SeqCst)); + } + + #[test] + fn resume_recovery_slot_skips_recent_duplicate_triggers() { + let running = AtomicBool::new(false); + let last_started_ms = AtomicU64::new(0); + + let lease = try_acquire_resume_recovery_slot(&running, &last_started_ms, 1_000, 10_000) + .expect("first recovery should acquire the slot"); + drop(lease); + + assert!( + try_acquire_resume_recovery_slot(&running, &last_started_ms, 5_000, 10_000).is_none(), + "duplicate trigger inside cooldown should be skipped" + ); + assert!( + try_acquire_resume_recovery_slot(&running, &last_started_ms, 11_500, 10_000).is_some(), + "trigger after cooldown should be allowed" + ); + } + + #[test] + #[ignore = "local smoke test that stresses duplicate resume recovery triggers"] + fn resume_recovery_slot_stays_single_flight_under_burst_triggers() { + let running = AtomicBool::new(false); + let last_started_ms = AtomicU64::new(0); + let active_runs = Arc::new(AtomicUsize::new(0)); + let observed_max = Arc::new(AtomicUsize::new(0)); + let completed_runs = Arc::new(AtomicUsize::new(0)); + + std::thread::scope(|scope| { + for _ in 0..32 { + let active_runs = Arc::clone(&active_runs); + let observed_max = Arc::clone(&observed_max); + let completed_runs = Arc::clone(&completed_runs); + let running_ref = &running; + let last_started_ms_ref = &last_started_ms; + scope.spawn(move || { + let Some(_lease) = try_acquire_resume_recovery_slot( + running_ref, + last_started_ms_ref, + 50_000, + 10_000, + ) else { + return; + }; + let concurrent = active_runs.fetch_add(1, Ordering::SeqCst) + 1; + observed_max.fetch_max(concurrent, Ordering::SeqCst); + std::thread::sleep(Duration::from_millis(25)); + active_runs.fetch_sub(1, Ordering::SeqCst); + completed_runs.fetch_add(1, Ordering::SeqCst); + }); + } + }); + + assert_eq!(completed_runs.load(Ordering::SeqCst), 1); + assert_eq!(observed_max.load(Ordering::SeqCst), 1); + assert!(!running.load(Ordering::SeqCst)); + } + #[test] fn recent_desktop_logs_drop_oldest_entries_when_capacity_is_exceeded() { let mut entries = VecDeque::new();