Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src-tauri/src/assistant/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,30 @@ pub(crate) fn build_system_prompt(
- Be concise and direct in your responses. Prefer concrete actions and evidence over vague summaries.\n",
);

// Transport-drop recovery for grant/response-blocking tools. The local MCP
// transport can drop an in-flight call (surfaced to the model as
// "transport dropped mid-call; response for tool <name> was lost"). For a
// tool that blocks on a user grant or answer, the outcome is then unknown,
// so the model must re-ask rather than assume an answer or proceed. The
// backend treats the re-asked call as superseding the orphaned one (the
// stale approval/question card is replaced in place), so no UI caveats are
// needed here. Scoped to sessions that actually expose such a tool;
// ordinary read/write tools, which can be retried without side effects,
// need no special handling.
let has_interactive_tool = tool_names
.iter()
.any(|n| matches!(*n, "ask_user" | "bash_exec" | "fs_request_grant"));
if has_interactive_tool {
prompt.push_str(
"\n## Interactive Tool Reliability\n\
A tool call can occasionally fail with a transport error such as `MCP server \"clai\" transport dropped mid-call; response for tool <name> was lost`. This means CLAI lost the in-flight call before its result reached you, so the call's outcome is UNKNOWN — it may or may not have run.\n\
- This matters specifically for tools that block on a user grant or response — `ask_user`, and approval-gated `bash_exec` / `fs_request_grant`. When one of these drops mid-call, the user may never have answered, or they answered but the decision was lost.\n\
- When it happens, re-issue the SAME interactive call once. CLAI replaces the lost prompt with the fresh one in the app, so the user simply answers the new prompt. Do NOT assume the lost call was approved, denied, or answered, and do NOT proceed past it.\n\
- Apply this only to active CLAI human waits. If a user-input, command-approval, or filesystem-grant prompt expires or is denied, do not invent convoluted workarounds to bypass it. If the permission or answer is required, stop and explain what is blocked; retry only after a transport drop where the outcome is unknown.\n\
- For non-interactive tools (reads, searches, writes), a transport drop needs no special handling — just retry normally if you still need the result.\n",
);
}

if context.space_id.is_some() || !context.mcp_server_ids.is_empty() {
prompt.push_str(
"- This tab already carries session-specific context and capabilities. \
Expand Down Expand Up @@ -1427,6 +1451,51 @@ mod tests {
assert!(text.contains("re-run the relevant tools if freshness matters."));
}

#[test]
fn build_system_prompt_adds_interactive_reliability_guidance_when_blocking_tool_present() {
let context = SessionContext::default();
let tools = [crate::assistant::types::ToolDefinition {
name: "ask_user".to_string(),
description: "desc".to_string(),
input_schema: serde_json::json!({}),
}];

let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage);
let text = match &message.content[0] {
ContentPart::Text { text } => text,
other => panic!("expected text content, got {:?}", other),
};

assert!(text.contains("## Interactive Tool Reliability"));
assert!(text.contains("transport dropped mid-call"));
assert!(text.contains("re-issue the SAME interactive call once"));
assert!(text.contains("Apply this only to active CLAI human waits"));
assert!(text.contains("do not invent convoluted workarounds"));
// The backend supersedes the orphaned request when the model
// re-asks, so the prompt must NOT push stale-card caveats (e.g.
// telling the user to dismiss duplicates) onto the model.
assert!(text.contains("replaces the lost prompt with the fresh one"));
assert!(!text.contains("dismiss"));
}

#[test]
fn build_system_prompt_omits_interactive_reliability_guidance_without_blocking_tool() {
let context = SessionContext::default();
let tools = [crate::assistant::types::ToolDefinition {
name: "fs_read".to_string(),
description: "desc".to_string(),
input_schema: serde_json::json!({}),
}];

let message = build_system_prompt(&context, None, &tools, &RunTrigger::UserMessage);
let text = match &message.content[0] {
ContentPart::Text { text } => text,
other => panic!("expected text content, got {:?}", other),
};

assert!(!text.contains("## Interactive Tool Reliability"));
}

