Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.4.5] - 2026-06-15

### Fixed

- 🧠 **Reasoning models no longer lose their chain of thought across parallel tool calls.** Multiple tool calls in the same turn are now grouped into a single assistant message with shared reasoning items, instead of being split into separate messages, which broke round-tripping for reasoning models (e.g. o3, o4-mini).
- 🛡️ **Orphaned tool calls no longer break chat history.** If a crash or data corruption left a function call without a matching result, the Responses API conversion now skips the orphaned call instead of sending invalid input that caused permanent 400 errors.
- 🔄 **Tool output ordering prevents corrupted history on crash.** Tool call outputs are now appended to the output list *before* marking the call as "completed", so a crash between the two steps can no longer produce a "completed" call with no output — which would corrupt the message history on reload.
- 🧹 **In-progress tool calls scrubbed on error or cancellation.** When a chat task is cancelled or hits an error, any tool calls still marked "in_progress" are now set to "failed" before persisting, preventing stale in-progress items from lingering in the database.

## [0.4.4] - 2026-06-15

### Added
Expand Down
19 changes: 18 additions & 1 deletion cptr/utils/ai.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,15 @@ async def stream_openai_completions(

def _to_responses_input(messages: list[dict], instructions: str) -> list[dict]:
"""Canonical messages → Responses API input items."""
# Pre-collect all tool result call_ids so we can validate function_calls
# have matching outputs. This prevents orphaned function_calls (from
# crashes, data corruption, etc.) from permanently breaking the chat.
tool_result_ids = {
m.get("tool_call_id", "")
for m in messages
if m.get("role") == "tool"
}

items = []
for m in messages:
role = m["role"]
Expand Down Expand Up @@ -504,8 +513,16 @@ def _to_responses_input(messages: list[dict], instructions: str) -> list[dict]:
for ri in m.get("reasoning_items", []):
items.append(ri)
for tc in m["tool_calls"]:
args = tc["function"].get("arguments", "{}")
call_id = tc.get("id", "")
# Skip function_calls that have no matching tool result
if call_id and call_id not in tool_result_ids:
logger.warning(
"[responses] Skipping orphaned function_call %s (%s) — no matching tool result",
call_id,
tc.get("function", {}).get("name", "?"),
)
continue
args = tc["function"].get("arguments", "{}")
# Responses API requires id to start with "fc_"
fc_id = tc.get("fc_id", "")
if not fc_id or not fc_id.startswith("fc_"):
Expand Down
239 changes: 208 additions & 31 deletions cptr/utils/chat_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,12 +594,56 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic
elif text_content != entry["content"]:
entry["content"] = text_content

# Reconstruct tool calls from output items for the provider
# Reconstruct tool calls from output items for the provider.
#
# Output items accumulate across agentic-loop iterations within a
# single assistant message. The Responses API (reasoning models)
# requires that each function_call is paired with the reasoning
# items from the *same* API response. To reconstruct the correct
# interleaved history we group output items into "turns":
#
# Turn = [reasoning…, function_call…, function_call_output…]
#
# Each turn produces one assistant message (with tool_calls +
# reasoning_items) followed by the corresponding tool-result
# messages.
if m.output:
tool_calls = []
reasoning_items = []
# ── Collect the set of call_ids that have outputs ──
# This is needed to filter out orphaned function_calls
# (e.g. from crashes, partial persistence, or data corruption).
output_call_ids = {
item["call_id"]
for item in m.output
if item.get("type") == "function_call_output"
}

# ── Group output items into per-iteration turns ──
turns: list[dict] = [] # each: {reasoning: [], calls: [], outputs: []}
current_turn: dict = {"reasoning": [], "calls": [], "outputs": []}

for item in m.output:
if item.get("type") == "function_call" and item.get("status") == "completed":
itype = item.get("type")
if itype == "reasoning":
# A reasoning item after we already have outputs means
# a new API iteration started — flush the current turn.
if current_turn["outputs"]:
turns.append(current_turn)
current_turn = {"reasoning": [], "calls": [], "outputs": []}
current_turn["reasoning"].append(item)
elif itype == "function_call" and item.get("status") == "completed":
# Only include calls that have a matching output
if item["call_id"] not in output_call_ids:
logger.warning(
"[history] Skipping orphaned function_call %s (%s) — no matching output",
item.get("call_id", "?"),
item.get("name", "?"),
)
continue
# A new function_call after outputs means
# a new iteration (model saw results and called again).
if current_turn["outputs"]:
turns.append(current_turn)
current_turn = {"reasoning": [], "calls": [], "outputs": []}
tc = {
"id": item["call_id"],
"type": "function",
Expand All @@ -611,23 +655,42 @@ async def _load_message_history(chat_id: str, message_id: str) -> tuple[list[dic
# Preserve Responses API fc_ ID for round-tripping
if item.get("fc_id"):
tc["fc_id"] = item["fc_id"]
tool_calls.append(tc)
elif item.get("type") == "reasoning":
reasoning_items.append(item)
if tool_calls:
entry["tool_calls"] = tool_calls
if reasoning_items:
entry["reasoning_items"] = reasoning_items

# Add tool results as separate messages
for item in m.output:
if item.get("type") == "function_call_output":
result.append(entry)
entry = {
"role": "tool",
"tool_call_id": item["call_id"],
"content": item.get("output", ""),
}
current_turn["calls"].append(tc)
elif itype == "function_call_output":
current_turn["outputs"].append(item)
# Skip other types (message, artifact, pending calls, etc.)

# Don't forget the last turn
if current_turn["calls"] or current_turn["outputs"]:
turns.append(current_turn)

if not turns:
# No tool calls — keep entry as-is (plain text assistant)
pass
else:
for ti, turn in enumerate(turns):
if turn["calls"]:
if ti == 0:
# First turn: attach to the existing entry
entry["tool_calls"] = turn["calls"]
if turn["reasoning"]:
entry["reasoning_items"] = turn["reasoning"]
else:
# Subsequent turns: create a new assistant entry
entry = {
"role": "assistant",
"content": "",
"tool_calls": turn["calls"],
}
if turn["reasoning"]:
entry["reasoning_items"] = turn["reasoning"]
for out in turn["outputs"]:
result.append(entry)
entry = {
"role": "tool",
"tool_call_id": out["call_id"],
"content": out.get("output", ""),
}

result.append(entry)
return result, existing_summary
Expand Down Expand Up @@ -714,6 +777,96 @@ def _append_tool_to_messages(
)


def _append_batch_to_messages(
messages: list[dict],
call_results: list[tuple[dict, str]],
provider: str,
reasoning_items: list[dict] | None = None,
):
"""Append multiple tool calls + results as a single assistant message.

Unlike _append_tool_to_messages (which creates one assistant message per
call), this batches all calls into one assistant message. This is
required for the Responses API with reasoning models, where all
function_calls from one API response share the same reasoning items.
"""
if not call_results:
return

tool_calls = []
for event, _ in call_results:
tool_calls.append(
{
"id": event["call_id"],
"fc_id": event.get("id", ""),
"type": "function",
"function": {
"name": event["name"],
"arguments": json.dumps(event["arguments"]),
},
}
)

assistant_msg: dict = {
"role": "assistant",
"content": "",
"tool_calls": tool_calls,
}
if reasoning_items:
assistant_msg["reasoning_items"] = reasoning_items
messages.append(assistant_msg)

for event, result in call_results:
# Guard against oversized tool outputs
image = _parse_image_data_uri(result)
if not image:
if len(result) > CHAT_TOOL_MAX_CHARS:
half = CHAT_TOOL_MAX_CHARS // 2
result = result[:half] + "\n\n...(truncated)...\n\n" + result[-half:]

if image:
media_type, b64_data = image
path = event["arguments"].get("path", "image")
messages.append(
{
"role": "tool",
"tool_call_id": event["call_id"],
"content": [
{"type": "text", "text": f"Image file: {path}"},
{
"type": "image",
"media_type": media_type,
"base64": b64_data,
},
],
}
)
else:
messages.append(
{
"role": "tool",
"tool_call_id": event["call_id"],
"content": result,
}
)


def _scrub_incomplete_items(output_items: list[dict]) -> None:
    """Mark any unfinished function_call items as 'failed', in place.

    Called before persisting output_items after an error or cancellation.
    Without this, "in_progress" or "pending" items stay in the DB. While
    _load_message_history skips non-"completed" items today, marking
    them explicitly prevents any future code from misinterpreting them.

    Args:
        output_items: Output item dicts; mutated in place. Only items with
            ``type == "function_call"`` and a ``status`` of
            ``"in_progress"`` or ``"pending"`` are changed. Every other
            item (including completed calls and function_call_output
            items) is left untouched.
    """
    # Tuple (not set) membership: compares with ==, so an unexpected
    # unhashable status value cannot raise.
    unfinished_statuses = ("in_progress", "pending")
    for item in output_items:
        if item.get("type") != "function_call":
            continue
        if item.get("status") in unfinished_statuses:
            item["status"] = "failed"


def _find_safe_split(messages: list[dict], target_keep: int) -> int:
"""Find a safe split index that doesn't break tool call pairs.

Expand Down Expand Up @@ -1172,20 +1325,26 @@ def _sync_state():
other_indices = [i for i, (tc, _) in enumerate(call_items) if tc["name"] != "delegate_task"]

# Execute non-delegate tools sequentially first
# Collect results for batch message construction
sequential_results: list[tuple[dict, str]] = [] # (event, result)
for idx in other_indices:
tc, item = call_items[idx]
if tc["name"] == "create_artifact":
result = await create_artifact(**tc["arguments"], workspace=workspace)
else:
result = await execute_tool(tc["name"], tc["arguments"], tool_ctx)

item["status"] = "completed"
# Append output BEFORE marking completed — if anything
# between here and persist fails, the call stays
# "in_progress" (safely skipped on reload) rather than
# "completed" without an output (corrupts history).
result_item = {
"type": "function_call_output",
"call_id": tc["call_id"],
"output": result,
}
output_items.append(result_item)
item["status"] = "completed"
await emit(output=item)
await emit(output=result_item)
_sync_state()
Expand All @@ -1196,10 +1355,16 @@ def _sync_state():
await emit(output=artifact_item)
_sync_state()

# Only attach reasoning to the first tool call message
ri = pending_reasoning if idx == other_indices[0] else None
_append_tool_to_messages(messages, tc, result, provider, reasoning_items=ri)
new_messages_since += 2
sequential_results.append((tc, result))

# Build a single combined assistant message for all sequential calls
# with their shared reasoning items (required for reasoning model round-tripping)
if sequential_results:
_append_batch_to_messages(
messages, sequential_results, provider,
reasoning_items=pending_reasoning,
)
new_messages_since += 1 + len(sequential_results)

# Execute delegate_task calls concurrently, emit each as it completes
if delegate_indices:
Expand All @@ -1212,6 +1377,7 @@ def _sync_state():
)
inflight[task] = idx

delegate_results: list[tuple[dict, str]] = []
while inflight:
done_set, _ = await asyncio.wait(
inflight.keys(), return_when=asyncio.FIRST_COMPLETED
Expand All @@ -1224,20 +1390,29 @@ def _sync_state():
except Exception as e:
result = f"Error: {e}"

item["status"] = "completed"
# Append output BEFORE marking completed (same
# ordering fix as sequential path above).
result_item = {
"type": "function_call_output",
"call_id": tc["call_id"],
"output": result,
}
output_items.append(result_item)
item["status"] = "completed"
await emit(output=item)
await emit(output=result_item)
_sync_state()
# Attach reasoning to first delegate call if no sequential calls consumed it
ri = pending_reasoning if not other_indices and idx == delegate_indices[0] else None
_append_tool_to_messages(messages, tc, result, provider, reasoning_items=ri)
new_messages_since += 2
delegate_results.append((tc, result))

# Build combined message for all delegate calls
if delegate_results:
# Only attach reasoning if sequential calls didn't consume it
ri = pending_reasoning if not other_indices else None
_append_batch_to_messages(
messages, delegate_results, provider,
reasoning_items=ri,
)
new_messages_since += 1 + len(delegate_results)

# Persist after all tool calls
await ChatMessage.update(message_id, content=content, output=output_items)
Expand Down Expand Up @@ -1277,6 +1452,7 @@ def _sync_state():

except asyncio.CancelledError:
_flush_text()
_scrub_incomplete_items(output_items)
await ChatMessage.update(message_id, content=content, output=output_items, done=True)
_task_state.pop(message_id, None)
await _emit_done()
Expand Down Expand Up @@ -1304,6 +1480,7 @@ def _sync_state():
flushed_item = _flush_text()
if flushed_item:
await emit(output=flushed_item)
_scrub_incomplete_items(output_items)
await ChatMessage.update(
message_id,
content=content,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "cptr"
version = "0.4.4"
version = "0.4.5"
description = "Your computer, from anywhere. Code, manage, and control your machine from the web."
license = {file = "LICENSE"}
readme = "README.md"
Expand Down
Loading