Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 235 additions & 40 deletions desktop/src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::io::{Read, Write};
use std::net::{IpAddr, TcpStream, ToSocketAddrs};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::JoinHandle;
Expand All @@ -34,6 +34,8 @@ static SIDECAR_EXIT_WATCHER_STOP: AtomicBool = AtomicBool::new(false);
static RESUME_RECOVERY_WATCHER: Lazy<Mutex<Option<JoinHandle<()>>>> =
Lazy::new(|| Mutex::new(None));
static RESUME_RECOVERY_WATCHER_STOP: AtomicBool = AtomicBool::new(false);
static RESUME_RECOVERY_IN_FLIGHT: AtomicBool = AtomicBool::new(false);
static RESUME_RECOVERY_LAST_STARTED_MS: AtomicU64 = AtomicU64::new(0);
static DESKTOP_RUNTIME: Lazy<Mutex<DesktopRuntime>> =
Lazy::new(|| Mutex::new(DesktopRuntime::default()));
static DESKTOP_RECENT_LOGS: Lazy<Mutex<VecDeque<DesktopLogEntry>>> =
Expand Down Expand Up @@ -65,6 +67,7 @@ const SIDECAR_READY_POLL_INTERVAL_MS: u64 = 100;
const SIDECAR_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
const RESUME_RECOVERY_WATCH_INTERVAL: Duration = Duration::from_secs(5);
const RESUME_RECOVERY_GAP_THRESHOLD: Duration = Duration::from_secs(15);
const RESUME_RECOVERY_COOLDOWN: Duration = Duration::from_secs(10);
const DESKTOP_RECENT_LOG_CAPACITY: usize = 200;
const DESKTOP_RECENT_LOG_DEFAULT_LIMIT: usize = 50;
const DESKTOP_RECENT_LOG_MAX_LIMIT: usize = 50;
Expand Down Expand Up @@ -1250,15 +1253,51 @@ fn refresh_tray_state_from_backend<R: Runtime>(app: &AppHandle<R>) {

#[cfg(target_os = "macos")]
fn recover_backend_after_reopen<R: Runtime>(app: &AppHandle<R>) {
recover_backend_after_resume(app, "reopen", None);
request_resume_recovery(app.clone(), "reopen".to_string(), None);
}

#[cfg(target_os = "macos")]
fn recover_backend_after_resume<R: Runtime>(
app: &AppHandle<R>,
trigger: &str,
gap: Option<Duration>,
) {
/// RAII guard for the single-flight resume-recovery slot.
///
/// Holding a lease means one recovery pass is in flight; dropping it clears
/// the shared `running` flag so a later trigger can acquire the slot again,
/// even if the recovery thread exits early or unwinds from a panic.
struct ResumeRecoveryLease<'a> {
    // Borrow of the shared in-flight flag (RESUME_RECOVERY_IN_FLIGHT in
    // production, a test-local AtomicBool in the unit tests).
    running: &'a AtomicBool,
}

#[cfg(target_os = "macos")]
impl Drop for ResumeRecoveryLease<'_> {
    fn drop(&mut self) {
        // Release the slot unconditionally — including during panic unwind —
        // so a failed recovery pass can never wedge the flag as "busy".
        self.running.store(false, Ordering::SeqCst);
    }
}

#[cfg(target_os = "macos")]
/// Tries to claim the single-flight resume-recovery slot.
///
/// Returns `Some(lease)` when this caller wins the slot: no other recovery is
/// in flight and the last successful acquisition is older than `cooldown_ms`
/// (a stored timestamp of `0` means "never started", so the first trigger is
/// always eligible). Returns `None` when the trigger should be coalesced.
/// The CAS on `running` is the authoritative arbiter between racing callers;
/// the preceding loads are only a cheap fast-path rejection.
fn try_acquire_resume_recovery_slot<'a>(
    running: &'a AtomicBool,
    last_started_ms: &AtomicU64,
    now_ms: u64,
    cooldown_ms: u64,
) -> Option<ResumeRecoveryLease<'a>> {
    // Fast-path rejection: still inside the cooldown window, or a recovery
    // pass is already running.
    let previous_ms = last_started_ms.load(Ordering::SeqCst);
    let in_cooldown = previous_ms != 0 && now_ms.saturating_sub(previous_ms) < cooldown_ms;
    if in_cooldown || running.load(Ordering::SeqCst) {
        return None;
    }
    // Only one racing caller can flip the flag false -> true; losers bail.
    if running
        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return None;
    }
    // Stamp the start time only after winning the slot, so skipped triggers
    // never extend the cooldown.
    last_started_ms.store(now_ms, Ordering::SeqCst);
    Some(ResumeRecoveryLease { running })
}

