From 80cd2794b94b03ae2bcd6ebb40f81a4efdc7803b Mon Sep 17 00:00:00 2001 From: nonoqing Date: Mon, 11 May 2026 11:32:50 +0800 Subject: [PATCH] feat(ai-adapters,agentic): recover Write tool args truncated by max_tokens MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the model hits its max_tokens limit while streaming a Write tool call, the accumulated arguments end inside an unterminated string and fail JSON parsing. Previously this surfaced as "Arguments are invalid JSON" and the deep-research agent would retry from scratch, hitting the same cap again. - Add repair_truncated_json in the tool-call accumulator: closes the open string and any unclosed {/[ brackets. Refuses to fabricate values when truncated mid-pair (trailing ',' or ':'). - Gate recovery by tool name (Write/file_write/write_notebook). Bash, Edit, Task etc. still fail loudly — executing a partial shell command or a partial old_string is unsafe. - Plumb a recovered_from_truncation flag through FinalizedToolCall -> ToolCall (#[serde(default)] for back-compat) -> tool pipeline. - When recovery succeeds, prepend a warning to result_for_assistant instructing the agent to continue with a follow-up call rather than rewriting the whole file. - When the truncated arguments could be repaired but the tool is not on the safe-recovery list, refuse to execute the call and surface a clearer error message naming max_tokens instead of the generic "invalid JSON". Arguments that cannot be repaired at all still report "Arguments are invalid JSON". - 8 new accumulator tests cover the Write recovery path, the Bash refusal path, multibyte content, nested-bracket repair, and the refuse-mid-pair guards. All 23 existing tests still pass. 
Co-Authored-By: Claude Opus 4.7 --- .../ai-adapters/src/tool_call_accumulator.rs | 270 ++++++++++++++++-- src/crates/core/src/agentic/core/message.rs | 8 +- .../src/agentic/execution/execution_engine.rs | 2 + .../src/agentic/execution/round_executor.rs | 2 + .../src/agentic/execution/stream_processor.rs | 1 + .../core/src/agentic/insights/collector.rs | 1 + .../agentic/session/compression/compressor.rs | 1 + .../session/compression/fallback/tests.rs | 3 + .../agentic/tools/pipeline/state_manager.rs | 1 + .../agentic/tools/pipeline/tool_pipeline.rs | 32 +++ 10 files changed, 304 insertions(+), 17 deletions(-) diff --git a/src/crates/ai-adapters/src/tool_call_accumulator.rs b/src/crates/ai-adapters/src/tool_call_accumulator.rs index 01c5ebc2f..6a1687295 100644 --- a/src/crates/ai-adapters/src/tool_call_accumulator.rs +++ b/src/crates/ai-adapters/src/tool_call_accumulator.rs @@ -1,4 +1,4 @@ -use log::error; +use log::{error, warn}; use serde_json::{json, Value}; use std::collections::BTreeMap; @@ -53,6 +53,11 @@ pub struct FinalizedToolCall { pub raw_arguments: String, pub arguments: Value, pub is_error: bool, + /// True when the raw stream produced unparseable JSON (e.g. truncated by + /// `max_tokens`) and we successfully patched the trailing brackets/strings + /// to make it parse. The recovered call still executes, but downstream + /// consumers should warn the model that the content may be incomplete. + pub recovered_from_truncation: bool, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -80,6 +85,100 @@ pub struct PendingToolCalls { pending: BTreeMap, } +/// Tools where executing a truncated tool call is **safe and meaningful** — +/// the model intended to write content and a partial file is strictly more +/// useful than a hard failure. For everything else (Bash, Edit, Task, ...) we +/// surface the truncation as an error: a partial shell command or a partial +/// `old_string`/`new_string` for Edit can change semantics destructively. 
+fn is_truncation_safe_to_recover(tool_name: &str) -> bool { + matches!(tool_name, "Write" | "file_write" | "write_notebook") +} + +/// Attempt to repair a JSON document that was truncated mid-stream (typically +/// because the model hit `max_tokens`). Closes any open string literal and any +/// unclosed `{`/`[` brackets in their correct nesting order. Returns `None` +/// when the truncation occurs at a position where we would have to invent a +/// missing value (e.g. trailing `,` or `:`) since blindly closing in those +/// states would silently corrupt the semantics. +fn repair_truncated_json(raw: &str) -> Option { + let mut in_string = false; + let mut escape = false; + let mut stack: Vec = Vec::new(); + let mut last_significant: Option = None; + + for &b in raw.as_bytes() { + if escape { + escape = false; + continue; + } + if in_string { + match b { + b'\\' => escape = true, + b'"' => { + in_string = false; + last_significant = Some(b'"'); + } + _ => {} + } + continue; + } + match b { + b'"' => { + in_string = true; + last_significant = Some(b'"'); + } + b'{' => { + stack.push(b'{'); + last_significant = Some(b'{'); + } + b'[' => { + stack.push(b'['); + last_significant = Some(b'['); + } + b'}' => { + if stack.pop() != Some(b'{') { + return None; + } + last_significant = Some(b'}'); + } + b']' => { + if stack.pop() != Some(b'[') { + return None; + } + last_significant = Some(b']'); + } + b' ' | b'\t' | b'\n' | b'\r' => {} + other => last_significant = Some(other), + } + } + + // Nothing to repair (parser failed for some other reason). + if !in_string && stack.is_empty() { + return None; + } + + // Refuse to fabricate values when truncated mid-pair. 
+ if !in_string { + if let Some(b',') | Some(b':') = last_significant { + return None; + } + } + + let mut out = String::with_capacity(raw.len() + stack.len() + 1); + out.push_str(raw); + if in_string { + out.push('"'); + } + while let Some(c) = stack.pop() { + out.push(match c { + b'{' => '}', + b'[' => ']', + _ => unreachable!(), + }); + } + Some(out) +} + impl PendingToolCall { fn parse_arguments(_tool_name: &str, raw_arguments: &str) -> Result { serde_json::from_str::(raw_arguments).map_err(|error| error.to_string()) @@ -141,25 +240,58 @@ impl PendingToolCall { let raw_arguments = std::mem::take(&mut self.raw_arguments); self.early_detected_emitted = false; let parsed_arguments = Self::parse_arguments(&tool_name, &raw_arguments); - let is_error = parsed_arguments.is_err(); - - if let Err(error) = &parsed_arguments { - error!( - "Tool call arguments parsing failed at boundary={}: tool_id={}, tool_name={}, error={}, raw_arguments={}", - boundary.as_str(), - tool_id, - tool_name, - error, - raw_arguments - ); - } + + let (arguments, is_error, recovered_from_truncation) = match parsed_arguments { + Ok(value) => (value, false, false), + Err(parse_err) => { + let repaired = repair_truncated_json(&raw_arguments) + .and_then(|candidate| Self::parse_arguments(&tool_name, &candidate).ok()); + match repaired { + Some(value) if is_truncation_safe_to_recover(&tool_name) => { + warn!( + "Tool call arguments recovered from truncation at boundary={}: tool_id={}, tool_name={}, raw_len={}", + boundary.as_str(), + tool_id, + tool_name, + raw_arguments.len() + ); + (value, false, true) + } + Some(_) => { + // We *could* repair but the tool's semantics make + // executing a partial call unsafe (Bash, Edit, ...). + // Surface as an error so the user/model knows the + // truncation happened and can retry sensibly. 
+ warn!( + "Tool call arguments truncated at boundary={}: tool_id={}, tool_name={} — refusing to execute partial call (tool not in safe-recovery list)", + boundary.as_str(), + tool_id, + tool_name + ); + (json!({}), true, true) + } + None => { + error!( + "Tool call arguments parsing failed at boundary={}: tool_id={}, tool_name={}, error={}, raw_arguments={}", + boundary.as_str(), + tool_id, + tool_name, + parse_err, + raw_arguments + ); + (json!({}), true, false) + } + } + } + }; Some(FinalizedToolCall { tool_id, tool_name, raw_arguments, - arguments: parsed_arguments.unwrap_or_else(|_| json!({})), + arguments, is_error, + recovered_from_truncation, }) } } @@ -252,8 +384,8 @@ impl PendingToolCalls { #[cfg(test)] mod tests { use super::{ - EarlyDetectedToolCall, PendingToolCall, PendingToolCalls, ToolCallBoundary, - ToolCallParamsChunk, ToolCallStreamKey, + repair_truncated_json, EarlyDetectedToolCall, PendingToolCall, PendingToolCalls, + ToolCallBoundary, ToolCallParamsChunk, ToolCallStreamKey, }; use serde_json::json; @@ -698,4 +830,110 @@ mod tests { assert!(empty_delta.early_detected.is_none()); assert!(empty_delta.params_partial.is_none()); } + + // ------------------------------------------------------------------ + // Truncation recovery tests + // ------------------------------------------------------------------ + + #[test] + fn write_truncated_mid_content_string_is_recovered() { + // Reproduces the deep-research dump: the model hit max_tokens while + // streaming `content`, so the JSON ends inside the string literal + // with no closing `"` and no closing `}`. 
+ let raw = "{\"file_path\": \"/tmp/report.md\", \"content\": \"# Report\\n\\nA long body that was cut"; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Write".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error, "Write recovery should succeed"); + assert!(finalized.recovered_from_truncation); + assert_eq!( + finalized.arguments, + json!({ + "file_path": "/tmp/report.md", + "content": "# Report\n\nA long body that was cut" + }) + ); + } + + #[test] + fn write_truncated_with_chinese_multibyte_is_recovered() { + let raw = "{\"file_path\": \"/tmp/r.md\", \"content\": \"深度研究报告:未完"; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Write".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + assert!(!finalized.is_error); + assert!(finalized.recovered_from_truncation); + assert_eq!( + finalized.arguments["content"].as_str(), + Some("深度研究报告:未完") + ); + } + + #[test] + fn bash_truncated_mid_command_still_errors_but_records_truncation() { + let raw = r#"{"command": "git log --since=\"2026-05-02\" --on"#; + let mut pending = PendingToolCall::default(); + pending.start_new("call_1".to_string(), Some("Bash".to_string())); + pending.append_arguments(raw); + + let finalized = pending + .finalize(ToolCallBoundary::FinishReason) + .expect("finalized tool"); + + // We never execute a partial shell command. + assert!(finalized.is_error); + assert_eq!(finalized.arguments, json!({})); + // But the truncation is recorded so the surface error message and + // diagnostic dump can distinguish "truncated" from "model emitted + // bad JSON". 
+ assert!(finalized.recovered_from_truncation); + } + + #[test] + fn repair_refuses_truncation_after_colon() { + // We can't invent the missing value, so this must not auto-repair. + assert!(repair_truncated_json(r#"{"a": 1, "b":"#).is_none()); + } + + #[test] + fn repair_refuses_truncation_after_comma() { + assert!(repair_truncated_json(r#"{"a": 1,"#).is_none()); + } + + #[test] + fn repair_returns_none_for_already_valid_json() { + // Already balanced — repair has nothing to do (parser would have + // succeeded anyway). + assert!(repair_truncated_json(r#"{"a": 1}"#).is_none()); + } + + #[test] + fn repair_closes_nested_brackets_in_correct_order() { + let raw = r#"{"a": [1, 2, {"b": "incomplete"#; + let repaired = repair_truncated_json(raw).expect("repaired"); + let parsed: serde_json::Value = + serde_json::from_str(&repaired).expect("repaired is valid JSON"); + assert_eq!(parsed, json!({"a": [1, 2, {"b": "incomplete"}]})); + } + + #[test] + fn repair_preserves_escaped_quote_inside_truncated_string() { + let raw = r#"{"content": "she said \"hello\" and then"#; + let repaired = repair_truncated_json(raw).expect("repaired"); + let parsed: serde_json::Value = serde_json::from_str(&repaired).expect("valid JSON"); + assert_eq!( + parsed["content"].as_str(), + Some("she said \"hello\" and then") + ); + } } diff --git a/src/crates/core/src/agentic/core/message.rs b/src/crates/core/src/agentic/core/message.rs index d1f317535..387bd1b68 100644 --- a/src/crates/core/src/agentic/core/message.rs +++ b/src/crates/core/src/agentic/core/message.rs @@ -626,7 +626,7 @@ mod tests { // ============ Tool Calls and Results ============ -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ToolCall { pub tool_id: String, pub tool_name: String, @@ -635,7 +635,13 @@ pub struct ToolCall { #[serde(default, skip_serializing_if = "Option::is_none")] pub raw_arguments: Option, /// Record whether tool parameters are valid + 
#[serde(default)] pub is_error: bool, + /// True when the raw JSON arguments were truncated mid-stream and we + /// successfully repaired them. Downstream consumers can flag this to the + /// model so it understands the content may be incomplete. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub recovered_from_truncation: bool, } impl ToolCall { diff --git a/src/crates/core/src/agentic/execution/execution_engine.rs b/src/crates/core/src/agentic/execution/execution_engine.rs index eb7e61d4c..fcddb59b5 100644 --- a/src/crates/core/src/agentic/execution/execution_engine.rs +++ b/src/crates/core/src/agentic/execution/execution_engine.rs @@ -2144,6 +2144,7 @@ mod tests { arguments: json!({ "path": "README.md" }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ); @@ -2163,6 +2164,7 @@ mod tests { arguments: json!({ "path": "README.md" }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ); let tool_result = Message::tool_result(ToolResult { diff --git a/src/crates/core/src/agentic/execution/round_executor.rs b/src/crates/core/src/agentic/execution/round_executor.rs index ab744b7a3..a95766fd4 100644 --- a/src/crates/core/src/agentic/execution/round_executor.rs +++ b/src/crates/core/src/agentic/execution/round_executor.rs @@ -971,6 +971,7 @@ mod tests { arguments: serde_json::json!({}), raw_arguments: Some("{\"file_path\":\"src/lib.rs\"".to_string()), is_error: true, + recovered_from_truncation: false, }], usage: None, provider_metadata: None, @@ -996,6 +997,7 @@ mod tests { arguments: serde_json::json!({}), raw_arguments: Some("{\"file_path\":\"src/lib.rs\"".to_string()), is_error: true, + recovered_from_truncation: false, }], usage: None, provider_metadata: None, diff --git a/src/crates/core/src/agentic/execution/stream_processor.rs b/src/crates/core/src/agentic/execution/stream_processor.rs index 4d5a6d21f..c0822b919 100644 --- a/src/crates/core/src/agentic/execution/stream_processor.rs +++ 
b/src/crates/core/src/agentic/execution/stream_processor.rs @@ -265,6 +265,7 @@ impl StreamContext { raw_arguments: (!finalized.raw_arguments.is_empty()) .then_some(finalized.raw_arguments.clone()), is_error: finalized.is_error, + recovered_from_truncation: finalized.recovered_from_truncation, }); } diff --git a/src/crates/core/src/agentic/insights/collector.rs b/src/crates/core/src/agentic/insights/collector.rs index 33f5f9570..da8367278 100644 --- a/src/crates/core/src/agentic/insights/collector.rs +++ b/src/crates/core/src/agentic/insights/collector.rs @@ -449,6 +449,7 @@ fn rebuild_messages_from_turns(turns: &[DialogTurnData]) -> Vec { arguments: ti.tool_call.input.clone(), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }) .collect(); diff --git a/src/crates/core/src/agentic/session/compression/compressor.rs b/src/crates/core/src/agentic/session/compression/compressor.rs index c3bc8b690..3426fab3c 100644 --- a/src/crates/core/src/agentic/session/compression/compressor.rs +++ b/src/crates/core/src/agentic/session/compression/compressor.rs @@ -757,6 +757,7 @@ mod tests { }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ), ]) diff --git a/src/crates/core/src/agentic/session/compression/fallback/tests.rs b/src/crates/core/src/agentic/session/compression/fallback/tests.rs index e90e006ef..29b7fa840 100644 --- a/src/crates/core/src/agentic/session/compression/fallback/tests.rs +++ b/src/crates/core/src/agentic/session/compression/fallback/tests.rs @@ -29,6 +29,7 @@ fn clears_tool_results_from_compressed_history() { }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ); let tool_result = Message::tool_result(ToolResult { @@ -142,6 +143,7 @@ fn groups_consecutive_assistant_messages_under_single_role_header() { }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ), Message::assistant_with_tools( @@ -156,6 +158,7 @@ fn 
groups_consecutive_assistant_messages_under_single_role_header() { }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }], ), Message::assistant("Updated the styling changes.".to_string()), diff --git a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs index 7c78fbb86..a7884a26c 100644 --- a/src/crates/core/src/agentic/tools/pipeline/state_manager.rs +++ b/src/crates/core/src/agentic/tools/pipeline/state_manager.rs @@ -306,6 +306,7 @@ mod tests { arguments: serde_json::json!({}), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }, ToolExecutionContext { session_id: "session-1".to_string(), diff --git a/src/crates/core/src/agentic/tools/pipeline/tool_pipeline.rs b/src/crates/core/src/agentic/tools/pipeline/tool_pipeline.rs index f9b3736b8..5a6ebfc83 100644 --- a/src/crates/core/src/agentic/tools/pipeline/tool_pipeline.rs +++ b/src/crates/core/src/agentic/tools/pipeline/tool_pipeline.rs @@ -566,6 +566,7 @@ impl ToolPipeline { let tool_name = task.tool_call.tool_name.clone(); let tool_args = task.tool_call.arguments.clone(); let tool_is_error = task.tool_call.is_error; + let recovered_from_truncation = task.tool_call.recovered_from_truncation; let queue_wait_ms = elapsed_ms_since(task.created_at); let mut confirmation_wait_ms = 0; @@ -574,6 +575,13 @@ impl ToolPipeline { tool_name, tool_id, queue_wait_ms ); + if recovered_from_truncation { + warn!( + "Tool '{}' arguments were recovered from a truncated stream (tool_id={}, session_id={}). 
Executing with patched arguments — content may be incomplete.", + tool_name, tool_id, task.context.session_id + ); + } + if tool_name.is_empty() || tool_is_error { let raw_arguments_preview = task .tool_call @@ -584,6 +592,11 @@ impl ToolPipeline { "Missing valid tool name and arguments are invalid JSON.".to_string() } else if tool_name.is_empty() { "Missing valid tool name.".to_string() + } else if recovered_from_truncation { + format!( + "Tool arguments were truncated by the model (likely hit max_tokens). Refusing to execute a partial '{}' call. Increase max_tokens, split the work into smaller calls, or retry.", + tool_name + ) } else { "Arguments are invalid JSON.".to_string() }; @@ -592,6 +605,7 @@ impl ToolPipeline { } else { error_msg }; + self.state_manager .update_state( &tool_id, @@ -902,6 +916,23 @@ impl ToolPipeline { let mut tool_result = tool_result; tool_result.duration_ms = Some(duration_ms); + // The tool call succeeded with arguments that we patched + // because the model's output was truncated mid-stream. Tell + // the model so it can decide whether the partial write needs + // to be continued or regenerated. + if recovered_from_truncation { + let original = tool_result.result_for_assistant.unwrap_or_default(); + let notice = format!( + "[WARNING: tool arguments were truncated by the model (likely hit max_tokens) and were auto-repaired before this {} call executed. The written content stops at the truncation point and may be incomplete; verify the result and, if needed, continue with a follow-up call that appends the remaining content (do not rewrite the whole file from scratch). 
Original tool result follows.]\n\n", + tool_name + ); + tool_result.result_for_assistant = Some(if original.is_empty() { + notice.trim_end().to_string() + } else { + format!("{notice}{original}") + }); + } + self.state_manager .update_state( &tool_id, @@ -1390,6 +1421,7 @@ mod tests { arguments: json!({ "path": "src/main.rs" }), raw_arguments: None, is_error: false, + recovered_from_truncation: false, }, ToolExecutionContext { session_id: "session_1".to_string(),