From 6d4b31c536083f09f223ce3d79bb2b90d024e9f6 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 15 Jun 2026 12:42:23 +0200 Subject: [PATCH 1/2] refac --- cptr/utils/ai.py | 19 +++- cptr/utils/chat_task.py | 239 ++++++++++++++++++++++++++++++++++------ 2 files changed, 226 insertions(+), 32 deletions(-) diff --git a/cptr/utils/ai.py b/cptr/utils/ai.py index dd1fe81..769a20a 100644 --- a/cptr/utils/ai.py +++ b/cptr/utils/ai.py @@ -476,6 +476,15 @@ async def stream_openai_completions( def _to_responses_input(messages: list[dict], instructions: str) -> list[dict]: """Canonical messages → Responses API input items.""" + # Pre-collect all tool result call_ids so we can validate function_calls + # have matching outputs. This prevents orphaned function_calls (from + # crashes, data corruption, etc.) from permanently breaking the chat. + tool_result_ids = { + m.get("tool_call_id", "") + for m in messages + if m.get("role") == "tool" + } + items = [] for m in messages: role = m["role"] @@ -504,8 +513,16 @@ def _to_responses_input(messages: list[dict], instructions: str) -> list[dict]: for ri in m.get("reasoning_items", []): items.append(ri) for tc in m["tool_calls"]: - args = tc["function"].get("arguments", "{}") call_id = tc.get("id", "") + # Skip function_calls that have no matching tool result + if call_id and call_id not in tool_result_ids: + logger.warning( + "[responses] Skipping orphaned function_call %s (%s) — no matching tool result", + call_id, + tc.get("function", {}).get("name", "?"), + ) + continue + args = tc["function"].get("arguments", "{}") # Responses API requires id to start with "fc_" fc_id = tc.get("fc_id", "") if not fc_id or not fc_id.startswith("fc_"): diff --git a/cptr/utils/chat_task.py b/cptr/utils/chat_task.py index aad0e81..37c470f 100644 --- a/cptr/utils/chat_task.py +++ b/cptr/utils/chat_task.py @@ -594,12 +594,56 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic elif text_content != entry["content"]: entry["content"] = text_content - # Reconstruct tool calls from output items for the provider + # Reconstruct tool calls from output items for the provider. + # + # Output items accumulate across agentic-loop iterations within a + # single assistant message. The Responses API (reasoning models) + # requires that each function_call is paired with the reasoning + # items from the *same* API response. To reconstruct the correct + # interleaved history we group output items into "turns": + # + # Turn = [reasoning…, function_call…, function_call_output…] + # + # Each turn produces one assistant message (with tool_calls + + # reasoning_items) followed by the corresponding tool-result + # messages. if m.output: - tool_calls = [] - reasoning_items = [] + # ── Collect the set of call_ids that have outputs ── + # This is needed to filter out orphaned function_calls + # (e.g. from crashes, partial persistence, or data corruption). + output_call_ids = { + item["call_id"] + for item in m.output + if item.get("type") == "function_call_output" + } + + # ── Group output items into per-iteration turns ── + turns: list[dict] = [] # each: {reasoning: [], calls: [], outputs: []} + current_turn: dict = {"reasoning": [], "calls": [], "outputs": []} + for item in m.output: - if item.get("type") == "function_call" and item.get("status") == "completed": + itype = item.get("type") + if itype == "reasoning": + # A reasoning item after we already have outputs means + # a new API iteration started — flush the current turn. + if current_turn["outputs"]: + turns.append(current_turn) + current_turn = {"reasoning": [], "calls": [], "outputs": []} + current_turn["reasoning"].append(item) + elif itype == "function_call" and item.get("status") == "completed": + # Only include calls that have a matching output + if item["call_id"] not in output_call_ids: + logger.warning( + "[history] Skipping orphaned function_call %s (%s) — no matching output", + item.get("call_id", "?"), + item.get("name", "?"), + ) + continue + # A new function_call after outputs means + # a new iteration (model saw results and called again). + if current_turn["outputs"]: + turns.append(current_turn) + current_turn = {"reasoning": [], "calls": [], "outputs": []} tc = { "id": item["call_id"], "type": "function", @@ -611,23 +655,42 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic # Preserve Responses API fc_ ID for round-tripping if item.get("fc_id"): tc["fc_id"] = item["fc_id"] - tool_calls.append(tc) - elif item.get("type") == "reasoning": - reasoning_items.append(item) - if tool_calls: - entry["tool_calls"] = tool_calls - if reasoning_items: - entry["reasoning_items"] = reasoning_items - - # Add tool results as separate messages - for item in m.output: - if item.get("type") == "function_call_output": - result.append(entry) - entry = { - "role": "tool", - "tool_call_id": item["call_id"], - "content": item.get("output", ""), - } + current_turn["calls"].append(tc) + elif itype == "function_call_output": + current_turn["outputs"].append(item) + # Skip other types (message, artifact, pending calls, etc.) + + # Don't forget the last turn + if current_turn["calls"] or current_turn["outputs"]: + turns.append(current_turn) + + if not turns: + # No tool calls — keep entry as-is (plain text assistant) + pass + else: + for ti, turn in enumerate(turns): + if turn["calls"]: + if ti == 0: + # First turn: attach to the existing entry + entry["tool_calls"] = turn["calls"] + if turn["reasoning"]: + entry["reasoning_items"] = turn["reasoning"] + else: + # Subsequent turns: create a new assistant entry + entry = { + "role": "assistant", + "content": "", + "tool_calls": turn["calls"], + } + if turn["reasoning"]: + entry["reasoning_items"] = turn["reasoning"] + for out in turn["outputs"]: + result.append(entry) + entry = { + "role": "tool", + "tool_call_id": out["call_id"], + "content": out.get("output", ""), + } result.append(entry) return result, existing_summary @@ -714,6 +777,96 @@ def _append_tool_to_messages( ) +def _append_batch_to_messages( + messages: list[dict], + call_results: list[tuple[dict, str]], + provider: str, + reasoning_items: list[dict] | None = None, +): + """Append multiple tool calls + results as a single assistant message. + + Unlike _append_tool_to_messages (which creates one assistant message per + call), this batches all calls into one assistant message. This is + required for the Responses API with reasoning models, where all + function_calls from one API response share the same reasoning items. + """ + if not call_results: + return + + tool_calls = [] + for event, _ in call_results: + tool_calls.append( + { + "id": event["call_id"], + "fc_id": event.get("id", ""), + "type": "function", + "function": { + "name": event["name"], + "arguments": json.dumps(event["arguments"]), + }, + } + ) + + assistant_msg: dict = { + "role": "assistant", + "content": "", + "tool_calls": tool_calls, + } + if reasoning_items: + assistant_msg["reasoning_items"] = reasoning_items + messages.append(assistant_msg) + + for event, result in call_results: + # Guard against oversized tool outputs + image = _parse_image_data_uri(result) + if not image: + if len(result) > CHAT_TOOL_MAX_CHARS: + half = CHAT_TOOL_MAX_CHARS // 2 + result = result[:half] + "\n\n...(truncated)...\n\n" + result[-half:] + + if image: + media_type, b64_data = image + path = event["arguments"].get("path", "image") + messages.append( + { + "role": "tool", + "tool_call_id": event["call_id"], + "content": [ + {"type": "text", "text": f"Image file: {path}"}, + { + "type": "image", + "media_type": media_type, + "base64": b64_data, + }, + ], + } + ) + else: + messages.append( + { + "role": "tool", + "tool_call_id": event["call_id"], + "content": result, + } + ) + + +def _scrub_incomplete_items(output_items: list[dict]) -> None: + """Mark any in-progress function_call items as 'failed'. + + Called before persisting output_items after an error or cancellation. + Without this, "in_progress" items stay in the DB. While + _load_message_history skips non-"completed" items today, marking + them explicitly prevents any future code from misinterpreting them. + """ + for item in output_items: + if item.get("type") == "function_call" and item.get("status") in ( + "in_progress", + "pending", + ): + item["status"] = "failed" + + def _find_safe_split(messages: list[dict], target_keep: int) -> int: """Find a safe split index that doesn't break tool call pairs. @@ -1172,6 +1325,8 @@ def _sync_state(): other_indices = [i for i, (tc, _) in enumerate(call_items) if tc["name"] != "delegate_task"] # Execute non-delegate tools sequentially first + # Collect results for batch message construction + sequential_results: list[tuple[dict, str]] = [] # (event, result) for idx in other_indices: tc, item = call_items[idx] if tc["name"] == "create_artifact": @@ -1179,13 +1334,17 @@ def _sync_state(): else: result = await execute_tool(tc["name"], tc["arguments"], tool_ctx) - item["status"] = "completed" + # Append output BEFORE marking completed — if anything + # between here and persist fails, the call stays + # "in_progress" (safely skipped on reload) rather than + # "completed" without an output (corrupts history). result_item = { "type": "function_call_output", "call_id": tc["call_id"], "output": result, } output_items.append(result_item) + item["status"] = "completed" await emit(output=item) await emit(output=result_item) _sync_state() @@ -1196,10 +1355,16 @@ def _sync_state(): await emit(output=artifact_item) _sync_state() - # Only attach reasoning to the first tool call message - ri = pending_reasoning if idx == other_indices[0] else None - _append_tool_to_messages(messages, tc, result, provider, reasoning_items=ri) - new_messages_since += 2 + sequential_results.append((tc, result)) + + # Build a single combined assistant message for all sequential calls + # with their shared reasoning items (required for reasoning model round-tripping) + if sequential_results: + _append_batch_to_messages( + messages, sequential_results, provider, + reasoning_items=pending_reasoning, + ) + new_messages_since += 1 + len(sequential_results) # Execute delegate_task calls concurrently, emit each as it completes if delegate_indices: @@ -1212,6 +1377,7 @@ def _sync_state(): ) inflight[task] = idx + delegate_results: list[tuple[dict, str]] = [] while inflight: done_set, _ = await asyncio.wait( inflight.keys(), return_when=asyncio.FIRST_COMPLETED @@ -1224,20 +1390,29 @@ def _sync_state(): except Exception as e: result = f"Error: {e}" - item["status"] = "completed" + # Append output BEFORE marking completed (same + # ordering fix as sequential path above). result_item = { "type": "function_call_output", "call_id": tc["call_id"], "output": result, } output_items.append(result_item) + item["status"] = "completed" await emit(output=item) await emit(output=result_item) _sync_state() - # Attach reasoning to first delegate call if no sequential calls consumed it - ri = pending_reasoning if not other_indices and idx == delegate_indices[0] else None - _append_tool_to_messages(messages, tc, result, provider, reasoning_items=ri) - new_messages_since += 2 + delegate_results.append((tc, result)) + + # Build combined message for all delegate calls + if delegate_results: + # Only attach reasoning if sequential calls didn't consume it + ri = pending_reasoning if not other_indices else None + _append_batch_to_messages( + messages, delegate_results, provider, + reasoning_items=ri, + ) + new_messages_since += 1 + len(delegate_results) # Persist after all tool calls await ChatMessage.update(message_id, content=content, output=output_items) @@ -1277,6 +1452,7 @@ def _sync_state(): except asyncio.CancelledError: _flush_text() + _scrub_incomplete_items(output_items) await ChatMessage.update(message_id, content=content, output=output_items, done=True) _task_state.pop(message_id, None) await _emit_done() @@ -1304,6 +1480,7 @@ def _sync_state(): flushed_item = _flush_text() if flushed_item: await emit(output=flushed_item) + _scrub_incomplete_items(output_items) await ChatMessage.update( message_id, content=content, From 19d26cfba2faae1c5c4edbc3c8e935f3af9bb7f5 Mon Sep 17 00:00:00 2001 From: Timothy Jaeryang Baek Date: Mon, 15 Jun 2026 12:44:52 +0200 Subject: [PATCH 2/2] refac --- CHANGELOG.md | 9 +++++++++ pyproject.toml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9596a8..beb6fc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.4.5] - 2026-06-15 + +### Fixed + +- 🧠 **Reasoning models no longer lose chain of thought across parallel tool calls.** Multiple tool calls in the same turn are now grouped into a single assistant message with shared reasoning items, instead of being split into separate messages that broke reasoning model round-tripping (e.g. o3, o4-mini). +- 🛡️ **Orphaned tool calls no longer break chat history.** If a crash or data corruption left a function call without a matching result, the Responses API conversion now skips the orphaned call instead of sending invalid input that caused permanent 400 errors. +- 🔄 **Tool output ordering prevents corrupted history on crash.** Tool call outputs are now appended to the output list *before* marking the call as "completed", so a crash between the two steps can no longer produce a "completed" call with no output — which would corrupt the message history on reload. +- 🧹 **In-progress tool calls scrubbed on error or cancellation.** When a chat task is cancelled or hits an error, any tool calls still marked "in_progress" are now set to "failed" before persisting, preventing stale in-progress items from lingering in the database. + ## [0.4.4] - 2026-06-15 ### Added diff --git a/pyproject.toml b/pyproject.toml index 5439821..551b42a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cptr" -version = "0.4.4" +version = "0.4.5" description = "Your computer, from anywhere. Code, manage, and control your machine from the web." license = {file = "LICENSE"} readme = "README.md"