#[test]
fn build_system_prompt_describes_autonomous_run_mode() {
let context = SessionContext {
Expand Down
166 changes: 158 additions & 8 deletions src-tauri/src/assistant/local_mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,20 @@ pub struct LocalMcpRuntime {
// whichever workspace happened to bind first.
app: AppHandle,
cancellation_token: CancellationToken,
bindings: Arc<RwLock<HashMap<String, ToolBinding>>>,
bindings: Arc<RwLock<HashMap<String, BoundRun>>>,
}

/// A registered run binding plus the run-scope token that reaps its
/// in-flight tool calls. [`BindingGuard::drop`] cancels `run_scope`, so
/// any tool future still racing in [`execute_bound_tool`]'s `select!`
/// when the run ends (e.g. an interactive wait orphaned by a CLI
/// transport drop) is dropped instead of lingering on the rmcp session
/// worker until its own timeout. Dropping those futures fires their
/// cleanup guards (pending-registry removal + `resolved` UI events).
#[derive(Clone)]
struct BoundRun {
binding: ToolBinding,
run_scope: CancellationToken,
}

#[derive(Clone)]
Expand Down Expand Up @@ -89,24 +102,32 @@ impl LocalMcpRuntime {
/// bindings map.
pub fn bind_run(&self, binding: ToolBinding) -> BindingGuard {
let token = Uuid::new_v4().to_string();
let run_scope = CancellationToken::new();
// Bindings map only ever holds short, await-free critical sections,
// so `std::sync::RwLock` is fine and lets `Drop` clean up sync.
// A poisoned lock means the binding map is unusable; we'd rather
// panic here than continue with a corrupted server state.
self.bindings
.write()
.expect("local MCP binding map poisoned")
.insert(token.clone(), binding);
.insert(
token.clone(),
BoundRun {
binding,
run_scope: run_scope.clone(),
},
);
BindingGuard {
bindings: self.bindings.clone(),
token,
run_scope,
}
}

fn binding_from_request(
&self,
context: &RequestContext<RoleServer>,
) -> Result<ToolBinding, McpError> {
) -> Result<BoundRun, McpError> {
let token = bearer_token(context).ok_or_else(|| {
McpError::invalid_request("missing bearer token for CLAI MCP request", None)
})?;
Expand All @@ -125,9 +146,18 @@ impl LocalMcpRuntime {
/// token while alive and removes it from the runtime on drop, so a panic
/// or early return between bind and the end of a run cannot leak a stale
/// binding into the process-singleton MCP server.
///
/// Dropping the guard also cancels the binding's run-scope token, which
/// reaps any tool call still in flight on the rmcp session worker. This
/// matters for interactive waits (`ask_user`, bash approvals, path
/// grants) orphaned by a CLI transport drop: the worker keeps their
/// futures alive past the dropped connection, so without this reap they
/// would pin pending approval entries (and their UI cards) until their
/// own multi-minute timeout.
pub struct BindingGuard {
bindings: Arc<RwLock<HashMap<String, ToolBinding>>>,
bindings: Arc<RwLock<HashMap<String, BoundRun>>>,
token: String,
run_scope: CancellationToken,
}

impl BindingGuard {
Expand All @@ -138,6 +168,10 @@ impl BindingGuard {

impl Drop for BindingGuard {
fn drop(&mut self) {
// Reap in-flight tool calls for this run BEFORE unbinding, so a
// racing request can't observe the binding gone while its
// already-running future survives the run.
self.run_scope.cancel();
if let Ok(mut bindings) = self.bindings.write() {
bindings.remove(&self.token);
}
Expand Down Expand Up @@ -186,7 +220,11 @@ pub async fn ensure_started(app: &AppHandle) -> Result<Arc<LocalMcpRuntime>, Str
// 127.0.0.1 bind, port-agnostically.
let mut config = StreamableHttpServerConfig::default();
config.stateful_mode = true;
config.sse_keep_alive = None;
// Keep rmcp's default sse_keep_alive (15s pings). The
// response stream for an interactive tool call sits idle
// for as long as a human takes to answer, and an unpinged
// idle connection is exactly what rots into "transport
// dropped mid-call; response for tool <name> was lost".
config.cancellation_token = cancellation_token.child_token();
config
},
Expand Down Expand Up @@ -234,7 +272,7 @@ impl ServerHandler for ClaiMcpService {
context: RequestContext<RoleServer>,
) -> impl Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
async move {
let binding = self.runtime.binding_from_request(&context)?;
let binding = self.runtime.binding_from_request(&context)?.binding;
let session = repository::get_session(&binding.pool, &binding.session_id)
.await
.map_err(|e| McpError::internal_error(e, None))?
Expand Down Expand Up @@ -262,14 +300,22 @@ impl ServerHandler for ClaiMcpService {
context: RequestContext<RoleServer>,
) -> impl Future<Output = Result<CallToolResult, McpError>> + Send + '_ {
async move {
let binding = self.runtime.binding_from_request(&context)?;
let bound = self.runtime.binding_from_request(&context)?;
let tool_name = request.name.to_string();
let params = request
.arguments
.map(serde_json::Value::Object)
.unwrap_or(serde_json::Value::Object(Default::default()));

match execute_bound_tool(&self.runtime.app, &binding, &tool_name, params).await {
match execute_bound_tool(
&self.runtime.app,
&bound.binding,
&bound.run_scope,
&tool_name,
params,
)
.await
{
Ok(value) => Ok(CallToolResult::structured(value)),
Err(error) => Ok(CallToolResult::structured_error(serde_json::json!({
"error": error,
Expand All @@ -292,6 +338,7 @@ impl ServerHandler for ClaiMcpService {
async fn execute_bound_tool(
app: &AppHandle,
binding: &ToolBinding,
run_scope: &CancellationToken,
tool_name: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, String> {
Expand Down Expand Up @@ -347,6 +394,13 @@ async fn execute_bound_tool(

tokio::select! {
_ = binding.cancel_token.cancelled() => Err("run cancelled".to_string()),
// Run-scope reap: fires when `BindingGuard` drops at the end of the
// run. A live CLI always waits for its tool results before ending
// its turn, so the only futures still here at that point are
// orphans whose response stream was lost to a transport drop.
// Dropping them fires their cleanup guards (pending-approval
// removal, `resolved` UI events).
_ = run_scope.cancelled() => Err("run ended before this tool call completed".to_string()),
result = tools::execute_tool(&deps, &tool_context, tool_name, params) => result,
}
}
Expand Down Expand Up @@ -380,3 +434,99 @@ fn bearer_token(context: &RequestContext<RoleServer>) -> Option<String> {
.filter(|value| !value.is_empty())
.map(str::to_string)
}

#[cfg(test)]
mod tests {
use super::*;

fn test_binding() -> ToolBinding {
ToolBinding {
pool: sqlx::Pool::connect_lazy("sqlite::memory:").expect("lazy pool"),
session_id: "session-1".to_string(),
run_id: "run-1".to_string(),
cancel_token: CancellationToken::new(),
inter_agent_call_depth: None,
notices: Arc::new(Mutex::new(Vec::new())),
session_grants: Arc::new(Mutex::new(Vec::new())),
session_allowed_command_prefixes: Arc::new(Mutex::new(Vec::new())),
session_blocked_command_prefixes: Arc::new(Mutex::new(Vec::new())),
}
}

/// Dropping the guard must cancel the run scope (reaping in-flight
/// tool futures racing on it in `execute_bound_tool`) and unbind the
/// bearer token — while leaving the run's own cancel token alone, so
/// reaping orphans at normal run end is not a run cancellation.
#[tokio::test] // tokio: the lazy sqlx pool requires a runtime context on drop
async fn binding_guard_drop_cancels_run_scope_and_unbinds() {
let bindings: Arc<RwLock<HashMap<String, BoundRun>>> =
Arc::new(RwLock::new(HashMap::new()));
let binding = test_binding();
let run_cancel = binding.cancel_token.clone();
let run_scope = CancellationToken::new();
bindings.write().unwrap().insert(
"token-1".to_string(),
BoundRun {
binding,
run_scope: run_scope.clone(),
},
);
let guard = BindingGuard {
bindings: bindings.clone(),
token: "token-1".to_string(),
run_scope: run_scope.clone(),
};

assert!(!run_scope.is_cancelled());
drop(guard);

assert!(
run_scope.is_cancelled(),
"guard drop must reap in-flight tool calls via the run scope"
);
assert!(
bindings.read().unwrap().is_empty(),
"guard drop must unbind the bearer token"
);
assert!(
!run_cancel.is_cancelled(),
"reaping at run end must not look like a run cancellation"
);
}

/// The reap arm in `execute_bound_tool` must drop the racing tool
/// future (firing its cleanup guards), not merely resolve alongside it.
#[tokio::test]
async fn run_scope_cancel_drops_in_flight_future() {
struct DropFlag(Arc<Mutex<bool>>);
impl Drop for DropFlag {
fn drop(&mut self) {
*self.0.lock().unwrap() = true;
}
}

let dropped = Arc::new(Mutex::new(false));
let flag = DropFlag(dropped.clone());
let run_scope = CancellationToken::new();
let scope = run_scope.clone();

let task = tokio::spawn(async move {
let _flag = flag; // owned by the racing future, dropped with it
tokio::select! {
_ = scope.cancelled() => Err::<(), String>("run ended before this tool call completed".to_string()),
_ = std::future::pending::<()>() => Ok(()), // never resolves, like an unanswered approval
}
});

run_scope.cancel();
let result = task.await.expect("select task must not panic");
assert_eq!(
result.unwrap_err(),
"run ended before this tool call completed"
);
assert!(
*dropped.lock().unwrap(),
"the in-flight future must be dropped so its RAII cleanup guards fire"
);
}
}
Loading
Loading