diff --git a/src-tauri/src/assistant/engine.rs b/src-tauri/src/assistant/engine.rs index af5d5c6..b12630e 100644 --- a/src-tauri/src/assistant/engine.rs +++ b/src-tauri/src/assistant/engine.rs @@ -903,6 +903,30 @@ pub(crate) fn build_system_prompt( - Be concise and direct in your responses. Prefer concrete actions and evidence over vague summaries.\n", ); + // Transport-drop recovery for grant/response-blocking tools. The local MCP + // transport can drop an in-flight call (surfaced to the model as + // "transport dropped mid-call; response for tool was lost"). For a + // tool that blocks on a user grant or answer, the outcome is then unknown, + // so the model must re-ask rather than assume an answer or proceed. The + // backend treats the re-asked call as superseding the orphaned one (the + // stale approval/question card is replaced in place), so no UI caveats are + // needed here. Scoped to sessions that actually expose such a tool; + // ordinary read/write tools, which can be retried without side effects, + // need no special handling. + let has_interactive_tool = tool_names + .iter() + .any(|n| matches!(*n, "ask_user" | "bash_exec" | "fs_request_grant")); + if has_interactive_tool { + prompt.push_str( + "\n## Interactive Tool Reliability\n\ + A tool call can occasionally fail with a transport error such as `MCP server \"clai\" transport dropped mid-call; response for tool was lost`. This means CLAI lost the in-flight call before its result reached you, so the call's outcome is UNKNOWN — it may or may not have run.\n\ + - This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost.\n\ + - When it happens, re-issue the SAME interactive call once. CLAI replaces the lost prompt with the fresh one in the app, so the user simply answers the new prompt. Do NOT assume the lost call was approved, denied, or answered, and do NOT proceed past it.\n\ + - Apply this only to active CLAI human waits. If a user-input, command-approval, or filesystem-grant prompt expires or is denied, do not invent convoluted workarounds to bypass it. If the permission or answer is required, stop and explain what is blocked; retry only after a transport drop where the outcome is unknown.\n\ + - For non-interactive tools (reads, searches, writes), a transport drop needs no special handling — just retry normally if you still need the result.\n", + ); + } + if context.space_id.is_some() || !context.mcp_server_ids.is_empty() { prompt.push_str( "- This tab already carries session-specific context and capabilities. \ @@ -1427,6 +1451,51 @@ mod tests { assert!(text.contains("re-run the relevant tools if freshness matters.")); } + #[test] + fn build_system_prompt_adds_interactive_reliability_guidance_when_blocking_tool_present() { + let context = SessionContext::default(); + let tools = [crate::assistant::types::ToolDefinition { + name: "ask_user".to_string(), + description: "desc".to_string(), + input_schema: serde_json::json!({}), + }]; + + let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage); + let text = match &message.content[0] { + ContentPart::Text { text } => text, + other => panic!("expected text content, got {:?}", other), + }; + + assert!(text.contains("## Interactive Tool Reliability")); + assert!(text.contains("transport dropped mid-call")); + assert!(text.contains("re-issue the SAME interactive call once")); + assert!(text.contains("Apply this only to active CLAI human waits")); + assert!(text.contains("do not invent convoluted workarounds")); + // The backend supersedes the orphaned request when the model + // re-asks, so the prompt must NOT push stale-card caveats (e.g. + // telling the user to dismiss duplicates) onto the model. + assert!(text.contains("replaces the lost prompt with the fresh one")); + assert!(!text.contains("dismiss")); + } + + #[test] + fn build_system_prompt_omits_interactive_reliability_guidance_without_blocking_tool() { + let context = SessionContext::default(); + let tools = [crate::assistant::types::ToolDefinition { + name: "fs_read".to_string(), + description: "desc".to_string(), + input_schema: serde_json::json!({}), + }]; + + let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage); + let text = match &message.content[0] { + ContentPart::Text { text } => text, + other => panic!("expected text content, got {:?}", other), + }; + + assert!(!text.contains("## Interactive Tool Reliability")); + } + #[test] fn build_system_prompt_describes_autonomous_run_mode() { let context = SessionContext { diff --git a/src-tauri/src/assistant/local_mcp.rs b/src-tauri/src/assistant/local_mcp.rs index 96d8099..7eeaed8 100644 --- a/src-tauri/src/assistant/local_mcp.rs +++ b/src-tauri/src/assistant/local_mcp.rs @@ -42,7 +42,20 @@ pub struct LocalMcpRuntime { // whichever workspace happened to bind first. app: AppHandle, cancellation_token: CancellationToken, - bindings: Arc>>, + bindings: Arc>>, +} + +/// A registered run binding plus the run-scope token that reaps its +/// in-flight tool calls. [`BindingGuard::drop`] cancels `run_scope`, so +/// any tool future still racing in [`execute_bound_tool`]'s `select!` +/// when the run ends (e.g. an interactive wait orphaned by a CLI +/// transport drop) is dropped instead of lingering on the rmcp session +/// worker until its own timeout. Dropping those futures fires their +/// cleanup guards (pending-registry removal + `resolved` UI events). +#[derive(Clone)] +struct BoundRun { + binding: ToolBinding, + run_scope: CancellationToken, } #[derive(Clone)] @@ -89,6 +102,7 @@ impl LocalMcpRuntime { /// bindings map. pub fn bind_run(&self, binding: ToolBinding) -> BindingGuard { let token = Uuid::new_v4().to_string(); + let run_scope = CancellationToken::new(); // Bindings map only ever holds short, await-free critical sections, // so `std::sync::RwLock` is fine and lets `Drop` clean up sync. // A poisoned lock means the binding map is unusable; we'd rather @@ -96,17 +110,24 @@ impl LocalMcpRuntime { self.bindings .write() .expect("local MCP binding map poisoned") - .insert(token.clone(), binding); + .insert( + token.clone(), + BoundRun { + binding, + run_scope: run_scope.clone(), + }, + ); BindingGuard { bindings: self.bindings.clone(), token, + run_scope, } } fn binding_from_request( &self, context: &RequestContext, - ) -> Result { + ) -> Result { let token = bearer_token(context).ok_or_else(|| { McpError::invalid_request("missing bearer token for CLAI MCP request", None) })?; @@ -125,9 +146,18 @@ impl LocalMcpRuntime { /// token while alive and removes it from the runtime on drop, so a panic /// or early return between bind and the end of a run cannot leak a stale /// binding into the process-singleton MCP server. +/// +/// Dropping the guard also cancels the binding's run-scope token, which +/// reaps any tool call still in flight on the rmcp session worker. This +/// matters for interactive waits (`ask_user`, bash approvals, path +/// grants) orphaned by a CLI transport drop: the worker keeps their +/// futures alive past the dropped connection, so without this reap they +/// would pin pending approval entries (and their UI cards) until their +/// own multi-minute timeout. pub struct BindingGuard { - bindings: Arc>>, + bindings: Arc>>, token: String, + run_scope: CancellationToken, } impl BindingGuard { @@ -138,6 +168,10 @@ impl BindingGuard { impl Drop for BindingGuard { fn drop(&mut self) { + // Reap in-flight tool calls for this run BEFORE unbinding, so a + // racing request can't observe the binding gone while its + // already-running future survives the run. + self.run_scope.cancel(); if let Ok(mut bindings) = self.bindings.write() { bindings.remove(&self.token); } @@ -186,7 +220,11 @@ pub async fn ensure_started(app: &AppHandle) -> Result, Str // 127.0.0.1 bind, port-agnostically. let mut config = StreamableHttpServerConfig::default(); config.stateful_mode = true; - config.sse_keep_alive = None; + // Keep rmcp's default sse_keep_alive (15s pings). The + // response stream for an interactive tool call sits idle + // for as long as a human takes to answer, and an unpinged + // idle connection is exactly what rots into "transport + // dropped mid-call; response for tool was lost". config.cancellation_token = cancellation_token.child_token(); config }, @@ -234,7 +272,7 @@ impl ServerHandler for ClaiMcpService { context: RequestContext, ) -> impl Future> + Send + '_ { async move { - let binding = self.runtime.binding_from_request(&context)?; + let binding = self.runtime.binding_from_request(&context)?.binding; let session = repository::get_session(&binding.pool, &binding.session_id) .await .map_err(|e| McpError::internal_error(e, None))? @@ -262,14 +300,22 @@ impl ServerHandler for ClaiMcpService { context: RequestContext, ) -> impl Future> + Send + '_ { async move { - let binding = self.runtime.binding_from_request(&context)?; + let bound = self.runtime.binding_from_request(&context)?; let tool_name = request.name.to_string(); let params = request .arguments .map(serde_json::Value::Object) .unwrap_or(serde_json::Value::Object(Default::default())); - match execute_bound_tool(&self.runtime.app, &binding, &tool_name, params).await { + match execute_bound_tool( + &self.runtime.app, + &bound.binding, + &bound.run_scope, + &tool_name, + params, + ) + .await + { Ok(value) => Ok(CallToolResult::structured(value)), Err(error) => Ok(CallToolResult::structured_error(serde_json::json!({ "error": error, @@ -292,6 +338,7 @@ impl ServerHandler for ClaiMcpService { async fn execute_bound_tool( app: &AppHandle, binding: &ToolBinding, + run_scope: &CancellationToken, tool_name: &str, params: serde_json::Value, ) -> Result { @@ -347,6 +394,13 @@ async fn execute_bound_tool( tokio::select! { _ = binding.cancel_token.cancelled() => Err("run cancelled".to_string()), + // Run-scope reap: fires when `BindingGuard` drops at the end of the + // run. A live CLI always waits for its tool results before ending + // its turn, so the only futures still here at that point are + // orphans whose response stream was lost to a transport drop. + // Dropping them fires their cleanup guards (pending-approval + // removal, `resolved` UI events). + _ = run_scope.cancelled() => Err("run ended before this tool call completed".to_string()), result = tools::execute_tool(&deps, &tool_context, tool_name, params) => result, } } @@ -380,3 +434,99 @@ fn bearer_token(context: &RequestContext) -> Option { .filter(|value| !value.is_empty()) .map(str::to_string) } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_binding() -> ToolBinding { + ToolBinding { + pool: sqlx::Pool::connect_lazy("sqlite::memory:").expect("lazy pool"), + session_id: "session-1".to_string(), + run_id: "run-1".to_string(), + cancel_token: CancellationToken::new(), + inter_agent_call_depth: None, + notices: Arc::new(Mutex::new(Vec::new())), + session_grants: Arc::new(Mutex::new(Vec::new())), + session_allowed_command_prefixes: Arc::new(Mutex::new(Vec::new())), + session_blocked_command_prefixes: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Dropping the guard must cancel the run scope (reaping in-flight + /// tool futures racing on it in `execute_bound_tool`) and unbind the + /// bearer token — while leaving the run's own cancel token alone, so + /// reaping orphans at normal run end is not a run cancellation. + #[tokio::test] // tokio: the lazy sqlx pool requires a runtime context on drop + async fn binding_guard_drop_cancels_run_scope_and_unbinds() { + let bindings: Arc>> = + Arc::new(RwLock::new(HashMap::new())); + let binding = test_binding(); + let run_cancel = binding.cancel_token.clone(); + let run_scope = CancellationToken::new(); + bindings.write().unwrap().insert( + "token-1".to_string(), + BoundRun { + binding, + run_scope: run_scope.clone(), + }, + ); + let guard = BindingGuard { + bindings: bindings.clone(), + token: "token-1".to_string(), + run_scope: run_scope.clone(), + }; + + assert!(!run_scope.is_cancelled()); + drop(guard); + + assert!( + run_scope.is_cancelled(), + "guard drop must reap in-flight tool calls via the run scope" + ); + assert!( + bindings.read().unwrap().is_empty(), + "guard drop must unbind the bearer token" + ); + assert!( + !run_cancel.is_cancelled(), + "reaping at run end must not look like a run cancellation" + ); + } + + /// The reap arm in `execute_bound_tool` must drop the racing tool + /// future (firing its cleanup guards), not merely resolve alongside it. + #[tokio::test] + async fn run_scope_cancel_drops_in_flight_future() { + struct DropFlag(Arc>); + impl Drop for DropFlag { + fn drop(&mut self) { + *self.0.lock().unwrap() = true; + } + } + + let dropped = Arc::new(Mutex::new(false)); + let flag = DropFlag(dropped.clone()); + let run_scope = CancellationToken::new(); + let scope = run_scope.clone(); + + let task = tokio::spawn(async move { + let _flag = flag; // owned by the racing future, dropped with it + tokio::select! { + _ = scope.cancelled() => Err::<(), String>("run ended before this tool call completed".to_string()), + _ = std::future::pending::<()>() => Ok(()), // never resolves, like an unanswered approval + } + }); + + run_scope.cancel(); + let result = task.await.expect("select task must not panic"); + assert_eq!( + result.unwrap_err(), + "run ended before this tool call completed" + ); + assert!( + *dropped.lock().unwrap(), + "the in-flight future must be dropped so its RAII cleanup guards fire" + ); + } +} diff --git a/src-tauri/src/assistant/tools/ask_user.rs b/src-tauri/src/assistant/tools/ask_user.rs index 73b4ee0..a6fbc6b 100644 --- a/src-tauri/src/assistant/tools/ask_user.rs +++ b/src-tauri/src/assistant/tools/ask_user.rs @@ -97,7 +97,13 @@ pub struct AskUserAnswer { // Keyed by pending id; the value carries the owning session id so callers // (mid-run input delivery) can check whether a session is currently blocked // on a human answer without threading new state through the tool router. -type PendingMap = HashMap)>; +#[derive(Debug)] +enum AskUserOutcome { + Answer(AskUserAnswer), + Superseded, +} + +type PendingMap = HashMap)>; static PENDING: OnceLock> = OnceLock::new(); fn pending_map() -> &'static Mutex { @@ -124,15 +130,41 @@ pub fn submit_answer(pending_id: &str, answer: AskUserAnswer) -> Result<(), Stri let (_, tx) = map .remove(pending_id) .ok_or_else(|| format!("No pending ask_user with id `{}`", pending_id))?; - tx.send(answer) + tx.send(AskUserOutcome::Answer(answer)) .map_err(|_| "ask_user receiver was dropped (run already ended)".to_string()) } +/// Removes every pending ask for `session_id`, returning the removed +/// pending ids. Called when a new ask is registered for the session: the +/// frontend renders a single ask panel per session, so a still-pending +/// ask at registration time is an orphan (its CLI transport dropped +/// mid-call and the model is re-asking). The removed senders receive an +/// explicit superseded outcome, so channel closure remains reserved for +/// cancellation/teardown. +pub fn take_for_session(session_id: &str) -> Vec { + let Ok(mut map) = pending_map().lock() else { + return Vec::new(); + }; + let ids: Vec = map + .iter() + .filter(|(_, (owner, _))| owner == session_id) + .map(|(id, _)| id.clone()) + .collect(); + for id in &ids { + if let Some((_, tx)) = map.remove(id) { + let _ = tx.send(AskUserOutcome::Superseded); + } + } + ids +} + /// RAII guard: removes the pending entry if the tool future is dropped -/// before the channel resolves (run cancellation, CLI MCP timeout, or -/// abandoned transport). In that case the answer panel cannot resume the -/// original tool call, so clear the UI and cancel the run rather than -/// letting the model continue without the requested human answer. +/// before the channel resolves — run cancellation, or run-end reaping of +/// a wait orphaned by a CLI transport drop (`BindingGuard` cancels the +/// run scope, dropping this future). In that case the answer panel +/// cannot resume the original tool call, so clear the UI and cancel the +/// run rather than letting the model continue without the requested +/// human answer (a no-op when the run already ended). struct PendingGuard { id: String, app: tauri::AppHandle, @@ -146,6 +178,22 @@ impl PendingGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + if let Ok(mut map) = pending_map().lock() { + map.remove(&self.id); + } + let _ = emit_event( + &self.app, + &self.session, + Some(self.run_id.as_str()), + AssistantUiEvent::AskUserResolved { + pending_id: self.id.clone(), + }, + ); + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for PendingGuard { @@ -191,8 +239,24 @@ pub async fn execute( .await? .ok_or_else(|| format!("Session not found: {}", context.session_id))?; + // Supersede any orphaned ask for this session (the model re-asking + // after a CLI transport drop lost the original call). The frontend + // keeps one ask panel per session, so the fresh question below + // replaces the stale one; resolving the old ids also unblocks + // `session_has_pending_ask` for mid-run input delivery. + for stale_id in take_for_session(&session.id) { + let _ = emit_event( + &deps.app, + &session, + Some(context.run_id.as_str()), + AssistantUiEvent::AskUserResolved { + pending_id: stale_id, + }, + ); + } + let pending_id = Uuid::new_v4().to_string(); - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::channel::(); { let mut map = pending_map() .lock() @@ -223,13 +287,23 @@ pub async fn execute( let wait_timeout = context.interactive_wait_timeout(ASK_USER_TIMEOUT); let answer = match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(answer)) => answer, - Ok(Err(_)) => return Err("ask_user channel closed (sender dropped)".to_string()), + Ok(Ok(AskUserOutcome::Answer(answer))) => answer, + Ok(Ok(AskUserOutcome::Superseded)) => { + // A fresh question for this session replaced this orphaned + // wait after a transport drop. Ignore the stale future. + guard.disarm(); + return Err( + "ask_user was superseded by a newer question before an answer arrived".to_string(), + ); + } + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + return super::cancel_run_and_park(&context.cancel_token).await; + } + Ok(Err(_)) => { + return guard.expire_and_stop().await; + } Err(_) => { - return Err(format!( - "ask_user timed out waiting for a user answer after {} seconds", - wait_timeout.as_secs() - )) + return guard.expire_and_stop().await; } }; guard.disarm(); @@ -325,4 +399,63 @@ mod tests { let labels: Vec<&str> = result.iter().map(|o| o.label.as_str()).collect(); assert_eq!(labels, vec!["Other option", "None of the above"]); } + + // PENDING is a process-wide static shared across parallel tests, so + // each test uses unique session ids to stay isolated. + + #[test] + fn take_for_session_removes_only_that_sessions_asks_and_closes_channels() { + let (tx_a, rx_a) = oneshot::channel::(); + let (tx_b, mut rx_b) = oneshot::channel::(); + { + let mut map = pending_map().lock().unwrap(); + map.insert( + "tfs-pending-a".to_string(), + ("tfs-session-a".to_string(), tx_a), + ); + map.insert( + "tfs-pending-b".to_string(), + ("tfs-session-b".to_string(), tx_b), + ); + } + + let removed = take_for_session("tfs-session-a"); + assert_eq!(removed, vec!["tfs-pending-a".to_string()]); + assert!(!session_has_pending_ask("tfs-session-a")); + assert!(session_has_pending_ask("tfs-session-b")); + + // The orphaned wait receives an explicit supersede signal. + assert!(matches!( + rx_a.blocking_recv(), + Ok(AskUserOutcome::Superseded) + )); + // ...while the other session's channel stays open. + assert!(matches!( + rx_b.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + + // Cleanup so other tests see an empty map for these ids. + take_for_session("tfs-session-b"); + } + + #[test] + fn submit_answer_fails_for_superseded_pending_id() { + let (tx, _rx) = oneshot::channel::(); + pending_map() + .lock() + .unwrap() + .insert("sup-pending".to_string(), ("sup-session".to_string(), tx)); + take_for_session("sup-session"); + + let result = submit_answer( + "sup-pending", + AskUserAnswer { + text: "late".to_string(), + selected_option_index: None, + selected_option_indexes: None, + }, + ); + assert!(result.is_err(), "superseded ask must not accept answers"); + } } diff --git a/src-tauri/src/assistant/tools/local.rs b/src-tauri/src/assistant/tools/local.rs index 1fe1750..ff5a112 100644 --- a/src-tauri/src/assistant/tools/local.rs +++ b/src-tauri/src/assistant/tools/local.rs @@ -1194,13 +1194,15 @@ fn is_pure_assignment_token(tok: &str) -> bool { } /// Cleans up an abandoned permission request when the approval-wait future -/// is dropped without a user decision — the Claude Code CLI dropping the MCP -/// transport mid-call (its "response for tool bash_exec was lost" message), -/// or the run being cancelled. In either case the awaiting future below is -/// dropped, so this guard runs: it removes the still-pending registry entry, -/// tells the frontend to drop the now-useless approval card, and cancels the -/// run so the model cannot continue without the missing decision. Disarmed on -/// a normal decision, where the submit command already removed the entry. +/// is dropped without a user decision — the run being cancelled, or the +/// run ending while this wait was orphaned by a CLI transport drop (the +/// rmcp session worker keeps the future alive past the dropped +/// connection; `BindingGuard` reaps it at run end, which drops this +/// future). The guard then removes the still-pending registry entry, +/// tells the frontend to drop the now-useless approval card, and cancels +/// the run so the model cannot continue without the missing decision +/// (a no-op when the run already ended). Disarmed on a normal decision or +/// explicit supersede, where the caller already removed/replaced the entry. /// /// Cleanup is async (registry lock) so it's spawned onto the app runtime; /// `Drop` can't await. `take` is a no-op if the entry was already removed @@ -1217,6 +1219,22 @@ impl AbandonedApprovalGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + use tauri::Manager; + + let state = self.app.state::(); + if let Some((_, remaining)) = state.pending_approvals.take(&self.request_id).await { + crate::commands::permissions::emit_permission_resolved(&self.app, &self.request_id); + crate::commands::permissions::emit_attention( + &self.app, + self.workspace_id.clone(), + remaining, + ); + } + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for AbandonedApprovalGuard { @@ -1256,8 +1274,8 @@ async fn await_user_permission( segments: Vec, ) -> Result<(), String> { use crate::commands::permissions::{ - emit_attention, PermissionRequest, SegmentDecision, APPROVAL_TIMEOUT, - PERMISSION_REQUEST_EVENT, + emit_attention, PendingApprovalOutcome, PermissionRequest, SegmentDecision, + APPROVAL_TIMEOUT, PERMISSION_REQUEST_EVENT, }; use tauri::{Emitter, Manager}; @@ -1277,7 +1295,26 @@ async fn await_user_permission( }; let request_id = request.request_id.clone(); - let (rx, count) = app_state.pending_approvals.register(request.clone()).await; + // Supersede: if a previous request for this exact run + command is + // still pending, it is an orphan — its CLI transport dropped mid-call + // and the model is now re-asking. Replace it (and its UI card) with + // the fresh request instead of stacking a duplicate the user can no + // longer meaningfully answer. The stale waiter receives an explicit + // supersede outcome (no run cancellation — this run is alive and + // waiting on the NEW request). + for stale in app_state + .pending_approvals + .take_superseded(&context.run_id, command) + .await + { + crate::commands::permissions::emit_permission_resolved(&deps.app, &stale.request_id); + emit_attention(&deps.app, stale.workspace_id.clone(), stale.remaining); + } + + let (rx, count) = app_state + .pending_approvals + .register(request.clone(), context.run_id.clone()) + .await; if let Err(e) = deps.app.emit(PERMISSION_REQUEST_EVENT, &request) { tracing::warn!("Failed to emit permission request event: {}", e); @@ -1287,7 +1324,7 @@ async fn await_user_permission( // Cleans up if this future is abandoned before a decision (CLI transport // drop mid-call, or run cancellation): clears the pending entry and tells // the frontend to drop the now-useless approval card. Disarmed below on a - // normal decision; the timeout / channel-closed arms let it fire on drop. + // normal decision and explicit supersede. let mut abandon_guard = AbandonedApprovalGuard { app: deps.app.clone(), cancel_token: context.cancel_token.clone(), @@ -1298,27 +1335,29 @@ async fn await_user_permission( let wait_timeout = context.interactive_wait_timeout(APPROVAL_TIMEOUT); let decisions = match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(d)) => { + Ok(Ok(PendingApprovalOutcome::Decision(d))) => { // The submit command already removed the registry entry and the // frontend cleared the card optimistically. abandon_guard.disarm(); d } - Ok(Err(_)) => { - let msg = "Permission approval channel closed before a decision was made"; - context.add_notice(RunNoticeKind::CommandDenied, msg.to_string()); + Ok(Ok(PendingApprovalOutcome::Superseded)) => { + // A fresh registration for the same run + command replaced + // this orphaned wait after a transport drop. This stale future + // is intentionally ignored: no notice, no warning, no cancel. + abandon_guard.disarm(); + let msg = "Permission request was superseded by a newer request \ + for the same command before a decision was made"; return Err(msg.to_string()); } + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + return super::cancel_run_and_park(&context.cancel_token).await; + } + Ok(Err(_)) => { + return abandon_guard.expire_and_stop().await; + } Err(_) => { - // Human-wait timeout. `abandon_guard` clears the pending entry, - // emits attention, drops the card, and cancels the run when it - // goes out of scope. - let msg = format!( - "Permission approval timed out after {} seconds", - wait_timeout.as_secs() - ); - context.add_notice(RunNoticeKind::CommandDenied, msg.to_string()); - return Err(msg); + return abandon_guard.expire_and_stop().await; } }; @@ -1528,8 +1567,8 @@ fn access_to_str(access: FilesystemPathAccess) -> &'static str { /// return shape (path grants aren't per-segment). /// Path-grant analogue of [`AbandonedApprovalGuard`]: clears a pending /// filesystem path-grant request, drops its card, and cancels the run when -/// the approval-wait future is abandoned (CLI transport drop mid-call, or -/// run cancellation). +/// the approval-wait future is abandoned (run cancellation, or run-end +/// reaping of a wait orphaned by a CLI transport drop). struct AbandonedPathGrantGuard { app: tauri::AppHandle, cancel_token: tokio_util::sync::CancellationToken, @@ -1542,6 +1581,22 @@ impl AbandonedPathGrantGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + use tauri::Manager; + + let state = self.app.state::(); + if let Some((_, remaining)) = state.pending_path_grants.take(&self.request_id).await { + crate::commands::path_grants::emit_path_grant_resolved(&self.app, &self.request_id); + crate::commands::path_grants::emit_attention( + &self.app, + self.workspace_id.clone(), + remaining, + ); + } + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for AbandonedPathGrantGuard { @@ -1570,7 +1625,7 @@ async fn await_path_grant_decision( request: crate::commands::path_grants::PathGrantRequest, ) -> Result { use crate::commands::path_grants::{ - emit_attention, PATH_GRANT_REQUEST_EVENT, PATH_GRANT_TIMEOUT, + emit_attention, PendingPathGrantOutcome, PATH_GRANT_REQUEST_EVENT, PATH_GRANT_TIMEOUT, }; use tauri::{Emitter, Manager}; @@ -1578,9 +1633,25 @@ async fn await_path_grant_decision( let workspace_id = context.workspace_id.clone(); let request_id = request.request_id.clone(); + // Supersede a stale orphaned request for the same run + path + access + // (the model re-asked after a CLI transport drop). See the analogous + // block in `await_user_permission` for the full rationale. + for stale in app_state + .pending_path_grants + .take_superseded( + &context.run_id, + &request.requested_path, + request.requested_access, + ) + .await + { + crate::commands::path_grants::emit_path_grant_resolved(&deps.app, &stale.request_id); + emit_attention(&deps.app, stale.workspace_id.clone(), stale.remaining); + } + let (rx, count) = app_state .pending_path_grants - .register(request.clone()) + .register(request.clone(), context.run_id.clone()) .await; if let Err(e) = deps.app.emit(PATH_GRANT_REQUEST_EVENT, &request) { @@ -1590,7 +1661,7 @@ async fn await_path_grant_decision( // See `AbandonedApprovalGuard`: clears the pending entry and drops the // card if this future is abandoned before a decision. Disarmed on a - // normal decision; the timeout / channel-closed arms let it fire on drop. + // normal decision and explicit supersede. let mut abandon_guard = AbandonedPathGrantGuard { app: deps.app.clone(), cancel_token: context.cancel_token.clone(), @@ -1601,26 +1672,25 @@ async fn await_path_grant_decision( let wait_timeout = context.interactive_wait_timeout(PATH_GRANT_TIMEOUT); match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(decision)) => { + Ok(Ok(PendingPathGrantOutcome::Decision(decision))) => { abandon_guard.disarm(); Ok(decision) } - Ok(Err(_)) => { - let msg = "Path-grant approval channel closed before a decision was made".to_string(); - context.add_notice(RunNoticeKind::PathGrantDenied, msg.clone()); + Ok(Ok(PendingPathGrantOutcome::Superseded)) => { + // A fresh registration for the same run + path + access + // replaced this orphaned wait after a transport drop. This + // stale future is intentionally ignored. + abandon_guard.disarm(); + let msg = "Path-grant request was superseded by a newer request \ + for the same path before a decision was made" + .to_string(); Err(msg) } - Err(_) => { - // Human-wait timeout. `abandon_guard` clears the pending entry, - // emits attention, drops the card, and cancels the run when it - // goes out of scope. - let msg = format!( - "Path-grant approval timed out after {} seconds", - wait_timeout.as_secs() - ); - context.add_notice(RunNoticeKind::PathGrantDenied, msg.clone()); - Err(msg) + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + super::cancel_run_and_park(&context.cancel_token).await } + Ok(Err(_)) => abandon_guard.expire_and_stop().await, + Err(_) => abandon_guard.expire_and_stop().await, } } diff --git a/src-tauri/src/assistant/tools/mod.rs b/src-tauri/src/assistant/tools/mod.rs index bba7390..ba5000d 100644 --- a/src-tauri/src/assistant/tools/mod.rs +++ b/src-tauri/src/assistant/tools/mod.rs @@ -20,6 +20,15 @@ use crate::assistant::types::{ }; use crate::config::{ExecutionCapabilityConfig, FilesystemPathGrant}; +/// Terminal human-wait handling: once a CLAI-owned prompt for user input or +/// permission expires, cancel the run and never return a tool result for the +/// model to route around. The outer run driver races tool execution against +/// this token and will drop this parked future while cancelling the run. +pub async fn cancel_run_and_park(cancel_token: &CancellationToken) -> T { + cancel_token.cancel(); + std::future::pending::().await +} + /// The name under which clai's local MCP server is registered with CLI /// providers (`write_mcp_config` for Claude Code, `add_codex_common_args` /// for Codex — both in `assistant::local_agent`). diff --git a/src-tauri/src/commands/path_grants.rs b/src-tauri/src/commands/path_grants.rs index bbb6440..d8625bb 100644 --- a/src-tauri/src/commands/path_grants.rs +++ b/src-tauri/src/commands/path_grants.rs @@ -42,10 +42,11 @@ use crate::AppState; pub const PATH_GRANT_REQUEST_EVENT: &str = "path-grants://request"; pub const PATH_GRANT_ATTENTION_EVENT: &str = "path-grants://attention"; /// Emitted when a pending path-grant request is cleared *without* a user -/// decision — the tool call was abandoned (CLI transport dropped mid-call, -/// run cancelled) or it timed out. The inline path-grant card removes the -/// now-useless card on this. Normal submissions clear the card optimistically -/// on the frontend, so they don't emit this. +/// decision — the run was cancelled or ended (reaping a wait orphaned by +/// a CLI transport drop), the wait timed out, or a re-asked grant +/// superseded the stale request. The inline path-grant card removes the +/// now-useless card on this. Normal submissions clear the card +/// optimistically on the frontend, so they don't emit this. pub const PATH_GRANT_RESOLVED_EVENT: &str = "path-grants://resolved"; /// Same bound as the command-approval flow: 24h is generous enough that @@ -119,13 +120,30 @@ struct PendingInner { counts: HashMap, u32>, } +#[derive(Debug)] +pub enum PendingPathGrantOutcome { + Decision(PathGrantDecision), + Superseded, +} + pub struct PendingEntry { - pub sender: oneshot::Sender, + pub sender: oneshot::Sender, pub workspace_id: Option, pub agent_id: Option, + /// The run awaiting this decision. Used by + /// [`PendingPathGrants::take_superseded`] so a re-asked grant (after a + /// CLI transport drop orphaned the original request) replaces the + /// stale entry instead of stacking a duplicate card. + pub run_id: String, pub request: PathGrantRequest, } +pub struct SupersededPathGrant { + pub request_id: String, + pub workspace_id: Option, + pub remaining: u32, +} + impl PendingPathGrants { pub fn new() -> Self { Self { @@ -139,7 +157,8 @@ impl PendingPathGrants { pub async fn register( &self, request: PathGrantRequest, - ) -> (oneshot::Receiver, u32) { + run_id: String, + ) -> (oneshot::Receiver, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; let request_id = request.request_id.clone(); @@ -151,6 +170,7 @@ impl PendingPathGrants { sender: tx, workspace_id: workspace_id.clone(), agent_id, + run_id, request, }, ); @@ -182,10 +202,18 @@ impl PendingPathGrants { .collect() } - /// See [`crate::commands::permissions::PendingApprovals::purge_workspace`]. - /// Same semantics — drops every pending path-grant request for the - /// given workspace and clears its count. Used by `workspace_delete`. - pub async fn purge_workspace(&self, workspace_id: &str) -> usize { + /// See [`crate::commands::permissions::PendingApprovals::purge_workspace_canceling_runs`]. + /// Same semantics — cancels runs before dropping every pending + /// path-grant request for the given workspace, clears its count, and + /// returns the cancelled run ids. Used by `workspace_delete`. + pub async fn purge_workspace_canceling_runs( + &self, + workspace_id: &str, + mut cancel_run: F, + ) -> Vec + where + F: FnMut(&str), + { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -193,12 +221,22 @@ impl PendingPathGrants { .filter(|(_, entry)| entry.workspace_id.as_deref() == Some(workspace_id)) .map(|(id, _)| id.clone()) .collect(); - let count = to_remove.len(); + let mut run_ids = Vec::with_capacity(to_remove.len()); + for id in &to_remove { + if let Some(entry) = inner.entries.get(id) { + run_ids.push(entry.run_id.clone()); + } + } + run_ids.sort(); + run_ids.dedup(); + for run_id in &run_ids { + cancel_run(run_id); + } for id in to_remove { inner.entries.remove(&id); } inner.counts.remove(&Some(workspace_id.to_string())); - count + run_ids } pub async fn take(&self, request_id: &str) -> Option<(PendingEntry, u32)> { @@ -217,6 +255,57 @@ impl PendingPathGrants { }; Some((entry, count)) } + + /// See [`crate::commands::permissions::PendingApprovals::take_superseded`]. + /// Same semantics for path grants, keyed on run + requested path + + /// requested access: a fresh registration for the same grant replaces + /// any stale orphaned entry (and its UI card) instead of stacking a + /// duplicate. Supersede is explicit; channel closure remains + /// cancellation/teardown. + pub async fn take_superseded( + &self, + run_id: &str, + requested_path: &str, + requested_access: crate::config::FilesystemPathAccess, + ) -> Vec { + let mut inner = self.inner.lock().await; + let ids: Vec = inner + .entries + .iter() + .filter(|(_, entry)| { + entry.run_id == run_id + && entry.request.requested_path == requested_path + && entry.request.requested_access == requested_access + }) + .map(|(id, _)| id.clone()) + .collect(); + let mut taken = Vec::with_capacity(ids.len()); + for id in ids { + let Some(entry) = inner.entries.remove(&id) else { + continue; + }; + let request_id = entry.request.request_id.clone(); + let workspace_id = entry.workspace_id.clone(); + let count = match inner.counts.get_mut(&entry.workspace_id) { + Some(n) if *n > 0 => { + *n -= 1; + let v = *n; + if v == 0 { + inner.counts.remove(&entry.workspace_id); + } + v + } + _ => 0, + }; + let _ = entry.sender.send(PendingPathGrantOutcome::Superseded); + taken.push(SupersededPathGrant { + request_id, + workspace_id, + remaining: count, + }); + } + taken + } } impl Default for PendingPathGrants { @@ -293,7 +382,9 @@ pub async fn submit_path_grant_decision( )?; } - let _ = entry.sender.send(validated); + let _ = entry + .sender + .send(PendingPathGrantOutcome::Decision(validated)); emit_attention(&app, entry.workspace_id, remaining); Ok(()) } @@ -620,7 +711,7 @@ mod tests { requested_access: FilesystemPathAccess::ReadOnly, reason: "r".to_string(), }; - let (_rx, count) = pending.register(request).await; + let (_rx, count) = pending.register(request, "run-1".to_string()).await; assert_eq!(count, 1); let taken = pending.take("id-1").await; assert!(taken.is_some()); @@ -647,10 +738,91 @@ mod tests { ..a.clone() }; a.request_id = "id-a".to_string(); - let _ = pending.register(a).await; - let _ = pending.register(b).await; + let _ = pending.register(a, "run-1".to_string()).await; + let _ = pending.register(b, "run-1".to_string()).await; let list = pending.list_for_workspace("ws-A").await; assert_eq!(list.len(), 1); assert_eq!(list[0].request_id, "id-a"); } + + #[tokio::test] + async fn take_superseded_matches_run_path_and_access() { + let pending = PendingPathGrants::new(); + let base = PathGrantRequest { + request_id: "id-stale".to_string(), + workspace_id: Some("ws".to_string()), + agent_id: None, + agent_name: None, + requested_path: "/p".to_string(), + requested_access: FilesystemPathAccess::ReadOnly, + reason: "r".to_string(), + }; + let other_access = PathGrantRequest { + request_id: "id-access".to_string(), + requested_access: FilesystemPathAccess::ReadWrite, + ..base.clone() + }; + let other_path = PathGrantRequest { + request_id: "id-path".to_string(), + requested_path: "/q".to_string(), + ..base.clone() + }; + let (stale_rx, _) = pending.register(base, "run-1".to_string()).await; + let _rx2 = pending.register(other_access, "run-1".to_string()).await; + let _rx3 = pending.register(other_path, "run-1".to_string()).await; + + let taken = pending + .take_superseded("run-1", "/p", FilesystemPathAccess::ReadOnly) + .await; + assert_eq!(taken.len(), 1); + assert_eq!(taken[0].request_id, "id-stale"); + assert_eq!( + taken[0].remaining, 2, + "unrelated entries must remain counted" + ); + + assert!(matches!( + stale_rx.await, + Ok(PendingPathGrantOutcome::Superseded) + )); + assert!(pending.take("id-access").await.is_some()); + assert!(pending.take("id-path").await.is_some()); + + // A different run never supersedes. + assert!(pending + .take_superseded("run-2", "/q", FilesystemPathAccess::ReadOnly) + .await + .is_empty()); + } + + #[tokio::test] + async fn purge_workspace_cancels_run_before_dropping_sender() { + let pending = PendingPathGrants::new(); + let request = PathGrantRequest { + request_id: "id-1".to_string(), + workspace_id: Some("ws-1".to_string()), + agent_id: None, + agent_name: None, + requested_path: "/p".to_string(), + requested_access: FilesystemPathAccess::ReadOnly, + reason: "r".to_string(), + }; + let mut rx = pending.register(request, "run-1".to_string()).await.0; + + let run_ids = pending + .purge_workspace_canceling_runs("ws-1", |run_id| { + assert_eq!(run_id, "run-1"); + assert!(matches!( + rx.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + }) + .await; + + assert_eq!(run_ids, vec!["run-1".to_string()]); + assert!( + rx.await.is_err(), + "purge drops the sender after cancellation" + ); + } } diff --git a/src-tauri/src/commands/permissions.rs b/src-tauri/src/commands/permissions.rs index 67b0449..5e1e6e3 100644 --- a/src-tauri/src/commands/permissions.rs +++ b/src-tauri/src/commands/permissions.rs @@ -38,10 +38,11 @@ use crate::AppState; pub const PERMISSION_REQUEST_EVENT: &str = "permissions://request"; pub const PERMISSION_ATTENTION_EVENT: &str = "permissions://attention"; /// Emitted when a pending request is cleared *without* a user decision — -/// the tool call was abandoned (CLI transport dropped mid-call, run -/// cancelled) or it timed out. The inline approval card removes the now- -/// useless card on this. Normal user submissions remove the card -/// optimistically on the frontend, so they don't emit this. +/// the run was cancelled or ended (reaping a wait orphaned by a CLI +/// transport drop), the wait timed out, or a re-asked command superseded +/// the stale request. The inline approval card removes the now-useless +/// card on this. Normal user submissions remove the card optimistically +/// on the frontend, so they don't emit this. pub const PERMISSION_RESOLVED_EVENT: &str = "permissions://resolved"; /// Maximum time the bash handler waits for a user response. Past this @@ -140,15 +141,32 @@ struct PendingInner { counts: HashMap, u32>, } +#[derive(Debug)] +pub enum PendingApprovalOutcome { + Decision(Vec), + Superseded, +} + pub struct PendingEntry { - pub sender: oneshot::Sender>, + pub sender: oneshot::Sender, pub workspace_id: Option, + /// The run that is awaiting this decision. Used by + /// [`PendingApprovals::take_superseded`] so a re-asked command (after + /// a CLI transport drop orphaned the original request) replaces the + /// stale entry instead of stacking a duplicate card. + pub run_id: String, /// The original request payload as emitted to the frontend. Stored /// so that components that mount after the event fired can still /// discover the request via [`list_pending_permission_requests`]. pub request: PermissionRequest, } +pub struct SupersededApproval { + pub request_id: String, + pub workspace_id: Option, + pub remaining: u32, +} + impl PendingApprovals { pub fn new() -> Self { Self { @@ -169,7 +187,8 @@ impl PendingApprovals { pub async fn register( &self, request: PermissionRequest, - ) -> (oneshot::Receiver>, u32) { + run_id: String, + ) -> (oneshot::Receiver, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; let request_id = request.request_id.clone(); @@ -179,6 +198,7 @@ impl PendingApprovals { PendingEntry { sender: tx, workspace_id: workspace_id.clone(), + run_id, request, }, ); @@ -215,11 +235,18 @@ impl PendingApprovals { /// Drops every pending entry belonging to `workspace_id` and clears /// its count. Used by `workspace_delete` so requests scoped to a /// just-deleted workspace don't linger in memory until restart. - /// Dropping each entry's `sender` closes the oneshot channel, which - /// surfaces as a "channel closed" error on the bash-tool side that - /// was awaiting the decision — appropriate, since the workspace - /// (and therefore the in-flight call's context) is gone. - pub async fn purge_workspace(&self, workspace_id: &str) -> usize { + /// Cancels the runs awaiting those entries before dropping their + /// senders, then returns the cancelled run ids. Channel closure is + /// reserved for cancellation/teardown; supersede is delivered as an + /// explicit [`PendingApprovalOutcome::Superseded`]. + pub async fn purge_workspace_canceling_runs( + &self, + workspace_id: &str, + mut cancel_run: F, + ) -> Vec + where + F: FnMut(&str), + { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -227,12 +254,22 @@ impl PendingApprovals { .filter(|(_, entry)| entry.workspace_id.as_deref() == Some(workspace_id)) .map(|(id, _)| id.clone()) .collect(); - let count = to_remove.len(); + let mut run_ids = Vec::with_capacity(to_remove.len()); + for id in &to_remove { + if let Some(entry) = inner.entries.get(id) { + run_ids.push(entry.run_id.clone()); + } + } + run_ids.sort(); + run_ids.dedup(); + for run_id in &run_ids { + cancel_run(run_id); + } for id in to_remove { inner.entries.remove(&id); } inner.counts.remove(&Some(workspace_id.to_string())); - count + run_ids } /// Removes the pending entry and decrements its workspace count. @@ -254,6 +291,50 @@ impl PendingApprovals { }; Some((entry, count)) } + + /// Removes every pending entry for the same run + command and returns + /// each with the post-removal workspace count. Called by the bash + /// approval flow right before registering a fresh request: when a CLI + /// transport drop orphans an in-flight approval and the model re-asks + /// the same command, the stale entry (and its UI card) is replaced by + /// the fresh one instead of lingering next to it. The stale waiter is + /// woken with an explicit supersede outcome, not by dropping its + /// channel, so unrelated teardown cannot masquerade as supersede. + pub async fn take_superseded(&self, run_id: &str, command: &str) -> Vec { + let mut inner = self.inner.lock().await; + let ids: Vec = inner + .entries + .iter() + .filter(|(_, entry)| entry.run_id == run_id && entry.request.command == command) + .map(|(id, _)| id.clone()) + .collect(); + let mut taken = Vec::with_capacity(ids.len()); + for id in ids { + let Some(entry) = inner.entries.remove(&id) else { + continue; + }; + let request_id = entry.request.request_id.clone(); + let workspace_id = entry.workspace_id.clone(); + let count = match inner.counts.get_mut(&entry.workspace_id) { + Some(n) if *n > 0 => { + *n -= 1; + let v = *n; + if v == 0 { + inner.counts.remove(&entry.workspace_id); + } + v + } + _ => 0, + }; + let _ = entry.sender.send(PendingApprovalOutcome::Superseded); + taken.push(SupersededApproval { + request_id, + workspace_id, + remaining: count, + }); + } + taken + } } impl Default for PendingApprovals { @@ -318,7 +399,9 @@ pub async fn submit_permission_decision( ) { persist_decisions_to_agent(state.inner(), workspace_id, agent_id, &decisions)?; } - let _ = entry.sender.send(decisions); + let _ = entry + .sender + .send(PendingApprovalOutcome::Decision(decisions)); emit_attention(&app, entry.workspace_id, remaining); Ok(()) } @@ -457,7 +540,7 @@ mod tests { let pending = PendingApprovals::new(); let req = fake_request(Some("ws-1")); let id = req.request_id.clone(); - let (_rx, count) = pending.register(req).await; + let (_rx, count) = pending.register(req, "run-1".to_string()).await; assert_eq!(count, 1); let taken = pending.take(&id).await; assert!(taken.is_some()); @@ -472,8 +555,8 @@ mod tests { let req2 = fake_request(Some("ws-1")); let id1 = req1.request_id.clone(); let id2 = req2.request_id.clone(); - let (_rx1, c1) = pending.register(req1).await; - let (_rx2, c2) = pending.register(req2).await; + let (_rx1, c1) = pending.register(req1, "run-1".to_string()).await; + let (_rx2, c2) = pending.register(req2, "run-1".to_string()).await; assert_eq!(c1, 1); assert_eq!(c2, 2); let (_, remaining) = pending.take(&id1).await.unwrap(); @@ -487,8 +570,8 @@ mod tests { let pending = PendingApprovals::new(); let req_a = fake_request(Some("ws-A")); let req_b = fake_request(Some("ws-B")); - let _ = pending.register(req_a.clone()).await; - let _ = pending.register(req_b).await; + let _ = pending.register(req_a.clone(), "run-1".to_string()).await; + let _ = pending.register(req_b, "run-1".to_string()).await; let list = pending.list_for_workspace("ws-A").await; assert_eq!(list.len(), 1); assert_eq!(list[0].request_id, req_a.request_id); @@ -497,14 +580,92 @@ mod tests { #[tokio::test] async fn counts_snapshot_aggregates_by_workspace_and_drops_anon() { let pending = PendingApprovals::new(); - let _ = pending.register(fake_request(Some("ws-A"))).await; - let _ = pending.register(fake_request(Some("ws-A"))).await; - let _ = pending.register(fake_request(Some("ws-B"))).await; - let _ = pending.register(fake_request(None)).await; + let _ = pending + .register(fake_request(Some("ws-A")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(Some("ws-A")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(Some("ws-B")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(None), "run-1".to_string()) + .await; let snapshot = pending.counts_snapshot().await; assert_eq!(snapshot.get("ws-A"), Some(&2)); assert_eq!(snapshot.get("ws-B"), Some(&1)); assert_eq!(snapshot.len(), 2); } + + #[tokio::test] + async fn take_superseded_removes_only_same_run_and_command() { + let pending = PendingApprovals::new(); + let stale = fake_request(Some("ws-1")); // command "cmd" + let stale_id = stale.request_id.clone(); + let mut other_cmd = fake_request(Some("ws-1")); + other_cmd.command = "different".to_string(); + let other_cmd_id = other_cmd.request_id.clone(); + let other_run = fake_request(Some("ws-1")); // command "cmd", run-2 + let other_run_id = other_run.request_id.clone(); + let (stale_rx, _) = pending.register(stale, "run-1".to_string()).await; + let _rx2 = pending.register(other_cmd, "run-1".to_string()).await; + let _rx3 = pending.register(other_run, "run-2".to_string()).await; + + let taken = pending.take_superseded("run-1", "cmd").await; + assert_eq!(taken.len(), 1); + assert_eq!(taken[0].request_id, stale_id); + assert_eq!( + taken[0].remaining, 2, + "two unrelated entries must remain counted" + ); + + // Supersede is explicit; closed channels remain cancellation/teardown. + assert!(matches!( + stale_rx.await, + Ok(PendingApprovalOutcome::Superseded) + )); + + // Unrelated entries are untouched. + assert!(pending.take(&other_cmd_id).await.is_some()); + assert!(pending.take(&other_run_id).await.is_some()); + } + + #[tokio::test] + async fn take_superseded_returns_empty_when_nothing_matches() { + let pending = PendingApprovals::new(); + let req = fake_request(Some("ws-1")); + let id = req.request_id.clone(); + let _rx = pending.register(req, "run-1".to_string()).await; + + assert!(pending.take_superseded("run-9", "cmd").await.is_empty()); + assert!(pending.take_superseded("run-1", "other").await.is_empty()); + assert!(pending.take(&id).await.is_some(), "entry must survive"); + } + + #[tokio::test] + async fn purge_workspace_cancels_run_before_dropping_sender() { + let pending = PendingApprovals::new(); + let mut rx = pending + .register(fake_request(Some("ws-1")), "run-1".to_string()) + .await + .0; + + let run_ids = pending + .purge_workspace_canceling_runs("ws-1", |run_id| { + assert_eq!(run_id, "run-1"); + assert!(matches!( + rx.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + }) + .await; + + assert_eq!(run_ids, vec!["run-1".to_string()]); + assert!( + rx.await.is_err(), + "purge drops the sender after cancellation" + ); + } } diff --git a/src-tauri/src/commands/workspace.rs b/src-tauri/src/commands/workspace.rs index 9ef7f2c..e1edaee 100644 --- a/src-tauri/src/commands/workspace.rs +++ b/src-tauri/src/commands/workspace.rs @@ -2486,21 +2486,35 @@ pub async fn workspace_delete( })?; } - // Drain in-memory queues for this workspace. Pending bash-tool / - // path-grant approvals get their oneshot channels dropped, which - // surfaces as a closed-channel error on the awaiting side — correct, - // since the workspace they were scoped to no longer exists. - let purged_approvals = state.pending_approvals.purge_workspace(&workspace_id).await; + // Drain in-memory queues for this workspace. The purge helpers cancel + // runs before dropping pending senders, so workspace deletion cannot be + // mistaken for an interactive-request supersede. + let purged_approvals = state + .pending_approvals + .purge_workspace_canceling_runs(&workspace_id, |run_id| { + let _ = crate::assistant::runtime::cancel_run(run_id); + }) + .await; let purged_path_grants = state .pending_path_grants - .purge_workspace(&workspace_id) + .purge_workspace_canceling_runs(&workspace_id, |run_id| { + let _ = crate::assistant::runtime::cancel_run(run_id); + }) .await; + let mut purged_run_ids: Vec = purged_approvals + .iter() + .chain(purged_path_grants.iter()) + .cloned() + .collect(); + purged_run_ids.sort(); + purged_run_ids.dedup(); tracing::info!( workspace_id = %workspace_id, agents_cleared = agent_ids.len(), - approvals_purged = purged_approvals, - path_grants_purged = purged_path_grants, + approvals_purged = purged_approvals.len(), + path_grants_purged = purged_path_grants.len(), + runs_cancelled = purged_run_ids.len(), "Deleted general workspace" ); diff --git a/src/components/InlineApprovalCard.tsx b/src/components/InlineApprovalCard.tsx index 5693acb..70c5800 100644 --- a/src/components/InlineApprovalCard.tsx +++ b/src/components/InlineApprovalCard.tsx @@ -8,9 +8,9 @@ import type { PermissionRequest, SegmentDecision } from '../generated/bindings'; import styles from './InlineApprovalCard.module.css'; const PERMISSION_REQUEST_EVENT = 'permissions://request'; -// Backend cleared a pending request without a user decision (the tool call -// was abandoned — CLI transport dropped mid-call — or it timed out). Drop -// the now-useless card. Payload: { requestId }. +// Backend cleared a pending request without a user decision (the run was +// cancelled or ended, the wait timed out, or a re-asked command superseded +// it). Drop the now-useless card. Payload: { requestId }. const PERMISSION_RESOLVED_EVENT = 'permissions://resolved'; // Local UI alias matching the discriminator on SegmentDecision. Kept diff --git a/src/components/InlinePathGrantCard.tsx b/src/components/InlinePathGrantCard.tsx index 4c958c4..302bdb6 100644 --- a/src/components/InlinePathGrantCard.tsx +++ b/src/components/InlinePathGrantCard.tsx @@ -12,8 +12,9 @@ import type { import styles from './InlinePathGrantCard.module.css'; const PATH_GRANT_REQUEST_EVENT = 'path-grants://request'; -// Backend cleared a pending grant without a user decision (tool call -// abandoned — CLI transport dropped — or timed out). Drop the stale card. +// Backend cleared a pending grant without a user decision (run cancelled +// or ended, wait timed out, or a re-asked grant superseded it). Drop the +// stale card. const PATH_GRANT_RESOLVED_EVENT = 'path-grants://resolved'; type PathAccess = FilesystemPathAccess;