#[cfg(target_os = "macos")]
/// Wall-clock time as whole milliseconds since the Unix epoch.
///
/// A system clock set before 1970 makes `duration_since` fail; that case
/// collapses to `0`, which the cooldown logic treats as "never started".
fn current_unix_timestamp_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}

#[cfg(target_os = "macos")]
fn probe_backend_after_resume(trigger: &str, gap: Option<Duration>) {
let gap_suffix = gap
.map(|value| format!(" gap_ms={}", value.as_millis()))
.unwrap_or_default();
Expand All @@ -1279,8 +1318,45 @@ fn recover_backend_after_resume<R: Runtime>(
format!("backend probe failed trigger={trigger}{gap_suffix}: {err}"),
),
}
refresh_tray_state_from_backend(app);
emit_backend_state_changed(app);
}

#[cfg(target_os = "macos")]
/// Dispatches one backend-recovery pass on a background thread.
///
/// Duplicate triggers (reopen, resume-gap watcher) are coalesced through the
/// single-flight slot: if a pass is already in flight, or one started within
/// `RESUME_RECOVERY_COOLDOWN`, the trigger is logged and dropped. The winning
/// caller probes the backend off the main thread, then hops back onto the
/// main thread to refresh tray state and notify the frontend.
///
/// * `trigger` — short label for logs (e.g. "reopen", "resume_gap").
/// * `gap` — observed wall-clock gap that caused the trigger, if any.
fn request_resume_recovery<R: Runtime + 'static>(
    app: AppHandle<R>,
    trigger: String,
    gap: Option<Duration>,
) {
    let gap_suffix = gap
        .map(|value| format!(" gap_ms={}", value.as_millis()))
        .unwrap_or_default();
    let cooldown_ms = RESUME_RECOVERY_COOLDOWN.as_millis() as u64;
    let now_ms = current_unix_timestamp_ms();
    let Some(lease) = try_acquire_resume_recovery_slot(
        &RESUME_RECOVERY_IN_FLIGHT,
        &RESUME_RECOVERY_LAST_STARTED_MS,
        now_ms,
        cooldown_ms,
    ) else {
        log_desktop_event(
            "info",
            "recovery",
            format!("skip coalesced recovery trigger={trigger}{gap_suffix}"),
        );
        return;
    };

    // Keep a copy of the identifying context: `trigger` moves into the
    // closure, but we still want it in the log line if the spawn fails.
    let spawn_context = format!("trigger={trigger}{gap_suffix}");
    let spawn_result = std::thread::Builder::new()
        .name("resume-recovery-dispatch".to_string())
        .spawn(move || {
            // Own the lease for the lifetime of the pass; dropping it (on
            // completion or panic) releases the in-flight flag.
            let _lease = lease;
            probe_backend_after_resume(&trigger, gap);

            // Tray/UI updates must run on the main thread.
            let app_handle = app.clone();
            let _ = app.run_on_main_thread(move || {
                refresh_tray_state_from_backend(&app_handle);
                emit_backend_state_changed(&app_handle);
            });
        });
    if let Err(err) = spawn_result {
        // On spawn failure the closure — and with it the lease — was dropped,
        // so the in-flight flag is already released. Surface the failure
        // instead of silently skipping the recovery pass.
        log_desktop_event(
            "warn",
            "recovery",
            format!("failed to spawn recovery thread {spawn_context}: {err}"),
        );
    }
}

