From affc483a20434e6f366bc2315fd8b5f57c295364 Mon Sep 17 00:00:00 2001 From: juan <2930882+juacker@users.noreply.github.com> Date: Fri, 12 Jun 2026 10:36:06 +0200 Subject: [PATCH 1/3] feat(prompt): guide re-asking interactive tools after a transport drop The local MCP transport can drop an in-flight call (the model sees "transport dropped mid-call; response for tool was lost"). For tools that block on a user grant or answer (ask_user, approval-gated bash_exec / fs_request_grant) the outcome is then unknown and a stale approval/question card can linger in the UI, so the model must re-issue the same interactive call rather than assume an answer or proceed. Add a scoped 'Interactive Tool Reliability' section to the system prompt, emitted only when such a tool is present, plus tests for presence/absence. --- src-tauri/src/assistant/engine.rs | 60 +++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src-tauri/src/assistant/engine.rs b/src-tauri/src/assistant/engine.rs index af5d5c6..32f33c7 100644 --- a/src-tauri/src/assistant/engine.rs +++ b/src-tauri/src/assistant/engine.rs @@ -903,6 +903,28 @@ pub(crate) fn build_system_prompt( - Be concise and direct in your responses. Prefer concrete actions and evidence over vague summaries.\n", ); + // Transport-drop recovery for grant/response-blocking tools. The local MCP + // transport can drop an in-flight call (surfaced to the model as + // "transport dropped mid-call; response for tool was lost"). For a + // tool that blocks on a user grant or answer, the outcome is then unknown + // AND a now-stale approval/question card can linger in the UI, so the model + // must re-ask rather than assume an answer or proceed. Scoped to sessions + // that actually expose such a tool; ordinary read/write tools, which can be + // retried without side effects, need no special handling. + let has_interactive_tool = tool_names + .iter() + .any(|n| matches!(*n, "ask_user" | "bash_exec" | "fs_request_grant")); + if has_interactive_tool { + prompt.push_str( + "\n## Interactive Tool Reliability\n\ + A tool call can occasionally fail with a transport error such as `MCP server \"clai\" transport dropped mid-call; response for tool was lost`. This means CLAI lost the in-flight call before its result reached you, so the call's outcome is UNKNOWN — it may or may not have run.\n\ + - This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost, and a now-stale approval/question card may still be visible in the app.\n\ + - When it happens, re-issue the SAME interactive call once so the user gets a fresh, answerable prompt. Do NOT assume it was approved, denied, or answered, and do NOT proceed past it on the strength of the lost call.\n\ + - Briefly tell the user you hit a transport drop and are re-requesting; they can dismiss any duplicate or stale permission card.\n\ + - For non-interactive tools (reads, searches, writes), a transport drop needs no special handling — just retry normally if you still need the result.\n", + ); + } + if context.space_id.is_some() || !context.mcp_server_ids.is_empty() { prompt.push_str( "- This tab already carries session-specific context and capabilities. \ @@ -1427,6 +1449,44 @@ mod tests { assert!(text.contains("re-run the relevant tools if freshness matters.")); } + #[test] + fn build_system_prompt_adds_interactive_reliability_guidance_when_blocking_tool_present() { + let context = SessionContext::default(); + let tools = [crate::assistant::types::ToolDefinition { + name: "ask_user".to_string(), + description: "desc".to_string(), + input_schema: serde_json::json!({}), + }]; + + let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage); + let text = match &message.content[0] { + ContentPart::Text { text } => text, + other => panic!("expected text content, got {:?}", other), + }; + + assert!(text.contains("## Interactive Tool Reliability")); + assert!(text.contains("transport dropped mid-call")); + assert!(text.contains("re-issue the SAME interactive call once")); + } + + #[test] + fn build_system_prompt_omits_interactive_reliability_guidance_without_blocking_tool() { + let context = SessionContext::default(); + let tools = [crate::assistant::types::ToolDefinition { + name: "fs_read".to_string(), + description: "desc".to_string(), + input_schema: serde_json::json!({}), + }]; + + let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage); + let text = match &message.content[0] { + ContentPart::Text { text } => text, + other => panic!("expected text content, got {:?}", other), + }; + + assert!(!text.contains("## Interactive Tool Reliability")); + } + #[test] fn build_system_prompt_describes_autonomous_run_mode() { let context = SessionContext { From e8ec4d380553587e023e4c24dcf239d0031527b4 Mon Sep 17 00:00:00 2001 From: juan <2930882+juacker@users.noreply.github.com> Date: Fri, 12 Jun 2026 11:03:35 +0200 Subject: [PATCH 2/3] =?UTF-8?q?fix(mcp):=20root-fix=20interactive-tool=20t?= =?UTF-8?q?ransport=20drops=20=E2=80=94=20supersede,=20reap,=20keep-alive?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A CLI transport drop used to orphan in-flight interactive waits (ask_user, bash approvals, path grants): the rmcp stateful session worker keeps the tool future alive past the dropped connection, so the abandoned-wait guards never fired, pending registry entries (and their UI cards) lingered until the 55-min interactive timeout, and a re-asked command stacked a duplicate card next to the stale one. Three root fixes: - Supersede on re-ask: registering a bash approval (same run + command), path grant (same run + path + access), or ask_user (same session) takes the stale pending entry, emits resolved/attention so the old card is replaced in place, and wakes the orphaned wait with a channel-closed error that it now treats as superseded (disarm, never cancel the live run). - Run-end reaping: BindingGuard owns a run-scope CancellationToken, cancelled on drop; execute_bound_tool races it, so orphaned tool futures are dropped at run end and their RAII cleanup guards finally fire as documented. - Keep rmcp's default sse_keep_alive (15s) instead of disabling it; idle unpinged response streams during long human waits are exactly what rotted into transport drops. The None came from the pre-1.7 struct literal, not a decision. Workspace deletion now cancels the runs whose approvals it purges (purge_workspace returns run ids), since the awaiting side no longer treats a closed channel as cancel-the-run. The Interactive Tool Reliability prompt section drops the 'user can dismiss the stale card' caveat: re-asking now replaces the card automatically, so the model just re-issues the same call. --- src-tauri/src/assistant/engine.rs | 22 +-- src-tauri/src/assistant/local_mcp.rs | 166 ++++++++++++++++++++-- src-tauri/src/assistant/tools/ask_user.rs | 117 ++++++++++++++- src-tauri/src/assistant/tools/local.rs | 87 ++++++++++-- src-tauri/src/commands/path_grants.rs | 124 ++++++++++++++-- src-tauri/src/commands/permissions.rs | 139 +++++++++++++++--- src-tauri/src/commands/workspace.rs | 24 +++- src/components/InlineApprovalCard.tsx | 6 +- src/components/InlinePathGrantCard.tsx | 5 +- 9 files changed, 612 insertions(+), 78 deletions(-) diff --git a/src-tauri/src/assistant/engine.rs b/src-tauri/src/assistant/engine.rs index 32f33c7..2f3d703 100644 --- a/src-tauri/src/assistant/engine.rs +++ b/src-tauri/src/assistant/engine.rs @@ -906,11 +906,13 @@ pub(crate) fn build_system_prompt( // Transport-drop recovery for grant/response-blocking tools. The local MCP // transport can drop an in-flight call (surfaced to the model as // "transport dropped mid-call; response for tool was lost"). For a - // tool that blocks on a user grant or answer, the outcome is then unknown - // AND a now-stale approval/question card can linger in the UI, so the model - // must re-ask rather than assume an answer or proceed. Scoped to sessions - // that actually expose such a tool; ordinary read/write tools, which can be - // retried without side effects, need no special handling. + // tool that blocks on a user grant or answer, the outcome is then unknown, + // so the model must re-ask rather than assume an answer or proceed. The + // backend treats the re-asked call as superseding the orphaned one (the + // stale approval/question card is replaced in place), so no UI caveats are + // needed here. Scoped to sessions that actually expose such a tool; + // ordinary read/write tools, which can be retried without side effects, + // need no special handling. let has_interactive_tool = tool_names .iter() .any(|n| matches!(*n, "ask_user" | "bash_exec" | "fs_request_grant")); @@ -918,9 +920,8 @@ pub(crate) fn build_system_prompt( prompt.push_str( "\n## Interactive Tool Reliability\n\ A tool call can occasionally fail with a transport error such as `MCP server \"clai\" transport dropped mid-call; response for tool was lost`. This means CLAI lost the in-flight call before its result reached you, so the call's outcome is UNKNOWN — it may or may not have run.\n\ - - This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost, and a now-stale approval/question card may still be visible in the app.\n\ - - When it happens, re-issue the SAME interactive call once so the user gets a fresh, answerable prompt. Do NOT assume it was approved, denied, or answered, and do NOT proceed past it on the strength of the lost call.\n\ - - Briefly tell the user you hit a transport drop and are re-requesting; they can dismiss any duplicate or stale permission card.\n\ + - This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost.\n\ + - When it happens, re-issue the SAME interactive call once. CLAI replaces the lost prompt with the fresh one in the app, so the user simply answers the new prompt. Do NOT assume the lost call was approved, denied, or answered, and do NOT proceed past it.\n\ - For non-interactive tools (reads, searches, writes), a transport drop needs no special handling — just retry normally if you still need the result.\n", ); } @@ -1467,6 +1468,11 @@ mod tests { assert!(text.contains("## Interactive Tool Reliability")); assert!(text.contains("transport dropped mid-call")); assert!(text.contains("re-issue the SAME interactive call once")); + // The backend supersedes the orphaned request when the model + // re-asks, so the prompt must NOT push stale-card caveats (e.g. + // telling the user to dismiss duplicates) onto the model. + assert!(text.contains("replaces the lost prompt with the fresh one")); + assert!(!text.contains("dismiss")); } #[test] diff --git a/src-tauri/src/assistant/local_mcp.rs b/src-tauri/src/assistant/local_mcp.rs index 96d8099..7eeaed8 100644 --- a/src-tauri/src/assistant/local_mcp.rs +++ b/src-tauri/src/assistant/local_mcp.rs @@ -42,7 +42,20 @@ pub struct LocalMcpRuntime { // whichever workspace happened to bind first. app: AppHandle, cancellation_token: CancellationToken, - bindings: Arc>>, + bindings: Arc>>, +} + +/// A registered run binding plus the run-scope token that reaps its +/// in-flight tool calls. [`BindingGuard::drop`] cancels `run_scope`, so +/// any tool future still racing in [`execute_bound_tool`]'s `select!` +/// when the run ends (e.g. an interactive wait orphaned by a CLI +/// transport drop) is dropped instead of lingering on the rmcp session +/// worker until its own timeout. Dropping those futures fires their +/// cleanup guards (pending-registry removal + `resolved` UI events). +#[derive(Clone)] +struct BoundRun { + binding: ToolBinding, + run_scope: CancellationToken, } #[derive(Clone)] @@ -89,6 +102,7 @@ impl LocalMcpRuntime { /// bindings map. pub fn bind_run(&self, binding: ToolBinding) -> BindingGuard { let token = Uuid::new_v4().to_string(); + let run_scope = CancellationToken::new(); // Bindings map only ever holds short, await-free critical sections, // so `std::sync::RwLock` is fine and lets `Drop` clean up sync. // A poisoned lock means the binding map is unusable; we'd rather @@ -96,17 +110,24 @@ impl LocalMcpRuntime { self.bindings .write() .expect("local MCP binding map poisoned") - .insert(token.clone(), binding); + .insert( + token.clone(), + BoundRun { + binding, + run_scope: run_scope.clone(), + }, + ); BindingGuard { bindings: self.bindings.clone(), token, + run_scope, } } fn binding_from_request( &self, context: &RequestContext, - ) -> Result { + ) -> Result { let token = bearer_token(context).ok_or_else(|| { McpError::invalid_request("missing bearer token for CLAI MCP request", None) })?; @@ -125,9 +146,18 @@ impl LocalMcpRuntime { /// token while alive and removes it from the runtime on drop, so a panic /// or early return between bind and the end of a run cannot leak a stale /// binding into the process-singleton MCP server. +/// +/// Dropping the guard also cancels the binding's run-scope token, which +/// reaps any tool call still in flight on the rmcp session worker. This +/// matters for interactive waits (`ask_user`, bash approvals, path +/// grants) orphaned by a CLI transport drop: the worker keeps their +/// futures alive past the dropped connection, so without this reap they +/// would pin pending approval entries (and their UI cards) until their +/// own multi-minute timeout. pub struct BindingGuard { - bindings: Arc>>, + bindings: Arc>>, token: String, + run_scope: CancellationToken, } impl BindingGuard { @@ -138,6 +168,10 @@ impl BindingGuard { impl Drop for BindingGuard { fn drop(&mut self) { + // Reap in-flight tool calls for this run BEFORE unbinding, so a + // racing request can't observe the binding gone while its + // already-running future survives the run. + self.run_scope.cancel(); if let Ok(mut bindings) = self.bindings.write() { bindings.remove(&self.token); } @@ -186,7 +220,11 @@ pub async fn ensure_started(app: &AppHandle) -> Result, Str // 127.0.0.1 bind, port-agnostically. let mut config = StreamableHttpServerConfig::default(); config.stateful_mode = true; - config.sse_keep_alive = None; + // Keep rmcp's default sse_keep_alive (15s pings). The + // response stream for an interactive tool call sits idle + // for as long as a human takes to answer, and an unpinged + // idle connection is exactly what rots into "transport + // dropped mid-call; response for tool was lost". config.cancellation_token = cancellation_token.child_token(); config }, @@ -234,7 +272,7 @@ impl ServerHandler for ClaiMcpService { context: RequestContext, ) -> impl Future> + Send + '_ { async move { - let binding = self.runtime.binding_from_request(&context)?; + let binding = self.runtime.binding_from_request(&context)?.binding; let session = repository::get_session(&binding.pool, &binding.session_id) .await .map_err(|e| McpError::internal_error(e, None))? @@ -262,14 +300,22 @@ impl ServerHandler for ClaiMcpService { context: RequestContext, ) -> impl Future> + Send + '_ { async move { - let binding = self.runtime.binding_from_request(&context)?; + let bound = self.runtime.binding_from_request(&context)?; let tool_name = request.name.to_string(); let params = request .arguments .map(serde_json::Value::Object) .unwrap_or(serde_json::Value::Object(Default::default())); - match execute_bound_tool(&self.runtime.app, &binding, &tool_name, params).await { + match execute_bound_tool( + &self.runtime.app, + &bound.binding, + &bound.run_scope, + &tool_name, + params, + ) + .await + { Ok(value) => Ok(CallToolResult::structured(value)), Err(error) => Ok(CallToolResult::structured_error(serde_json::json!({ "error": error, @@ -292,6 +338,7 @@ impl ServerHandler for ClaiMcpService { async fn execute_bound_tool( app: &AppHandle, binding: &ToolBinding, + run_scope: &CancellationToken, tool_name: &str, params: serde_json::Value, ) -> Result { @@ -347,6 +394,13 @@ async fn execute_bound_tool( tokio::select! { _ = binding.cancel_token.cancelled() => Err("run cancelled".to_string()), + // Run-scope reap: fires when `BindingGuard` drops at the end of the + // run. A live CLI always waits for its tool results before ending + // its turn, so the only futures still here at that point are + // orphans whose response stream was lost to a transport drop. + // Dropping them fires their cleanup guards (pending-approval + // removal, `resolved` UI events). + _ = run_scope.cancelled() => Err("run ended before this tool call completed".to_string()), result = tools::execute_tool(&deps, &tool_context, tool_name, params) => result, } } @@ -380,3 +434,99 @@ fn bearer_token(context: &RequestContext) -> Option { .filter(|value| !value.is_empty()) .map(str::to_string) } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_binding() -> ToolBinding { + ToolBinding { + pool: sqlx::Pool::connect_lazy("sqlite::memory:").expect("lazy pool"), + session_id: "session-1".to_string(), + run_id: "run-1".to_string(), + cancel_token: CancellationToken::new(), + inter_agent_call_depth: None, + notices: Arc::new(Mutex::new(Vec::new())), + session_grants: Arc::new(Mutex::new(Vec::new())), + session_allowed_command_prefixes: Arc::new(Mutex::new(Vec::new())), + session_blocked_command_prefixes: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Dropping the guard must cancel the run scope (reaping in-flight + /// tool futures racing on it in `execute_bound_tool`) and unbind the + /// bearer token — while leaving the run's own cancel token alone, so + /// reaping orphans at normal run end is not a run cancellation. + #[tokio::test] // tokio: the lazy sqlx pool requires a runtime context on drop + async fn binding_guard_drop_cancels_run_scope_and_unbinds() { + let bindings: Arc>> = + Arc::new(RwLock::new(HashMap::new())); + let binding = test_binding(); + let run_cancel = binding.cancel_token.clone(); + let run_scope = CancellationToken::new(); + bindings.write().unwrap().insert( + "token-1".to_string(), + BoundRun { + binding, + run_scope: run_scope.clone(), + }, + ); + let guard = BindingGuard { + bindings: bindings.clone(), + token: "token-1".to_string(), + run_scope: run_scope.clone(), + }; + + assert!(!run_scope.is_cancelled()); + drop(guard); + + assert!( + run_scope.is_cancelled(), + "guard drop must reap in-flight tool calls via the run scope" + ); + assert!( + bindings.read().unwrap().is_empty(), + "guard drop must unbind the bearer token" + ); + assert!( + !run_cancel.is_cancelled(), + "reaping at run end must not look like a run cancellation" + ); + } + + /// The reap arm in `execute_bound_tool` must drop the racing tool + /// future (firing its cleanup guards), not merely resolve alongside it. + #[tokio::test] + async fn run_scope_cancel_drops_in_flight_future() { + struct DropFlag(Arc>); + impl Drop for DropFlag { + fn drop(&mut self) { + *self.0.lock().unwrap() = true; + } + } + + let dropped = Arc::new(Mutex::new(false)); + let flag = DropFlag(dropped.clone()); + let run_scope = CancellationToken::new(); + let scope = run_scope.clone(); + + let task = tokio::spawn(async move { + let _flag = flag; // owned by the racing future, dropped with it + tokio::select! { + _ = scope.cancelled() => Err::<(), String>("run ended before this tool call completed".to_string()), + _ = std::future::pending::<()>() => Ok(()), // never resolves, like an unanswered approval + } + }); + + run_scope.cancel(); + let result = task.await.expect("select task must not panic"); + assert_eq!( + result.unwrap_err(), + "run ended before this tool call completed" + ); + assert!( + *dropped.lock().unwrap(), + "the in-flight future must be dropped so its RAII cleanup guards fire" + ); + } +} diff --git a/src-tauri/src/assistant/tools/ask_user.rs b/src-tauri/src/assistant/tools/ask_user.rs index 73b4ee0..7263887 100644 --- a/src-tauri/src/assistant/tools/ask_user.rs +++ b/src-tauri/src/assistant/tools/ask_user.rs @@ -128,11 +128,35 @@ pub fn submit_answer(pending_id: &str, answer: AskUserAnswer) -> Result<(), Stri .map_err(|_| "ask_user receiver was dropped (run already ended)".to_string()) } +/// Removes every pending ask for `session_id`, returning the removed +/// pending ids. Called when a new ask is registered for the session: the +/// frontend renders a single ask panel per session, so a still-pending +/// ask at registration time is an orphan (its CLI transport dropped +/// mid-call and the model is re-asking). Dropping the removed senders +/// wakes the orphaned waits with a channel-closed error, which they +/// treat as superseded (no run cancellation). +pub fn take_for_session(session_id: &str) -> Vec { + let Ok(mut map) = pending_map().lock() else { + return Vec::new(); + }; + let ids: Vec = map + .iter() + .filter(|(_, (owner, _))| owner == session_id) + .map(|(id, _)| id.clone()) + .collect(); + for id in &ids { + map.remove(id); + } + ids +} + /// RAII guard: removes the pending entry if the tool future is dropped -/// before the channel resolves (run cancellation, CLI MCP timeout, or -/// abandoned transport). In that case the answer panel cannot resume the -/// original tool call, so clear the UI and cancel the run rather than -/// letting the model continue without the requested human answer. +/// before the channel resolves — run cancellation, or run-end reaping of +/// a wait orphaned by a CLI transport drop (`BindingGuard` cancels the +/// run scope, dropping this future). In that case the answer panel +/// cannot resume the original tool call, so clear the UI and cancel the +/// run rather than letting the model continue without the requested +/// human answer (a no-op when the run already ended). struct PendingGuard { id: String, app: tauri::AppHandle, @@ -191,6 +215,22 @@ pub async fn execute( .await? .ok_or_else(|| format!("Session not found: {}", context.session_id))?; + // Supersede any orphaned ask for this session (the model re-asking + // after a CLI transport drop lost the original call). The frontend + // keeps one ask panel per session, so the fresh question below + // replaces the stale one; resolving the old ids also unblocks + // `session_has_pending_ask` for mid-run input delivery. + for stale_id in take_for_session(&session.id) { + let _ = emit_event( + &deps.app, + &session, + Some(context.run_id.as_str()), + AssistantUiEvent::AskUserResolved { + pending_id: stale_id, + }, + ); + } + let pending_id = Uuid::new_v4().to_string(); let (tx, rx) = oneshot::channel::(); { @@ -224,7 +264,17 @@ pub async fn execute( let wait_timeout = context.interactive_wait_timeout(ASK_USER_TIMEOUT); let answer = match tokio::time::timeout(wait_timeout, rx).await { Ok(Ok(answer)) => answer, - Ok(Err(_)) => return Err("ask_user channel closed (sender dropped)".to_string()), + Ok(Err(_)) => { + // Sender dropped without an answer: a newer ask for this + // session superseded this one (transport-drop re-ask). The + // superseding call already cleaned the map and the UI, and + // the run is alive waiting on the NEW question — disarm + // rather than cancel the run. + guard.disarm(); + return Err( + "ask_user was superseded by a newer question before an answer arrived".to_string(), + ); + } Err(_) => { return Err(format!( "ask_user timed out waiting for a user answer after {} seconds", @@ -325,4 +375,61 @@ mod tests { let labels: Vec<&str> = result.iter().map(|o| o.label.as_str()).collect(); assert_eq!(labels, vec!["Other option", "None of the above"]); } + + // PENDING is a process-wide static shared across parallel tests, so + // each test uses unique session ids to stay isolated. + + #[test] + fn take_for_session_removes_only_that_sessions_asks_and_closes_channels() { + let (tx_a, rx_a) = oneshot::channel::(); + let (tx_b, mut rx_b) = oneshot::channel::(); + { + let mut map = pending_map().lock().unwrap(); + map.insert( + "tfs-pending-a".to_string(), + ("tfs-session-a".to_string(), tx_a), + ); + map.insert( + "tfs-pending-b".to_string(), + ("tfs-session-b".to_string(), tx_b), + ); + } + + let removed = take_for_session("tfs-session-a"); + assert_eq!(removed, vec!["tfs-pending-a".to_string()]); + assert!(!session_has_pending_ask("tfs-session-a")); + assert!(session_has_pending_ask("tfs-session-b")); + + // The removed sender is dropped → the orphaned wait wakes with a + // channel-closed error (the supersede signal)... + assert!(rx_a.blocking_recv().is_err()); + // ...while the other session's channel stays open. + assert!(matches!( + rx_b.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + + // Cleanup so other tests see an empty map for these ids. + take_for_session("tfs-session-b"); + } + + #[test] + fn submit_answer_fails_for_superseded_pending_id() { + let (tx, _rx) = oneshot::channel::(); + pending_map() + .lock() + .unwrap() + .insert("sup-pending".to_string(), ("sup-session".to_string(), tx)); + take_for_session("sup-session"); + + let result = submit_answer( + "sup-pending", + AskUserAnswer { + text: "late".to_string(), + selected_option_index: None, + selected_option_indexes: None, + }, + ); + assert!(result.is_err(), "superseded ask must not accept answers"); + } } diff --git a/src-tauri/src/assistant/tools/local.rs b/src-tauri/src/assistant/tools/local.rs index 1fe1750..5f2143c 100644 --- a/src-tauri/src/assistant/tools/local.rs +++ b/src-tauri/src/assistant/tools/local.rs @@ -1194,13 +1194,17 @@ fn is_pure_assignment_token(tok: &str) -> bool { } /// Cleans up an abandoned permission request when the approval-wait future -/// is dropped without a user decision — the Claude Code CLI dropping the MCP -/// transport mid-call (its "response for tool bash_exec was lost" message), -/// or the run being cancelled. In either case the awaiting future below is -/// dropped, so this guard runs: it removes the still-pending registry entry, -/// tells the frontend to drop the now-useless approval card, and cancels the -/// run so the model cannot continue without the missing decision. Disarmed on -/// a normal decision, where the submit command already removed the entry. +/// is dropped without a user decision — the run being cancelled, or the +/// run ending while this wait was orphaned by a CLI transport drop (the +/// rmcp session worker keeps the future alive past the dropped +/// connection; `BindingGuard` reaps it at run end, which drops this +/// future). The guard then removes the still-pending registry entry, +/// tells the frontend to drop the now-useless approval card, and cancels +/// the run so the model cannot continue without the missing decision +/// (a no-op when the run already ended). Disarmed on a normal decision, +/// where the submit command already removed the entry, and on a +/// channel-closed wakeup (supersede), where the superseding caller +/// already cleaned up and the run must stay alive. /// /// Cleanup is async (registry lock) so it's spawned onto the app runtime; /// `Drop` can't await. `take` is a no-op if the entry was already removed @@ -1277,7 +1281,30 @@ async fn await_user_permission( }; let request_id = request.request_id.clone(); - let (rx, count) = app_state.pending_approvals.register(request.clone()).await; + // Supersede: if a previous request for this exact run + command is + // still pending, it is an orphan — its CLI transport dropped mid-call + // and the model is now re-asking. Replace it (and its UI card) with + // the fresh request instead of stacking a duplicate the user can no + // longer meaningfully answer. Dropping the stale entry's sender wakes + // the orphaned wait with a channel-closed error, which it treats as + // superseded (no run cancellation — this run is alive and waiting on + // the NEW request). + for (stale, remaining) in app_state + .pending_approvals + .take_superseded(&context.run_id, command) + .await + { + crate::commands::permissions::emit_permission_resolved( + &deps.app, + &stale.request.request_id, + ); + emit_attention(&deps.app, stale.workspace_id.clone(), remaining); + } + + let (rx, count) = app_state + .pending_approvals + .register(request.clone(), context.run_id.clone()) + .await; if let Err(e) = deps.app.emit(PERMISSION_REQUEST_EVENT, &request) { tracing::warn!("Failed to emit permission request event: {}", e); @@ -1305,7 +1332,15 @@ async fn await_user_permission( d } Ok(Err(_)) => { - let msg = "Permission approval channel closed before a decision was made"; + // The sender was dropped without a decision: this request was + // superseded by a fresh registration for the same run + command + // (the model re-asked after a transport drop), or app state is + // tearing down. Either way the registry entry is already gone + // and the run may be live, waiting on the NEW request — so + // disarm the guard rather than cancel the run. + abandon_guard.disarm(); + let msg = "Permission request was superseded by a newer request \ + for the same command before a decision was made"; context.add_notice(RunNoticeKind::CommandDenied, msg.to_string()); return Err(msg.to_string()); } @@ -1528,8 +1563,8 @@ fn access_to_str(access: FilesystemPathAccess) -> &'static str { /// return shape (path grants aren't per-segment). /// Path-grant analogue of [`AbandonedApprovalGuard`]: clears a pending /// filesystem path-grant request, drops its card, and cancels the run when -/// the approval-wait future is abandoned (CLI transport drop mid-call, or -/// run cancellation). +/// the approval-wait future is abandoned (run cancellation, or run-end +/// reaping of a wait orphaned by a CLI transport drop). struct AbandonedPathGrantGuard { app: tauri::AppHandle, cancel_token: tokio_util::sync::CancellationToken, @@ -1578,9 +1613,28 @@ async fn await_path_grant_decision( let workspace_id = context.workspace_id.clone(); let request_id = request.request_id.clone(); + // Supersede a stale orphaned request for the same run + path + access + // (the model re-asked after a CLI transport drop). See the analogous + // block in `await_user_permission` for the full rationale. + for (stale, remaining) in app_state + .pending_path_grants + .take_superseded( + &context.run_id, + &request.requested_path, + request.requested_access, + ) + .await + { + crate::commands::path_grants::emit_path_grant_resolved( + &deps.app, + &stale.request.request_id, + ); + emit_attention(&deps.app, stale.workspace_id.clone(), remaining); + } + let (rx, count) = app_state .pending_path_grants - .register(request.clone()) + .register(request.clone(), context.run_id.clone()) .await; if let Err(e) = deps.app.emit(PATH_GRANT_REQUEST_EVENT, &request) { @@ -1606,7 +1660,14 @@ async fn await_path_grant_decision( Ok(decision) } Ok(Err(_)) => { - let msg = "Path-grant approval channel closed before a decision was made".to_string(); + // Sender dropped without a decision: superseded by a fresh + // registration for the same run + path + access, or app + // teardown. The superseding caller already cleaned up and the + // run may be live on the NEW request — disarm, don't cancel. + abandon_guard.disarm(); + let msg = "Path-grant request was superseded by a newer request \ + for the same path before a decision was made" + .to_string(); context.add_notice(RunNoticeKind::PathGrantDenied, msg.clone()); Err(msg) } diff --git a/src-tauri/src/commands/path_grants.rs b/src-tauri/src/commands/path_grants.rs index bbb6440..68734d8 100644 --- a/src-tauri/src/commands/path_grants.rs +++ b/src-tauri/src/commands/path_grants.rs @@ -42,10 +42,11 @@ use crate::AppState; pub const PATH_GRANT_REQUEST_EVENT: &str = "path-grants://request"; pub const PATH_GRANT_ATTENTION_EVENT: &str = "path-grants://attention"; /// Emitted when a pending path-grant request is cleared *without* a user -/// decision — the tool call was abandoned (CLI transport dropped mid-call, -/// run cancelled) or it timed out. The inline path-grant card removes the -/// now-useless card on this. Normal submissions clear the card optimistically -/// on the frontend, so they don't emit this. +/// decision — the run was cancelled or ended (reaping a wait orphaned by +/// a CLI transport drop), the wait timed out, or a re-asked grant +/// superseded the stale request. The inline path-grant card removes the +/// now-useless card on this. Normal submissions clear the card +/// optimistically on the frontend, so they don't emit this. pub const PATH_GRANT_RESOLVED_EVENT: &str = "path-grants://resolved"; /// Same bound as the command-approval flow: 24h is generous enough that @@ -123,6 +124,11 @@ pub struct PendingEntry { pub sender: oneshot::Sender, pub workspace_id: Option, pub agent_id: Option, + /// The run awaiting this decision. Used by + /// [`PendingPathGrants::take_superseded`] so a re-asked grant (after a + /// CLI transport drop orphaned the original request) replaces the + /// stale entry instead of stacking a duplicate card. + pub run_id: String, pub request: PathGrantRequest, } @@ -139,6 +145,7 @@ impl PendingPathGrants { pub async fn register( &self, request: PathGrantRequest, + run_id: String, ) -> (oneshot::Receiver, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; @@ -151,6 +158,7 @@ impl PendingPathGrants { sender: tx, workspace_id: workspace_id.clone(), agent_id, + run_id, request, }, ); @@ -184,8 +192,10 @@ impl PendingPathGrants { /// See [`crate::commands::permissions::PendingApprovals::purge_workspace`]. /// Same semantics — drops every pending path-grant request for the - /// given workspace and clears its count. Used by `workspace_delete`. - pub async fn purge_workspace(&self, workspace_id: &str) -> usize { + /// given workspace, clears its count, and returns the run ids that + /// were awaiting the dropped entries so the caller can cancel those + /// runs. Used by `workspace_delete`. + pub async fn purge_workspace(&self, workspace_id: &str) -> Vec { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -193,12 +203,14 @@ impl PendingPathGrants { .filter(|(_, entry)| entry.workspace_id.as_deref() == Some(workspace_id)) .map(|(id, _)| id.clone()) .collect(); - let count = to_remove.len(); + let mut run_ids = Vec::with_capacity(to_remove.len()); for id in to_remove { - inner.entries.remove(&id); + if let Some(entry) = inner.entries.remove(&id) { + run_ids.push(entry.run_id); + } } inner.counts.remove(&Some(workspace_id.to_string())); - count + run_ids } pub async fn take(&self, request_id: &str) -> Option<(PendingEntry, u32)> { @@ -217,6 +229,49 @@ impl PendingPathGrants { }; Some((entry, count)) } + + /// See [`crate::commands::permissions::PendingApprovals::take_superseded`]. + /// Same semantics for path grants, keyed on run + requested path + + /// requested access: a fresh registration for the same grant replaces + /// any stale orphaned entry (and its UI card) instead of stacking a + /// duplicate. + pub async fn take_superseded( + &self, + run_id: &str, + requested_path: &str, + requested_access: crate::config::FilesystemPathAccess, + ) -> Vec<(PendingEntry, u32)> { + let mut inner = self.inner.lock().await; + let ids: Vec = inner + .entries + .iter() + .filter(|(_, entry)| { + entry.run_id == run_id + && entry.request.requested_path == requested_path + && entry.request.requested_access == requested_access + }) + .map(|(id, _)| id.clone()) + .collect(); + let mut taken = Vec::with_capacity(ids.len()); + for id in ids { + let Some(entry) = inner.entries.remove(&id) else { + continue; + }; + let count = match inner.counts.get_mut(&entry.workspace_id) { + Some(n) if *n > 0 => { + *n -= 1; + let v = *n; + if v == 0 { + inner.counts.remove(&entry.workspace_id); + } + v + } + _ => 0, + }; + taken.push((entry, count)); + } + taken + } } impl Default for PendingPathGrants { @@ -620,7 +675,7 @@ mod tests { requested_access: FilesystemPathAccess::ReadOnly, reason: "r".to_string(), }; - let (_rx, count) = pending.register(request).await; + let (_rx, count) = pending.register(request, "run-1".to_string()).await; assert_eq!(count, 1); let taken = pending.take("id-1").await; assert!(taken.is_some()); @@ -647,10 +702,55 @@ mod tests { ..a.clone() }; a.request_id = "id-a".to_string(); - let _ = pending.register(a).await; - let _ = pending.register(b).await; + let _ = pending.register(a, "run-1".to_string()).await; + let _ = pending.register(b, "run-1".to_string()).await; let list = pending.list_for_workspace("ws-A").await; assert_eq!(list.len(), 1); assert_eq!(list[0].request_id, "id-a"); } + + #[tokio::test] + async fn take_superseded_matches_run_path_and_access() { + let pending = PendingPathGrants::new(); + let base = PathGrantRequest { + request_id: "id-stale".to_string(), + workspace_id: Some("ws".to_string()), + agent_id: None, + agent_name: None, + requested_path: "/p".to_string(), + requested_access: FilesystemPathAccess::ReadOnly, + reason: "r".to_string(), + }; + let other_access = PathGrantRequest { + request_id: "id-access".to_string(), + requested_access: FilesystemPathAccess::ReadWrite, + ..base.clone() + }; + let other_path = PathGrantRequest { + request_id: "id-path".to_string(), + requested_path: "/q".to_string(), + ..base.clone() + }; + let (stale_rx, _) = pending.register(base, "run-1".to_string()).await; + let _rx2 = pending.register(other_access, "run-1".to_string()).await; + let _rx3 = pending.register(other_path, "run-1".to_string()).await; + + let taken = pending + .take_superseded("run-1", "/p", FilesystemPathAccess::ReadOnly) + .await; + assert_eq!(taken.len(), 1); + assert_eq!(taken[0].0.request.request_id, "id-stale"); + assert_eq!(taken[0].1, 2, "unrelated entries must remain counted"); + + drop(taken); + assert!(stale_rx.await.is_err(), "supersede closes the channel"); + assert!(pending.take("id-access").await.is_some()); + assert!(pending.take("id-path").await.is_some()); + + // A different run never supersedes. + assert!(pending + .take_superseded("run-2", "/q", FilesystemPathAccess::ReadOnly) + .await + .is_empty()); + } } diff --git a/src-tauri/src/commands/permissions.rs b/src-tauri/src/commands/permissions.rs index 67b0449..c85ff7a 100644 --- a/src-tauri/src/commands/permissions.rs +++ b/src-tauri/src/commands/permissions.rs @@ -38,10 +38,11 @@ use crate::AppState; pub const PERMISSION_REQUEST_EVENT: &str = "permissions://request"; pub const PERMISSION_ATTENTION_EVENT: &str = "permissions://attention"; /// Emitted when a pending request is cleared *without* a user decision — -/// the tool call was abandoned (CLI transport dropped mid-call, run -/// cancelled) or it timed out. The inline approval card removes the now- -/// useless card on this. Normal user submissions remove the card -/// optimistically on the frontend, so they don't emit this. +/// the run was cancelled or ended (reaping a wait orphaned by a CLI +/// transport drop), the wait timed out, or a re-asked command superseded +/// the stale request. The inline approval card removes the now-useless +/// card on this. Normal user submissions remove the card optimistically +/// on the frontend, so they don't emit this. pub const PERMISSION_RESOLVED_EVENT: &str = "permissions://resolved"; /// Maximum time the bash handler waits for a user response. Past this @@ -143,6 +144,11 @@ struct PendingInner { pub struct PendingEntry { pub sender: oneshot::Sender>, pub workspace_id: Option, + /// The run that is awaiting this decision. Used by + /// [`PendingApprovals::take_superseded`] so a re-asked command (after + /// a CLI transport drop orphaned the original request) replaces the + /// stale entry instead of stacking a duplicate card. + pub run_id: String, /// The original request payload as emitted to the frontend. Stored /// so that components that mount after the event fired can still /// discover the request via [`list_pending_permission_requests`]. @@ -169,6 +175,7 @@ impl PendingApprovals { pub async fn register( &self, request: PermissionRequest, + run_id: String, ) -> (oneshot::Receiver>, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; @@ -179,6 +186,7 @@ impl PendingApprovals { PendingEntry { sender: tx, workspace_id: workspace_id.clone(), + run_id, request, }, ); @@ -215,11 +223,11 @@ impl PendingApprovals { /// Drops every pending entry belonging to `workspace_id` and clears /// its count. Used by `workspace_delete` so requests scoped to a /// just-deleted workspace don't linger in memory until restart. - /// Dropping each entry's `sender` closes the oneshot channel, which - /// surfaces as a "channel closed" error on the bash-tool side that - /// was awaiting the decision — appropriate, since the workspace - /// (and therefore the in-flight call's context) is gone. - pub async fn purge_workspace(&self, workspace_id: &str) -> usize { + /// Returns the run ids that were awaiting the dropped entries so the + /// caller can cancel those runs — the awaiting side treats a closed + /// channel as "superseded, keep the run alive", so without explicit + /// cancellation a run would continue against the deleted workspace. + pub async fn purge_workspace(&self, workspace_id: &str) -> Vec { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -227,12 +235,14 @@ impl PendingApprovals { .filter(|(_, entry)| entry.workspace_id.as_deref() == Some(workspace_id)) .map(|(id, _)| id.clone()) .collect(); - let count = to_remove.len(); + let mut run_ids = Vec::with_capacity(to_remove.len()); for id in to_remove { - inner.entries.remove(&id); + if let Some(entry) = inner.entries.remove(&id) { + run_ids.push(entry.run_id); + } } inner.counts.remove(&Some(workspace_id.to_string())); - count + run_ids } /// Removes the pending entry and decrements its workspace count. @@ -254,6 +264,44 @@ impl PendingApprovals { }; Some((entry, count)) } + + /// Removes every pending entry for the same run + command and returns + /// each with the post-removal workspace count. Called by the bash + /// approval flow right before registering a fresh request: when a CLI + /// transport drop orphans an in-flight approval and the model re-asks + /// the same command, the stale entry (and its UI card) is replaced by + /// the fresh one instead of lingering next to it. Dropping the + /// returned entries' senders ends the orphaned waits with a + /// channel-closed error, which they treat as superseded (no run + /// cancellation). + pub async fn take_superseded(&self, run_id: &str, command: &str) -> Vec<(PendingEntry, u32)> { + let mut inner = self.inner.lock().await; + let ids: Vec = inner + .entries + .iter() + .filter(|(_, entry)| entry.run_id == run_id && entry.request.command == command) + .map(|(id, _)| id.clone()) + .collect(); + let mut taken = Vec::with_capacity(ids.len()); + for id in ids { + let Some(entry) = inner.entries.remove(&id) else { + continue; + }; + let count = match inner.counts.get_mut(&entry.workspace_id) { + Some(n) if *n > 0 => { + *n -= 1; + let v = *n; + if v == 0 { + inner.counts.remove(&entry.workspace_id); + } + v + } + _ => 0, + }; + taken.push((entry, count)); + } + taken + } } impl Default for PendingApprovals { @@ -457,7 +505,7 @@ mod tests { let pending = PendingApprovals::new(); let req = fake_request(Some("ws-1")); let id = req.request_id.clone(); - let (_rx, count) = pending.register(req).await; + let (_rx, count) = pending.register(req, "run-1".to_string()).await; assert_eq!(count, 1); let taken = pending.take(&id).await; assert!(taken.is_some()); @@ -472,8 +520,8 @@ mod tests { let req2 = fake_request(Some("ws-1")); let id1 = req1.request_id.clone(); let id2 = req2.request_id.clone(); - let (_rx1, c1) = pending.register(req1).await; - let (_rx2, c2) = pending.register(req2).await; + let (_rx1, c1) = pending.register(req1, "run-1".to_string()).await; + let (_rx2, c2) = pending.register(req2, "run-1".to_string()).await; assert_eq!(c1, 1); assert_eq!(c2, 2); let (_, remaining) = pending.take(&id1).await.unwrap(); @@ -487,8 +535,8 @@ mod tests { let pending = PendingApprovals::new(); let req_a = fake_request(Some("ws-A")); let req_b = fake_request(Some("ws-B")); - let _ = pending.register(req_a.clone()).await; - let _ = pending.register(req_b).await; + let _ = pending.register(req_a.clone(), "run-1".to_string()).await; + let _ = pending.register(req_b, "run-1".to_string()).await; let list = pending.list_for_workspace("ws-A").await; assert_eq!(list.len(), 1); assert_eq!(list[0].request_id, req_a.request_id); @@ -497,14 +545,63 @@ mod tests { #[tokio::test] async fn counts_snapshot_aggregates_by_workspace_and_drops_anon() { let pending = PendingApprovals::new(); - let _ = pending.register(fake_request(Some("ws-A"))).await; - let _ = pending.register(fake_request(Some("ws-A"))).await; - let _ = pending.register(fake_request(Some("ws-B"))).await; - let _ = pending.register(fake_request(None)).await; + let _ = pending + .register(fake_request(Some("ws-A")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(Some("ws-A")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(Some("ws-B")), "run-1".to_string()) + .await; + let _ = pending + .register(fake_request(None), "run-1".to_string()) + .await; let snapshot = pending.counts_snapshot().await; assert_eq!(snapshot.get("ws-A"), Some(&2)); assert_eq!(snapshot.get("ws-B"), Some(&1)); assert_eq!(snapshot.len(), 2); } + + #[tokio::test] + async fn take_superseded_removes_only_same_run_and_command() { + let pending = PendingApprovals::new(); + let stale = fake_request(Some("ws-1")); // command "cmd" + let stale_id = stale.request_id.clone(); + let mut other_cmd = fake_request(Some("ws-1")); + other_cmd.command = "different".to_string(); + let other_cmd_id = other_cmd.request_id.clone(); + let other_run = fake_request(Some("ws-1")); // command "cmd", run-2 + let other_run_id = other_run.request_id.clone(); + let (stale_rx, _) = pending.register(stale, "run-1".to_string()).await; + let _rx2 = pending.register(other_cmd, "run-1".to_string()).await; + let _rx3 = pending.register(other_run, "run-2".to_string()).await; + + let taken = pending.take_superseded("run-1", "cmd").await; + assert_eq!(taken.len(), 1); + assert_eq!(taken[0].0.request.request_id, stale_id); + assert_eq!(taken[0].1, 2, "two unrelated entries must remain counted"); + + // Dropping the taken entry's sender ends the orphaned wait with a + // channel-closed error — the supersede signal. + drop(taken); + assert!(stale_rx.await.is_err()); + + // Unrelated entries are untouched. + assert!(pending.take(&other_cmd_id).await.is_some()); + assert!(pending.take(&other_run_id).await.is_some()); + } + + #[tokio::test] + async fn take_superseded_returns_empty_when_nothing_matches() { + let pending = PendingApprovals::new(); + let req = fake_request(Some("ws-1")); + let id = req.request_id.clone(); + let _rx = pending.register(req, "run-1".to_string()).await; + + assert!(pending.take_superseded("run-9", "cmd").await.is_empty()); + assert!(pending.take_superseded("run-1", "other").await.is_empty()); + assert!(pending.take(&id).await.is_some(), "entry must survive"); + } } diff --git a/src-tauri/src/commands/workspace.rs b/src-tauri/src/commands/workspace.rs index 9ef7f2c..0452209 100644 --- a/src-tauri/src/commands/workspace.rs +++ b/src-tauri/src/commands/workspace.rs @@ -2486,21 +2486,33 @@ pub async fn workspace_delete( })?; } - // Drain in-memory queues for this workspace. Pending bash-tool / - // path-grant approvals get their oneshot channels dropped, which - // surfaces as a closed-channel error on the awaiting side — correct, - // since the workspace they were scoped to no longer exists. + // Drain in-memory queues for this workspace, then cancel the runs + // that were awaiting those decisions. The awaiting side treats a + // dropped oneshot as "superseded by a newer request; keep the run + // alive", so the explicit cancel here is what actually stops runs + // scoped to the now-deleted workspace. let purged_approvals = state.pending_approvals.purge_workspace(&workspace_id).await; let purged_path_grants = state .pending_path_grants .purge_workspace(&workspace_id) .await; + let mut purged_run_ids: Vec = purged_approvals + .iter() + .chain(purged_path_grants.iter()) + .cloned() + .collect(); + purged_run_ids.sort(); + purged_run_ids.dedup(); + for run_id in &purged_run_ids { + crate::assistant::runtime::cancel_run(run_id); + } tracing::info!( workspace_id = %workspace_id, agents_cleared = agent_ids.len(), - approvals_purged = purged_approvals, - path_grants_purged = purged_path_grants, + approvals_purged = purged_approvals.len(), + path_grants_purged = purged_path_grants.len(), + runs_cancelled = purged_run_ids.len(), "Deleted general workspace" ); diff --git a/src/components/InlineApprovalCard.tsx b/src/components/InlineApprovalCard.tsx index 5693acb..70c5800 100644 --- a/src/components/InlineApprovalCard.tsx +++ b/src/components/InlineApprovalCard.tsx @@ -8,9 +8,9 @@ import type { PermissionRequest, SegmentDecision } from '../generated/bindings'; import styles from './InlineApprovalCard.module.css'; const PERMISSION_REQUEST_EVENT = 'permissions://request'; -// Backend cleared a pending request without a user decision (the tool call -// was abandoned — CLI transport dropped mid-call — or it timed out). Drop -// the now-useless card. Payload: { requestId }. +// Backend cleared a pending request without a user decision (the run was +// cancelled or ended, the wait timed out, or a re-asked command superseded +// it). Drop the now-useless card. Payload: { requestId }. const PERMISSION_RESOLVED_EVENT = 'permissions://resolved'; // Local UI alias matching the discriminator on SegmentDecision. Kept diff --git a/src/components/InlinePathGrantCard.tsx b/src/components/InlinePathGrantCard.tsx index 4c958c4..302bdb6 100644 --- a/src/components/InlinePathGrantCard.tsx +++ b/src/components/InlinePathGrantCard.tsx @@ -12,8 +12,9 @@ import type { import styles from './InlinePathGrantCard.module.css'; const PATH_GRANT_REQUEST_EVENT = 'path-grants://request'; -// Backend cleared a pending grant without a user decision (tool call -// abandoned — CLI transport dropped — or timed out). Drop the stale card. +// Backend cleared a pending grant without a user decision (run cancelled +// or ended, wait timed out, or a re-asked grant superseded it). Drop the +// stale card. const PATH_GRANT_RESOLVED_EVENT = 'path-grants://resolved'; type PathAccess = FilesystemPathAccess; From 4797206145a9ca56d8a56037078b3aa30951d3a9 Mon Sep 17 00:00:00 2001 From: juan <2930882+juacker@users.noreply.github.com> Date: Fri, 12 Jun 2026 14:45:10 +0200 Subject: [PATCH 3/3] fix(mcp): make human-wait expiry terminal --- src-tauri/src/assistant/engine.rs | 3 + src-tauri/src/assistant/tools/ask_user.rs | 74 +++++++++---- src-tauri/src/assistant/tools/local.rs | 129 ++++++++++++---------- src-tauri/src/assistant/tools/mod.rs | 9 ++ src-tauri/src/commands/path_grants.rs | 110 ++++++++++++++---- src-tauri/src/commands/permissions.rs | 110 ++++++++++++++---- src-tauri/src/commands/workspace.rs | 22 ++-- 7 files changed, 321 insertions(+), 136 deletions(-) diff --git a/src-tauri/src/assistant/engine.rs b/src-tauri/src/assistant/engine.rs index 2f3d703..b12630e 100644 --- a/src-tauri/src/assistant/engine.rs +++ b/src-tauri/src/assistant/engine.rs @@ -922,6 +922,7 @@ pub(crate) fn build_system_prompt( A tool call can occasionally fail with a transport error such as `MCP server \"clai\" transport dropped mid-call; response for tool was lost`. This means CLAI lost the in-flight call before its result reached you, so the call's outcome is UNKNOWN — it may or may not have run.\n\ - This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost.\n\ - When it happens, re-issue the SAME interactive call once. CLAI replaces the lost prompt with the fresh one in the app, so the user simply answers the new prompt. Do NOT assume the lost call was approved, denied, or answered, and do NOT proceed past it.\n\ + - Apply this only to active CLAI human waits. If a user-input, command-approval, or filesystem-grant prompt expires or is denied, do not invent convoluted workarounds to bypass it. If the permission or answer is required, stop and explain what is blocked; retry only after a transport drop where the outcome is unknown.\n\ - For non-interactive tools (reads, searches, writes), a transport drop needs no special handling — just retry normally if you still need the result.\n", ); } @@ -1468,6 +1469,8 @@ mod tests { assert!(text.contains("## Interactive Tool Reliability")); assert!(text.contains("transport dropped mid-call")); assert!(text.contains("re-issue the SAME interactive call once")); + assert!(text.contains("Apply this only to active CLAI human waits")); + assert!(text.contains("do not invent convoluted workarounds")); // The backend supersedes the orphaned request when the model // re-asks, so the prompt must NOT push stale-card caveats (e.g. // telling the user to dismiss duplicates) onto the model. diff --git a/src-tauri/src/assistant/tools/ask_user.rs b/src-tauri/src/assistant/tools/ask_user.rs index 7263887..a6fbc6b 100644 --- a/src-tauri/src/assistant/tools/ask_user.rs +++ b/src-tauri/src/assistant/tools/ask_user.rs @@ -97,7 +97,13 @@ pub struct AskUserAnswer { // Keyed by pending id; the value carries the owning session id so callers // (mid-run input delivery) can check whether a session is currently blocked // on a human answer without threading new state through the tool router. -type PendingMap = HashMap)>; +#[derive(Debug)] +enum AskUserOutcome { + Answer(AskUserAnswer), + Superseded, +} + +type PendingMap = HashMap)>; static PENDING: OnceLock> = OnceLock::new(); fn pending_map() -> &'static Mutex { @@ -124,7 +130,7 @@ pub fn submit_answer(pending_id: &str, answer: AskUserAnswer) -> Result<(), Stri let (_, tx) = map .remove(pending_id) .ok_or_else(|| format!("No pending ask_user with id `{}`", pending_id))?; - tx.send(answer) + tx.send(AskUserOutcome::Answer(answer)) .map_err(|_| "ask_user receiver was dropped (run already ended)".to_string()) } @@ -132,9 +138,9 @@ pub fn submit_answer(pending_id: &str, answer: AskUserAnswer) -> Result<(), Stri /// pending ids. Called when a new ask is registered for the session: the /// frontend renders a single ask panel per session, so a still-pending /// ask at registration time is an orphan (its CLI transport dropped -/// mid-call and the model is re-asking). Dropping the removed senders -/// wakes the orphaned waits with a channel-closed error, which they -/// treat as superseded (no run cancellation). +/// mid-call and the model is re-asking). The removed senders receive an +/// explicit superseded outcome, so channel closure remains reserved for +/// cancellation/teardown. pub fn take_for_session(session_id: &str) -> Vec { let Ok(mut map) = pending_map().lock() else { return Vec::new(); @@ -145,7 +151,9 @@ pub fn take_for_session(session_id: &str) -> Vec { .map(|(id, _)| id.clone()) .collect(); for id in &ids { - map.remove(id); + if let Some((_, tx)) = map.remove(id) { + let _ = tx.send(AskUserOutcome::Superseded); + } } ids } @@ -170,6 +178,22 @@ impl PendingGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + if let Ok(mut map) = pending_map().lock() { + map.remove(&self.id); + } + let _ = emit_event( + &self.app, + &self.session, + Some(self.run_id.as_str()), + AssistantUiEvent::AskUserResolved { + pending_id: self.id.clone(), + }, + ); + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for PendingGuard { @@ -232,7 +256,7 @@ pub async fn execute( } let pending_id = Uuid::new_v4().to_string(); - let (tx, rx) = oneshot::channel::(); + let (tx, rx) = oneshot::channel::(); { let mut map = pending_map() .lock() @@ -263,23 +287,23 @@ pub async fn execute( let wait_timeout = context.interactive_wait_timeout(ASK_USER_TIMEOUT); let answer = match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(answer)) => answer, - Ok(Err(_)) => { - // Sender dropped without an answer: a newer ask for this - // session superseded this one (transport-drop re-ask). The - // superseding call already cleaned the map and the UI, and - // the run is alive waiting on the NEW question — disarm - // rather than cancel the run. + Ok(Ok(AskUserOutcome::Answer(answer))) => answer, + Ok(Ok(AskUserOutcome::Superseded)) => { + // A fresh question for this session replaced this orphaned + // wait after a transport drop. Ignore the stale future. guard.disarm(); return Err( "ask_user was superseded by a newer question before an answer arrived".to_string(), ); } + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + return super::cancel_run_and_park(&context.cancel_token).await; + } + Ok(Err(_)) => { + return guard.expire_and_stop().await; + } Err(_) => { - return Err(format!( - "ask_user timed out waiting for a user answer after {} seconds", - wait_timeout.as_secs() - )) + return guard.expire_and_stop().await; } }; guard.disarm(); @@ -381,8 +405,8 @@ mod tests { #[test] fn take_for_session_removes_only_that_sessions_asks_and_closes_channels() { - let (tx_a, rx_a) = oneshot::channel::(); - let (tx_b, mut rx_b) = oneshot::channel::(); + let (tx_a, rx_a) = oneshot::channel::(); + let (tx_b, mut rx_b) = oneshot::channel::(); { let mut map = pending_map().lock().unwrap(); map.insert( @@ -400,9 +424,11 @@ mod tests { assert!(!session_has_pending_ask("tfs-session-a")); assert!(session_has_pending_ask("tfs-session-b")); - // The removed sender is dropped → the orphaned wait wakes with a - // channel-closed error (the supersede signal)... - assert!(rx_a.blocking_recv().is_err()); + // The orphaned wait receives an explicit supersede signal. + assert!(matches!( + rx_a.blocking_recv(), + Ok(AskUserOutcome::Superseded) + )); // ...while the other session's channel stays open. assert!(matches!( rx_b.try_recv(), @@ -415,7 +441,7 @@ mod tests { #[test] fn submit_answer_fails_for_superseded_pending_id() { - let (tx, _rx) = oneshot::channel::(); + let (tx, _rx) = oneshot::channel::(); pending_map() .lock() .unwrap() diff --git a/src-tauri/src/assistant/tools/local.rs b/src-tauri/src/assistant/tools/local.rs index 5f2143c..ff5a112 100644 --- a/src-tauri/src/assistant/tools/local.rs +++ b/src-tauri/src/assistant/tools/local.rs @@ -1201,10 +1201,8 @@ fn is_pure_assignment_token(tok: &str) -> bool { /// future). The guard then removes the still-pending registry entry, /// tells the frontend to drop the now-useless approval card, and cancels /// the run so the model cannot continue without the missing decision -/// (a no-op when the run already ended). Disarmed on a normal decision, -/// where the submit command already removed the entry, and on a -/// channel-closed wakeup (supersede), where the superseding caller -/// already cleaned up and the run must stay alive. +/// (a no-op when the run already ended). Disarmed on a normal decision or +/// explicit supersede, where the caller already removed/replaced the entry. /// /// Cleanup is async (registry lock) so it's spawned onto the app runtime; /// `Drop` can't await. `take` is a no-op if the entry was already removed @@ -1221,6 +1219,22 @@ impl AbandonedApprovalGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + use tauri::Manager; + + let state = self.app.state::(); + if let Some((_, remaining)) = state.pending_approvals.take(&self.request_id).await { + crate::commands::permissions::emit_permission_resolved(&self.app, &self.request_id); + crate::commands::permissions::emit_attention( + &self.app, + self.workspace_id.clone(), + remaining, + ); + } + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for AbandonedApprovalGuard { @@ -1260,8 +1274,8 @@ async fn await_user_permission( segments: Vec, ) -> Result<(), String> { use crate::commands::permissions::{ - emit_attention, PermissionRequest, SegmentDecision, APPROVAL_TIMEOUT, - PERMISSION_REQUEST_EVENT, + emit_attention, PendingApprovalOutcome, PermissionRequest, SegmentDecision, + APPROVAL_TIMEOUT, PERMISSION_REQUEST_EVENT, }; use tauri::{Emitter, Manager}; @@ -1285,20 +1299,16 @@ async fn await_user_permission( // still pending, it is an orphan — its CLI transport dropped mid-call // and the model is now re-asking. Replace it (and its UI card) with // the fresh request instead of stacking a duplicate the user can no - // longer meaningfully answer. Dropping the stale entry's sender wakes - // the orphaned wait with a channel-closed error, which it treats as - // superseded (no run cancellation — this run is alive and waiting on - // the NEW request). - for (stale, remaining) in app_state + // longer meaningfully answer. The stale waiter receives an explicit + // supersede outcome (no run cancellation — this run is alive and + // waiting on the NEW request). + for stale in app_state .pending_approvals .take_superseded(&context.run_id, command) .await { - crate::commands::permissions::emit_permission_resolved( - &deps.app, - &stale.request.request_id, - ); - emit_attention(&deps.app, stale.workspace_id.clone(), remaining); + crate::commands::permissions::emit_permission_resolved(&deps.app, &stale.request_id); + emit_attention(&deps.app, stale.workspace_id.clone(), stale.remaining); } let (rx, count) = app_state @@ -1314,7 +1324,7 @@ async fn await_user_permission( // Cleans up if this future is abandoned before a decision (CLI transport // drop mid-call, or run cancellation): clears the pending entry and tells // the frontend to drop the now-useless approval card. Disarmed below on a - // normal decision; the timeout / channel-closed arms let it fire on drop. + // normal decision and explicit supersede. let mut abandon_guard = AbandonedApprovalGuard { app: deps.app.clone(), cancel_token: context.cancel_token.clone(), @@ -1325,35 +1335,29 @@ async fn await_user_permission( let wait_timeout = context.interactive_wait_timeout(APPROVAL_TIMEOUT); let decisions = match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(d)) => { + Ok(Ok(PendingApprovalOutcome::Decision(d))) => { // The submit command already removed the registry entry and the // frontend cleared the card optimistically. abandon_guard.disarm(); d } - Ok(Err(_)) => { - // The sender was dropped without a decision: this request was - // superseded by a fresh registration for the same run + command - // (the model re-asked after a transport drop), or app state is - // tearing down. Either way the registry entry is already gone - // and the run may be live, waiting on the NEW request — so - // disarm the guard rather than cancel the run. + Ok(Ok(PendingApprovalOutcome::Superseded)) => { + // A fresh registration for the same run + command replaced + // this orphaned wait after a transport drop. This stale future + // is intentionally ignored: no notice, no warning, no cancel. abandon_guard.disarm(); let msg = "Permission request was superseded by a newer request \ for the same command before a decision was made"; - context.add_notice(RunNoticeKind::CommandDenied, msg.to_string()); return Err(msg.to_string()); } + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + return super::cancel_run_and_park(&context.cancel_token).await; + } + Ok(Err(_)) => { + return abandon_guard.expire_and_stop().await; + } Err(_) => { - // Human-wait timeout. `abandon_guard` clears the pending entry, - // emits attention, drops the card, and cancels the run when it - // goes out of scope. - let msg = format!( - "Permission approval timed out after {} seconds", - wait_timeout.as_secs() - ); - context.add_notice(RunNoticeKind::CommandDenied, msg.to_string()); - return Err(msg); + return abandon_guard.expire_and_stop().await; } }; @@ -1577,6 +1581,22 @@ impl AbandonedPathGrantGuard { fn disarm(&mut self) { self.armed = false; } + + async fn expire_and_stop(&mut self) -> T { + use tauri::Manager; + + let state = self.app.state::(); + if let Some((_, remaining)) = state.pending_path_grants.take(&self.request_id).await { + crate::commands::path_grants::emit_path_grant_resolved(&self.app, &self.request_id); + crate::commands::path_grants::emit_attention( + &self.app, + self.workspace_id.clone(), + remaining, + ); + } + self.armed = false; + super::cancel_run_and_park(&self.cancel_token).await + } } impl Drop for AbandonedPathGrantGuard { @@ -1605,7 +1625,7 @@ async fn await_path_grant_decision( request: crate::commands::path_grants::PathGrantRequest, ) -> Result { use crate::commands::path_grants::{ - emit_attention, PATH_GRANT_REQUEST_EVENT, PATH_GRANT_TIMEOUT, + emit_attention, PendingPathGrantOutcome, PATH_GRANT_REQUEST_EVENT, PATH_GRANT_TIMEOUT, }; use tauri::{Emitter, Manager}; @@ -1616,7 +1636,7 @@ async fn await_path_grant_decision( // Supersede a stale orphaned request for the same run + path + access // (the model re-asked after a CLI transport drop). See the analogous // block in `await_user_permission` for the full rationale. - for (stale, remaining) in app_state + for stale in app_state .pending_path_grants .take_superseded( &context.run_id, @@ -1625,11 +1645,8 @@ async fn await_path_grant_decision( ) .await { - crate::commands::path_grants::emit_path_grant_resolved( - &deps.app, - &stale.request.request_id, - ); - emit_attention(&deps.app, stale.workspace_id.clone(), remaining); + crate::commands::path_grants::emit_path_grant_resolved(&deps.app, &stale.request_id); + emit_attention(&deps.app, stale.workspace_id.clone(), stale.remaining); } let (rx, count) = app_state @@ -1644,7 +1661,7 @@ async fn await_path_grant_decision( // See `AbandonedApprovalGuard`: clears the pending entry and drops the // card if this future is abandoned before a decision. Disarmed on a - // normal decision; the timeout / channel-closed arms let it fire on drop. + // normal decision and explicit supersede. let mut abandon_guard = AbandonedPathGrantGuard { app: deps.app.clone(), cancel_token: context.cancel_token.clone(), @@ -1655,33 +1672,25 @@ async fn await_path_grant_decision( let wait_timeout = context.interactive_wait_timeout(PATH_GRANT_TIMEOUT); match tokio::time::timeout(wait_timeout, rx).await { - Ok(Ok(decision)) => { + Ok(Ok(PendingPathGrantOutcome::Decision(decision))) => { abandon_guard.disarm(); Ok(decision) } - Ok(Err(_)) => { - // Sender dropped without a decision: superseded by a fresh - // registration for the same run + path + access, or app - // teardown. The superseding caller already cleaned up and the - // run may be live on the NEW request — disarm, don't cancel. + Ok(Ok(PendingPathGrantOutcome::Superseded)) => { + // A fresh registration for the same run + path + access + // replaced this orphaned wait after a transport drop. This + // stale future is intentionally ignored. abandon_guard.disarm(); let msg = "Path-grant request was superseded by a newer request \ for the same path before a decision was made" .to_string(); - context.add_notice(RunNoticeKind::PathGrantDenied, msg.clone()); Err(msg) } - Err(_) => { - // Human-wait timeout. `abandon_guard` clears the pending entry, - // emits attention, drops the card, and cancels the run when it - // goes out of scope. - let msg = format!( - "Path-grant approval timed out after {} seconds", - wait_timeout.as_secs() - ); - context.add_notice(RunNoticeKind::PathGrantDenied, msg.clone()); - Err(msg) + Ok(Err(_)) if context.cancel_token.is_cancelled() => { + super::cancel_run_and_park(&context.cancel_token).await } + Ok(Err(_)) => abandon_guard.expire_and_stop().await, + Err(_) => abandon_guard.expire_and_stop().await, } } diff --git a/src-tauri/src/assistant/tools/mod.rs b/src-tauri/src/assistant/tools/mod.rs index bba7390..ba5000d 100644 --- a/src-tauri/src/assistant/tools/mod.rs +++ b/src-tauri/src/assistant/tools/mod.rs @@ -20,6 +20,15 @@ use crate::assistant::types::{ }; use crate::config::{ExecutionCapabilityConfig, FilesystemPathGrant}; +/// Terminal human-wait handling: once a CLAI-owned prompt for user input or +/// permission expires, cancel the run and never return a tool result for the +/// model to route around. The outer run driver races tool execution against +/// this token and will drop this parked future while cancelling the run. +pub async fn cancel_run_and_park(cancel_token: &CancellationToken) -> T { + cancel_token.cancel(); + std::future::pending::().await +} + /// The name under which clai's local MCP server is registered with CLI /// providers (`write_mcp_config` for Claude Code, `add_codex_common_args` /// for Codex — both in `assistant::local_agent`). diff --git a/src-tauri/src/commands/path_grants.rs b/src-tauri/src/commands/path_grants.rs index 68734d8..d8625bb 100644 --- a/src-tauri/src/commands/path_grants.rs +++ b/src-tauri/src/commands/path_grants.rs @@ -120,8 +120,14 @@ struct PendingInner { counts: HashMap, u32>, } +#[derive(Debug)] +pub enum PendingPathGrantOutcome { + Decision(PathGrantDecision), + Superseded, +} + pub struct PendingEntry { - pub sender: oneshot::Sender, + pub sender: oneshot::Sender, pub workspace_id: Option, pub agent_id: Option, /// The run awaiting this decision. Used by @@ -132,6 +138,12 @@ pub struct PendingEntry { pub request: PathGrantRequest, } +pub struct SupersededPathGrant { + pub request_id: String, + pub workspace_id: Option, + pub remaining: u32, +} + impl PendingPathGrants { pub fn new() -> Self { Self { @@ -146,7 +158,7 @@ impl PendingPathGrants { &self, request: PathGrantRequest, run_id: String, - ) -> (oneshot::Receiver, u32) { + ) -> (oneshot::Receiver, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; let request_id = request.request_id.clone(); @@ -190,12 +202,18 @@ impl PendingPathGrants { .collect() } - /// See [`crate::commands::permissions::PendingApprovals::purge_workspace`]. - /// Same semantics — drops every pending path-grant request for the - /// given workspace, clears its count, and returns the run ids that - /// were awaiting the dropped entries so the caller can cancel those - /// runs. Used by `workspace_delete`. - pub async fn purge_workspace(&self, workspace_id: &str) -> Vec { + /// See [`crate::commands::permissions::PendingApprovals::purge_workspace_canceling_runs`]. + /// Same semantics — cancels runs before dropping every pending + /// path-grant request for the given workspace, clears its count, and + /// returns the cancelled run ids. Used by `workspace_delete`. + pub async fn purge_workspace_canceling_runs( + &self, + workspace_id: &str, + mut cancel_run: F, + ) -> Vec + where + F: FnMut(&str), + { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -204,11 +222,19 @@ impl PendingPathGrants { .map(|(id, _)| id.clone()) .collect(); let mut run_ids = Vec::with_capacity(to_remove.len()); - for id in to_remove { - if let Some(entry) = inner.entries.remove(&id) { - run_ids.push(entry.run_id); + for id in &to_remove { + if let Some(entry) = inner.entries.get(id) { + run_ids.push(entry.run_id.clone()); } } + run_ids.sort(); + run_ids.dedup(); + for run_id in &run_ids { + cancel_run(run_id); + } + for id in to_remove { + inner.entries.remove(&id); + } inner.counts.remove(&Some(workspace_id.to_string())); run_ids } @@ -234,13 +260,14 @@ impl PendingPathGrants { /// Same semantics for path grants, keyed on run + requested path + /// requested access: a fresh registration for the same grant replaces /// any stale orphaned entry (and its UI card) instead of stacking a - /// duplicate. + /// duplicate. Supersede is explicit; channel closure remains + /// cancellation/teardown. pub async fn take_superseded( &self, run_id: &str, requested_path: &str, requested_access: crate::config::FilesystemPathAccess, - ) -> Vec<(PendingEntry, u32)> { + ) -> Vec { let mut inner = self.inner.lock().await; let ids: Vec = inner .entries @@ -257,6 +284,8 @@ impl PendingPathGrants { let Some(entry) = inner.entries.remove(&id) else { continue; }; + let request_id = entry.request.request_id.clone(); + let workspace_id = entry.workspace_id.clone(); let count = match inner.counts.get_mut(&entry.workspace_id) { Some(n) if *n > 0 => { *n -= 1; @@ -268,7 +297,12 @@ impl PendingPathGrants { } _ => 0, }; - taken.push((entry, count)); + let _ = entry.sender.send(PendingPathGrantOutcome::Superseded); + taken.push(SupersededPathGrant { + request_id, + workspace_id, + remaining: count, + }); } taken } @@ -348,7 +382,9 @@ pub async fn submit_path_grant_decision( )?; } - let _ = entry.sender.send(validated); + let _ = entry + .sender + .send(PendingPathGrantOutcome::Decision(validated)); emit_attention(&app, entry.workspace_id, remaining); Ok(()) } @@ -739,11 +775,16 @@ mod tests { .take_superseded("run-1", "/p", FilesystemPathAccess::ReadOnly) .await; assert_eq!(taken.len(), 1); - assert_eq!(taken[0].0.request.request_id, "id-stale"); - assert_eq!(taken[0].1, 2, "unrelated entries must remain counted"); + assert_eq!(taken[0].request_id, "id-stale"); + assert_eq!( + taken[0].remaining, 2, + "unrelated entries must remain counted" + ); - drop(taken); - assert!(stale_rx.await.is_err(), "supersede closes the channel"); + assert!(matches!( + stale_rx.await, + Ok(PendingPathGrantOutcome::Superseded) + )); assert!(pending.take("id-access").await.is_some()); assert!(pending.take("id-path").await.is_some()); @@ -753,4 +794,35 @@ mod tests { .await .is_empty()); } + + #[tokio::test] + async fn purge_workspace_cancels_run_before_dropping_sender() { + let pending = PendingPathGrants::new(); + let request = PathGrantRequest { + request_id: "id-1".to_string(), + workspace_id: Some("ws-1".to_string()), + agent_id: None, + agent_name: None, + requested_path: "/p".to_string(), + requested_access: FilesystemPathAccess::ReadOnly, + reason: "r".to_string(), + }; + let mut rx = pending.register(request, "run-1".to_string()).await.0; + + let run_ids = pending + .purge_workspace_canceling_runs("ws-1", |run_id| { + assert_eq!(run_id, "run-1"); + assert!(matches!( + rx.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + }) + .await; + + assert_eq!(run_ids, vec!["run-1".to_string()]); + assert!( + rx.await.is_err(), + "purge drops the sender after cancellation" + ); + } } diff --git a/src-tauri/src/commands/permissions.rs b/src-tauri/src/commands/permissions.rs index c85ff7a..5e1e6e3 100644 --- a/src-tauri/src/commands/permissions.rs +++ b/src-tauri/src/commands/permissions.rs @@ -141,8 +141,14 @@ struct PendingInner { counts: HashMap, u32>, } +#[derive(Debug)] +pub enum PendingApprovalOutcome { + Decision(Vec), + Superseded, +} + pub struct PendingEntry { - pub sender: oneshot::Sender>, + pub sender: oneshot::Sender, pub workspace_id: Option, /// The run that is awaiting this decision. Used by /// [`PendingApprovals::take_superseded`] so a re-asked command (after @@ -155,6 +161,12 @@ pub struct PendingEntry { pub request: PermissionRequest, } +pub struct SupersededApproval { + pub request_id: String, + pub workspace_id: Option, + pub remaining: u32, +} + impl PendingApprovals { pub fn new() -> Self { Self { @@ -176,7 +188,7 @@ impl PendingApprovals { &self, request: PermissionRequest, run_id: String, - ) -> (oneshot::Receiver>, u32) { + ) -> (oneshot::Receiver, u32) { let (tx, rx) = oneshot::channel(); let mut inner = self.inner.lock().await; let request_id = request.request_id.clone(); @@ -223,11 +235,18 @@ impl PendingApprovals { /// Drops every pending entry belonging to `workspace_id` and clears /// its count. Used by `workspace_delete` so requests scoped to a /// just-deleted workspace don't linger in memory until restart. - /// Returns the run ids that were awaiting the dropped entries so the - /// caller can cancel those runs — the awaiting side treats a closed - /// channel as "superseded, keep the run alive", so without explicit - /// cancellation a run would continue against the deleted workspace. - pub async fn purge_workspace(&self, workspace_id: &str) -> Vec { + /// Cancels the runs awaiting those entries before dropping their + /// senders, then returns the cancelled run ids. Channel closure is + /// reserved for cancellation/teardown; supersede is delivered as an + /// explicit [`PendingApprovalOutcome::Superseded`]. + pub async fn purge_workspace_canceling_runs( + &self, + workspace_id: &str, + mut cancel_run: F, + ) -> Vec + where + F: FnMut(&str), + { let mut inner = self.inner.lock().await; let to_remove: Vec = inner .entries @@ -236,11 +255,19 @@ impl PendingApprovals { .map(|(id, _)| id.clone()) .collect(); let mut run_ids = Vec::with_capacity(to_remove.len()); - for id in to_remove { - if let Some(entry) = inner.entries.remove(&id) { - run_ids.push(entry.run_id); + for id in &to_remove { + if let Some(entry) = inner.entries.get(id) { + run_ids.push(entry.run_id.clone()); } } + run_ids.sort(); + run_ids.dedup(); + for run_id in &run_ids { + cancel_run(run_id); + } + for id in to_remove { + inner.entries.remove(&id); + } inner.counts.remove(&Some(workspace_id.to_string())); run_ids } @@ -270,11 +297,10 @@ impl PendingApprovals { /// approval flow right before registering a fresh request: when a CLI /// transport drop orphans an in-flight approval and the model re-asks /// the same command, the stale entry (and its UI card) is replaced by - /// the fresh one instead of lingering next to it. Dropping the - /// returned entries' senders ends the orphaned waits with a - /// channel-closed error, which they treat as superseded (no run - /// cancellation). - pub async fn take_superseded(&self, run_id: &str, command: &str) -> Vec<(PendingEntry, u32)> { + /// the fresh one instead of lingering next to it. The stale waiter is + /// woken with an explicit supersede outcome, not by dropping its + /// channel, so unrelated teardown cannot masquerade as supersede. + pub async fn take_superseded(&self, run_id: &str, command: &str) -> Vec { let mut inner = self.inner.lock().await; let ids: Vec = inner .entries @@ -287,6 +313,8 @@ impl PendingApprovals { let Some(entry) = inner.entries.remove(&id) else { continue; }; + let request_id = entry.request.request_id.clone(); + let workspace_id = entry.workspace_id.clone(); let count = match inner.counts.get_mut(&entry.workspace_id) { Some(n) if *n > 0 => { *n -= 1; @@ -298,7 +326,12 @@ impl PendingApprovals { } _ => 0, }; - taken.push((entry, count)); + let _ = entry.sender.send(PendingApprovalOutcome::Superseded); + taken.push(SupersededApproval { + request_id, + workspace_id, + remaining: count, + }); } taken } @@ -366,7 +399,9 @@ pub async fn submit_permission_decision( ) { persist_decisions_to_agent(state.inner(), workspace_id, agent_id, &decisions)?; } - let _ = entry.sender.send(decisions); + let _ = entry + .sender + .send(PendingApprovalOutcome::Decision(decisions)); emit_attention(&app, entry.workspace_id, remaining); Ok(()) } @@ -580,13 +615,17 @@ mod tests { let taken = pending.take_superseded("run-1", "cmd").await; assert_eq!(taken.len(), 1); - assert_eq!(taken[0].0.request.request_id, stale_id); - assert_eq!(taken[0].1, 2, "two unrelated entries must remain counted"); + assert_eq!(taken[0].request_id, stale_id); + assert_eq!( + taken[0].remaining, 2, + "two unrelated entries must remain counted" + ); - // Dropping the taken entry's sender ends the orphaned wait with a - // channel-closed error — the supersede signal. - drop(taken); - assert!(stale_rx.await.is_err()); + // Supersede is explicit; closed channels remain cancellation/teardown. + assert!(matches!( + stale_rx.await, + Ok(PendingApprovalOutcome::Superseded) + )); // Unrelated entries are untouched. assert!(pending.take(&other_cmd_id).await.is_some()); @@ -604,4 +643,29 @@ mod tests { assert!(pending.take_superseded("run-1", "other").await.is_empty()); assert!(pending.take(&id).await.is_some(), "entry must survive"); } + + #[tokio::test] + async fn purge_workspace_cancels_run_before_dropping_sender() { + let pending = PendingApprovals::new(); + let mut rx = pending + .register(fake_request(Some("ws-1")), "run-1".to_string()) + .await + .0; + + let run_ids = pending + .purge_workspace_canceling_runs("ws-1", |run_id| { + assert_eq!(run_id, "run-1"); + assert!(matches!( + rx.try_recv(), + Err(oneshot::error::TryRecvError::Empty) + )); + }) + .await; + + assert_eq!(run_ids, vec!["run-1".to_string()]); + assert!( + rx.await.is_err(), + "purge drops the sender after cancellation" + ); + } } diff --git a/src-tauri/src/commands/workspace.rs b/src-tauri/src/commands/workspace.rs index 0452209..e1edaee 100644 --- a/src-tauri/src/commands/workspace.rs +++ b/src-tauri/src/commands/workspace.rs @@ -2486,15 +2486,20 @@ pub async fn workspace_delete( })?; } - // Drain in-memory queues for this workspace, then cancel the runs - // that were awaiting those decisions. The awaiting side treats a - // dropped oneshot as "superseded by a newer request; keep the run - // alive", so the explicit cancel here is what actually stops runs - // scoped to the now-deleted workspace. - let purged_approvals = state.pending_approvals.purge_workspace(&workspace_id).await; + // Drain in-memory queues for this workspace. The purge helpers cancel + // runs before dropping pending senders, so workspace deletion cannot be + // mistaken for an interactive-request supersede. + let purged_approvals = state + .pending_approvals + .purge_workspace_canceling_runs(&workspace_id, |run_id| { + let _ = crate::assistant::runtime::cancel_run(run_id); + }) + .await; let purged_path_grants = state .pending_path_grants - .purge_workspace(&workspace_id) + .purge_workspace_canceling_runs(&workspace_id, |run_id| { + let _ = crate::assistant::runtime::cancel_run(run_id); + }) .await; let mut purged_run_ids: Vec = purged_approvals .iter() @@ -2503,9 +2508,6 @@ pub async fn workspace_delete( .collect(); purged_run_ids.sort(); purged_run_ids.dedup(); - for run_id in &purged_run_ids { - crate::assistant::runtime::cancel_run(run_id); - } tracing::info!( workspace_id = %workspace_id,