fn apply_tray_state<R: Runtime>(app: &AppHandle<R>, tray_state: &TrayStateSnapshot) {
Expand Down Expand Up @@ -1839,7 +1915,7 @@ fn start_resume_recovery_watcher<R: Runtime + 'static>(app: AppHandle<R>) -> Res
let elapsed = last_tick.elapsed();
last_tick = Instant::now();
if should_trigger_resume_recovery(elapsed, RESUME_RECOVERY_GAP_THRESHOLD) {
recover_backend_after_resume(&app, "resume_gap", Some(elapsed));
request_resume_recovery(app.clone(), "resume_gap".to_string(), Some(elapsed));
}
}
})
Expand Down Expand Up @@ -1998,30 +2074,30 @@ mod tests {
use super::{
append_recent_desktop_log, build_launch_agent_plist, clamp_recent_log_limit,
current_backend_addr, current_settings_cache, decode_chunked_body, format_timeout_error,
format_tray_title, load_settings_cache, map_backend_io_error, parse_account_menu_id, parse_accounts_response,
parse_proxy_status_response, proxy_menu_enabled_states, resolve_main_window_size,
request_backend, restart_sidecar_and_wait_ready,
sanitize_main_window_size, should_attempt_sidecar_recovery,
format_tray_title, load_settings_cache, map_backend_io_error, parse_account_menu_id,
parse_accounts_response, parse_proxy_status_response, persist_runtime_settings,
proxy_menu_enabled_states, request_backend, resolve_main_window_size,
restart_sidecar_and_wait_ready, sanitize_main_window_size, should_attempt_sidecar_recovery,
should_refresh_tray_after_action, should_restart_sidecar_after_exit,
should_retry_sidecar_request, should_trigger_resume_recovery, sidecar_candidate_paths,
sidecar_creation_flags, sidecar_request_with_recovery, sidecar_request_with_recovery_hooks,
sidecar_resource_name, shutdown_sidecar_with_reason, spawn_sidecar,
should_retry_sidecar_request, should_trigger_resume_recovery, shutdown_sidecar_with_reason,
sidecar_candidate_paths, sidecar_creation_flags, sidecar_request_with_recovery,
sidecar_request_with_recovery_hooks, sidecar_resource_name, spawn_sidecar,
tray_icon_bytes_for_platform, tray_icon_is_template_for_platform,
update_download_progress, wait_for_backend_ready, wait_for_backend_ready_with_probe,
window_close_action, AppSettingsPayload, DesktopLogEntry, DesktopRuntime,
DesktopSettingsCache, HttpResponse, UpdateInfoPayload, UpdateManagerState,
UpdateProgressPayload, UpdateStatePayload, UpdateStatus, WindowCloseAction,
WindowSizeCache, DESKTOP_RUNTIME, MAIN_WINDOW_MIN_HEIGHT, MAIN_WINDOW_MIN_WIDTH,
SIDECAR_CHILD, SIDECAR_MACOS_NAME, SIDECAR_WINDOWS_NAME, TRAY_ICON_COLOR_BYTES,
TRAY_ICON_TEMPLATE_BYTES, UPDATE_MANAGER, persist_runtime_settings,
try_acquire_resume_recovery_slot, update_download_progress, wait_for_backend_ready,
wait_for_backend_ready_with_probe, window_close_action, AppSettingsPayload,
DesktopLogEntry, DesktopRuntime, DesktopSettingsCache, HttpResponse, UpdateInfoPayload,
UpdateManagerState, UpdateProgressPayload, UpdateStatePayload, UpdateStatus,
WindowCloseAction, WindowSizeCache, DESKTOP_RUNTIME, MAIN_WINDOW_MIN_HEIGHT,
MAIN_WINDOW_MIN_WIDTH, SIDECAR_CHILD, SIDECAR_MACOS_NAME, SIDECAR_WINDOWS_NAME,
TRAY_ICON_COLOR_BYTES, TRAY_ICON_TEMPLATE_BYTES, UPDATE_MANAGER,
};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fs;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
Expand Down Expand Up @@ -2380,9 +2456,12 @@ mod tests {
spawn_sidecar().expect("spawn initial sidecar");
wait_for_backend_ready(&current_backend_addr(), Duration::from_secs(5))
.expect("wait for initial backend");
let initial_response =
request_backend("GET", "/ai-router/api/settings/app", "").expect("request app settings");
assert_eq!(initial_response.status, 200, "initial backend request should succeed");
let initial_response = request_backend("GET", "/ai-router/api/settings/app", "")
.expect("request app settings");
assert_eq!(
initial_response.status, 200,
"initial backend request should succeed"
);
let initial_pid = SIDECAR_CHILD
.lock()
.expect("sidecar child lock")
Expand All @@ -2395,39 +2474,63 @@ mod tests {
shared_cache.lan_share_enabled = true;
let restart_required =
persist_runtime_settings(shared_cache).expect("persist shared runtime settings");
assert!(restart_required, "lan toggle should require sidecar restart");
assert!(
restart_required,
"lan toggle should require sidecar restart"
);
restart_sidecar_and_wait_ready().expect("restart sidecar after enabling lan");
let shared_pid = SIDECAR_CHILD
.lock()
.expect("sidecar child lock")
.as_ref()
.map(|child| child.id())
.expect("shared child pid");
assert_ne!(shared_pid, initial_pid, "sidecar pid should change after restart");
assert_ne!(
shared_pid, initial_pid,
"sidecar pid should change after restart"
);
assert_eq!(current_backend_addr(), format!("127.0.0.1:{port}"));
assert_eq!(current_settings_cache().listen_addr(), format!("0.0.0.0:{port}"));
let shared_response =
request_backend("GET", "/ai-router/api/settings/app", "").expect("request shared app settings");
assert_eq!(shared_response.status, 200, "shared backend request should succeed");
assert_eq!(
current_settings_cache().listen_addr(),
format!("0.0.0.0:{port}")
);
let shared_response = request_backend("GET", "/ai-router/api/settings/app", "")
.expect("request shared app settings");
assert_eq!(
shared_response.status, 200,
"shared backend request should succeed"
);

let mut local_cache = current_settings_cache();
local_cache.lan_share_enabled = false;
let restart_required =
persist_runtime_settings(local_cache).expect("persist local runtime settings");
assert!(restart_required, "lan toggle reset should require sidecar restart");
assert!(
restart_required,
"lan toggle reset should require sidecar restart"
);
restart_sidecar_and_wait_ready().expect("restart sidecar after disabling lan");
let final_pid = SIDECAR_CHILD
.lock()
.expect("sidecar child lock")
.as_ref()
.map(|child| child.id())
.expect("final child pid");
assert_ne!(final_pid, shared_pid, "sidecar pid should change after second restart");
assert_ne!(
final_pid, shared_pid,
"sidecar pid should change after second restart"
);
assert_eq!(current_backend_addr(), format!("127.0.0.1:{port}"));
assert_eq!(current_settings_cache().listen_addr(), format!("127.0.0.1:{port}"));
let final_response =
request_backend("GET", "/ai-router/api/settings/app", "").expect("request local app settings");
assert_eq!(final_response.status, 200, "final backend request should succeed");
assert_eq!(
current_settings_cache().listen_addr(),
format!("127.0.0.1:{port}")
);
let final_response = request_backend("GET", "/ai-router/api/settings/app", "")
.expect("request local app settings");
assert_eq!(
final_response.status, 200,
"final backend request should succeed"
);

shutdown_sidecar_with_reason("smoke-cleanup-end");
let _ = fs::remove_file(&sidecar_path);
Expand Down Expand Up @@ -2670,6 +2773,98 @@ mod tests {
));
}

#[test]
fn resume_recovery_slot_coalesces_overlapping_runs() {
    // Fresh slot state local to this test, mirroring the process-wide
    // RESUME_RECOVERY_IN_FLIGHT / RESUME_RECOVERY_LAST_STARTED_MS statics.
    let running = AtomicBool::new(false);
    let last_started_ms = AtomicU64::new(0);
    // How many recovery bodies run right now, and the max ever observed.
    let active_runs = Arc::new(AtomicUsize::new(0));
    let observed_max = Arc::new(AtomicUsize::new(0));

    // Two threads race for the slot with identical timestamps; at most one
    // may win, the other must be coalesced (it just returns).
    std::thread::scope(|scope| {
        for _ in 0..2 {
            let active_runs = Arc::clone(&active_runs);
            let observed_max = Arc::clone(&observed_max);
            let running_ref = &running;
            let last_started_ms_ref = &last_started_ms;
            scope.spawn(move || {
                let Some(_lease) = try_acquire_resume_recovery_slot(
                    running_ref,
                    last_started_ms_ref,
                    1_000,
                    10_000,
                ) else {
                    return;
                };
                let concurrent = active_runs.fetch_add(1, Ordering::SeqCst) + 1;
                observed_max.fetch_max(concurrent, Ordering::SeqCst);
                // Hold the slot long enough for the sibling thread to contend.
                std::thread::sleep(Duration::from_millis(50));
                active_runs.fetch_sub(1, Ordering::SeqCst);
            });
        }
    });

    // Single-flight invariant held, and dropping the lease released the flag
    // by the time the scope joined both threads.
    assert_eq!(observed_max.load(Ordering::SeqCst), 1);
    assert!(!running.load(Ordering::SeqCst));
}

#[test]
fn resume_recovery_slot_skips_recent_duplicate_triggers() {
    // Fresh slot state for this test.
    let running = AtomicBool::new(false);
    let last_started_ms = AtomicU64::new(0);

    // The very first trigger always wins the slot (timestamp 0 = "never").
    let first = try_acquire_resume_recovery_slot(&running, &last_started_ms, 1_000, 10_000);
    drop(first.expect("first recovery should acquire the slot"));

    // 4s later is still inside the 10s cooldown window, even though the
    // lease was already released — the trigger must be coalesced.
    let retry = try_acquire_resume_recovery_slot(&running, &last_started_ms, 5_000, 10_000);
    assert!(
        retry.is_none(),
        "duplicate trigger inside cooldown should be skipped"
    );

    // Once the cooldown has fully elapsed the slot opens up again.
    let reopened = try_acquire_resume_recovery_slot(&running, &last_started_ms, 11_500, 10_000);
    assert!(
        reopened.is_some(),
        "trigger after cooldown should be allowed"
    );
}

#[test]
#[ignore = "local smoke test that stresses duplicate resume recovery triggers"]
fn resume_recovery_slot_stays_single_flight_under_burst_triggers() {
    // Fresh slot state; 32 threads simulate a burst of duplicate triggers
    // (e.g. rapid reopen + resume-gap events firing together).
    let running = AtomicBool::new(false);
    let last_started_ms = AtomicU64::new(0);
    let active_runs = Arc::new(AtomicUsize::new(0));
    let observed_max = Arc::new(AtomicUsize::new(0));
    let completed_runs = Arc::new(AtomicUsize::new(0));

    std::thread::scope(|scope| {
        for _ in 0..32 {
            let active_runs = Arc::clone(&active_runs);
            let observed_max = Arc::clone(&observed_max);
            let completed_runs = Arc::clone(&completed_runs);
            let running_ref = &running;
            let last_started_ms_ref = &last_started_ms;
            scope.spawn(move || {
                // All threads share one timestamp (50_000) and cooldown
                // (10_000); losers of the CAS — or anyone arriving after the
                // winner stamped last_started — return without running.
                let Some(_lease) = try_acquire_resume_recovery_slot(
                    running_ref,
                    last_started_ms_ref,
                    50_000,
                    10_000,
                ) else {
                    return;
                };
                let concurrent = active_runs.fetch_add(1, Ordering::SeqCst) + 1;
                observed_max.fetch_max(concurrent, Ordering::SeqCst);
                // Keep the slot busy so the rest of the burst overlaps.
                std::thread::sleep(Duration::from_millis(25));
                active_runs.fetch_sub(1, Ordering::SeqCst);
                completed_runs.fetch_add(1, Ordering::SeqCst);
            });
        }
    });

    // Exactly one of the 32 triggers ran the recovery body, never more than
    // one at a time, and the flag was released afterwards.
    assert_eq!(completed_runs.load(Ordering::SeqCst), 1);
    assert_eq!(observed_max.load(Ordering::SeqCst), 1);
    assert!(!running.load(Ordering::SeqCst));
}

#[test]
fn recent_desktop_logs_drop_oldest_entries_when_capacity_is_exceeded() {
let mut entries = VecDeque::new();
Expand Down
Loading