From f10bca81559b18fa003c3b1bcc4fedf148ddbd7f Mon Sep 17 00:00:00 2001 From: visa2 Date: Sun, 31 May 2026 16:07:52 +0800 Subject: [PATCH 1/2] =?UTF-8?q?refactor(apicompat):=20redesign=20the=20Cod?= =?UTF-8?q?ex=20Responses=20=E2=86=94=20Chat=20Completions=20bridge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex CLI speaks the OpenAI Responses protocol (streaming, store:false), while many upstreams (e.g. DeepSeek in thinking mode) only expose Chat Completions. The bridge that translates between the two had grown field by field and leaned on Go's serialization defaults, which both the Responses client (Codex) and the Chat upstream reject in ways the official OpenAI endpoints tolerate. Problems this fixes (all observed running Codex CLI against a DeepSeek upstream): - Streaming reasoning was never shown in the Codex TUI (the answer appeared with no visible thinking): reasoning deltas were emitted before the reasoning item was opened, so the strict client discarded them. - A tool-using turn could wedge the session into a "no response" state: the function_call stream was never closed (no function_call_arguments.done / output_item.done), so Codex never saw the tool call complete. - Parallel tool calls were rejected upstream (400/502): each function_call became its own assistant message, producing consecutive assistant messages with mismatched tool replies. - A tool turn was rejected with "reasoning_content in the thinking mode must be passed back": the reasoning that produced the tool call was dropped instead of being returned on the assistant message. - Items with no Chat equivalent (web_search_call, ...) and Codex's command-approval notice landed between an assistant tool_calls message and its tool reply, triggering "An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'". - Interrupt/reconnect left an unanswered or dangling tool_call in the history, triggering the same 400. The shared root cause is reliance on serialization defaults — omitempty dropping protocol-required zero values, and unrecognized item types falling through a generic path — rather than deliberately reproducing the target protocol. The bridge is reworked into two explicit layers. Request direction (Responses input -> Chat messages): a parse -> build -> normalize pipeline. - reasoning_content is carried back on the assistant message that produced a tool call (DeepSeek thinking mode requires it to continue the same thought) - consecutive function_call items (parallel tool calls) are merged into a single assistant message's tool_calls array - item types with no Chat equivalent are skipped instead of leaking through a generic path - normalizeChatMessages is the single invariant gate: it guarantees every assistant tool_calls message is immediately followed by one tool reply per tool_call_id — reordering any intervening message (such as a command-approval notice) to after the replies, dropping unanswered tool_calls and orphan tool replies, and preserving bare passthrough tool messages. Response direction (Chat SSE -> Responses SSE): ResponsesStreamEvent.MarshalJSON constructs each streamed event explicitly so protocol-required fields are always present (output_index/content_index/summary_index at 0, message content:[], reasoning summary:[], function_call call_id/name/arguments, output_text part text/annotations/logprobs). This is a single source of truth that removes any post-hoc JSON patching. Reasoning is emitted as its own output item, opened before its deltas, and tool calls are fully closed (function_call_arguments.done + output_item.done with complete arguments). Tests cover request-direction message invariants against golden Codex request shapes (parallel calls, unknown items, intervening messages, partial/dangling calls), per-event wire completeness, and streaming lifecycle ordering. Co-Authored-By: Claude Opus 4.8 --- .../chatcompletions_responses_bridge.go | 476 +++++++++++++++--- ...tions_responses_request_invariants_test.go | 187 +++++++ ...letions_responses_stream_lifecycle_test.go | 103 ++++ .../apicompat/responses_stream_event_wire.go | 199 ++++++++ .../responses_stream_event_wire_test.go | 109 ++++ backend/internal/pkg/apicompat/types.go | 4 + 6 files changed, 1019 insertions(+), 59 deletions(-) create mode 100644 backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go create mode 100644 backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go create mode 100644 backend/internal/pkg/apicompat/responses_stream_event_wire.go create mode 100644 backend/internal/pkg/apicompat/responses_stream_event_wire_test.go diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go index 09b680c7c73..cc51cba2d21 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go @@ -42,14 +42,24 @@ func ResponsesToChatCompletionsRequest(req *ResponsesRequest) (*ChatCompletionsR return out, nil } +// responsesInputToChatMessages converts a Responses request's instructions + +// input[] into Chat Completions messages. It is a three-stage pipeline: +// +// parse — instructions become a system message; input[] is split into items +// build — buildChatMessagesFromItems walks items, attaching reasoning to the +// assistant message that produced a tool call, merging parallel tool +// calls into one assistant message, and skipping item types that have +// no Chat equivalent +// normalize — normalizeChatMessages enforces the invariants DeepSeek requires +// +// The build + normalize split keeps every protocol rule in one place rather than +// scattered across per-item cases, and makes unknown future codex item types +// fail safe instead of leaking into the upstream request. func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) ([]ChatMessage, error) { var messages []ChatMessage if strings.TrimSpace(instructions) != "" { content, _ := json.Marshal(instructions) - messages = append(messages, ChatMessage{ - Role: "system", - Content: content, - }) + messages = append(messages, ChatMessage{Role: "system", Content: content}) } inputRaw = bytesTrimSpace(inputRaw) @@ -57,13 +67,11 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return messages, nil } + // Bare string input is a single user turn. var inputText string if err := json.Unmarshal(inputRaw, &inputText); err == nil { content, _ := json.Marshal(inputText) - messages = append(messages, ChatMessage{ - Role: "user", - Content: content, - }) + messages = append(messages, ChatMessage{Role: "user", Content: content}) return messages, nil } @@ -72,6 +80,24 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return nil, fmt.Errorf("parse responses input: %w", err) } + built, err := buildChatMessagesFromItems(messages, rawItems) + if err != nil { + return nil, err + } + return normalizeChatMessages(built), nil +} + +// buildChatMessagesFromItems walks the Responses input items and appends the +// corresponding Chat messages. +func buildChatMessagesFromItems(messages []ChatMessage, rawItems []json.RawMessage) ([]ChatMessage, error) { + // pendingReasoning holds the reasoning text from a reasoning item until the + // assistant message it belongs to is emitted. DeepSeek's thinking mode + // requires the reasoning_content that produced a tool call to be passed back + // on that assistant message; dropping it yields a 400. It only survives + // across an assistant message (so a following tool call in the same turn + // still receives it); any other role ends the thinking span. + var pendingReasoning string + for _, raw := range rawItems { raw = bytesTrimSpace(raw) if len(raw) == 0 || string(raw) == "null" { @@ -84,6 +110,7 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) if textErr := json.Unmarshal(raw, &text); textErr == nil { content, _ := json.Marshal(text) messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" continue } return nil, fmt.Errorf("parse responses input item: %w", err) @@ -92,22 +119,40 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) role := chatCompletionsBridgeRole(rawString(item["role"])) itemType := rawString(item["type"]) switch itemType { + case "reasoning": + if txt := extractResponsesReasoningText(item); txt != "" { + pendingReasoning = txt + } + continue case "function_call": arguments := rawString(item["arguments"]) if strings.TrimSpace(arguments) == "" { arguments = "{}" } - messages = append(messages, ChatMessage{ - Role: "assistant", - ToolCalls: []ChatToolCall{{ - ID: rawString(item["call_id"]), - Type: "function", - Function: ChatFunctionCall{ - Name: rawString(item["name"]), - Arguments: arguments, - }, - }}, - }) + toolCall := ChatToolCall{ + ID: rawString(item["call_id"]), + Type: "function", + Function: ChatFunctionCall{ + Name: rawString(item["name"]), + Arguments: arguments, + }, + } + // Parallel tool calls arrive as consecutive function_call items and + // must share one assistant message; the matching tool replies then + // follow it. Merge into the immediately preceding assistant message. + if n := len(messages); n > 0 && messages[n-1].Role == "assistant" { + messages[n-1].ToolCalls = append(messages[n-1].ToolCalls, toolCall) + if messages[n-1].ReasoningContent == "" { + messages[n-1].ReasoningContent = pendingReasoning + } + } else { + messages = append(messages, ChatMessage{ + Role: "assistant", + ToolCalls: []ChatToolCall{toolCall}, + ReasoningContent: pendingReasoning, + }) + } + pendingReasoning = "" continue case "function_call_output": content, _ := json.Marshal(rawString(item["output"])) @@ -116,10 +161,12 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) ToolCallID: rawString(item["call_id"]), Content: content, }) + pendingReasoning = "" continue case "input_text", "text": content, _ := json.Marshal(rawString(item["text"])) messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" continue case "input_image": content, err := chatContentFromSingleResponsesPart(itemType, item) @@ -127,6 +174,18 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return nil, err } messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" + continue + } + + // Only genuine message items become chat messages. Codex emits other + // Responses item types with no Chat equivalent (web_search_call, + // local_shell_call, custom tool calls, file_search_call, ...). Converting + // them via the generic path would insert a spurious message between an + // assistant tool_calls message and its tool reply, which DeepSeek rejects + // ("insufficient tool messages following tool_calls message"). Skip them. + if itemType != "" && itemType != "message" { + pendingReasoning = "" continue } @@ -140,15 +199,128 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) if err != nil { return nil, err } - messages = append(messages, ChatMessage{ - Role: role, - Content: chatContent, - }) + messages = append(messages, ChatMessage{Role: role, Content: chatContent}) + // Reasoning only survives across an assistant text message. + if role != "assistant" { + pendingReasoning = "" + } } return messages, nil } +// normalizeChatMessages is the single place that enforces the tool-call +// invariant the DeepSeek / OpenAI Chat Completions schema requires: an assistant +// message with tool_calls must be immediately followed by one tool message per +// tool_call_id, in order, with nothing in between. +// +// Codex histories violate this in several ways that the builder alone can't fix: +// - a non-tool message lands between an assistant tool_calls message and its +// tool replies (e.g. an "Approved command prefix saved" system notice codex +// injects mid tool-execution); +// - a parallel tool_call's sibling output never arrives, or a call is left +// dangling by a mid-execution reconnect (unanswered tool_call); +// - a tool reply has no announcing assistant tool_call (orphan). +// +// It rebuilds the sequence so each assistant's answered tool_calls are followed +// directly by their replies (in call order); unanswered tool_calls are dropped +// (and an assistant left with neither tool_calls nor content is dropped); orphan +// tool replies and intervening messages are emitted in their natural position +// but never between an assistant tool_calls message and its replies. +func normalizeChatMessages(messages []ChatMessage) []ChatMessage { + // Index every tool reply by its tool_call_id (last wins on duplicates). + replies := make(map[string]ChatMessage) + for _, m := range messages { + if m.Role == "tool" && m.ToolCallID != "" { + replies[m.ToolCallID] = m + } + } + + out := make([]ChatMessage, 0, len(messages)) + for _, m := range messages { + switch { + case m.Role == "tool": + // A bare tool message with no tool_call_id is a direct Chat + // Completions passthrough; keep it in place. A tool reply whose id is + // announced by an assistant is emitted right after that assistant + // (skip the standalone occurrence). Any other tool reply is an orphan + // and is dropped. + if m.ToolCallID == "" { + out = append(out, m) + } + continue + case len(m.ToolCalls) > 0: + kept := make([]ChatToolCall, 0, len(m.ToolCalls)) + for _, tc := range m.ToolCalls { + if tc.ID == "" { + continue + } + if _, ok := replies[tc.ID]; ok { + kept = append(kept, tc) + } + } + if len(kept) == 0 { + // No answered tool_calls left: keep as a plain message if it has + // content, otherwise drop it entirely. + if isBlankChatContent(m.Content) { + continue + } + m.ToolCalls = nil + out = append(out, m) + continue + } + m.ToolCalls = kept + out = append(out, m) + for _, tc := range kept { + out = append(out, replies[tc.ID]) + } + default: + out = append(out, m) + } + } + return out +} + +// isBlankChatContent reports whether a chat message content holds no usable text. +func isBlankChatContent(raw json.RawMessage) bool { + raw = bytesTrimSpace(raw) + if len(raw) == 0 || string(raw) == "null" || string(raw) == `""` { + return true + } + return chatMessageContentText(raw) == "" +} + +// extractResponsesReasoningText pulls the reasoning text out of a Responses +// reasoning item. The Chat→Responses bridge writes the upstream reasoning_content +// verbatim into the summary_text parts (see closeChatReasoningItem), so codex +// round-trips it there; prefer summary[].text and fall back to content. +func extractResponsesReasoningText(item map[string]json.RawMessage) string { + var parts []string + collect := func(raw json.RawMessage) { + raw = bytesTrimSpace(raw) + if len(raw) == 0 || string(raw) == "null" { + return + } + var arr []map[string]json.RawMessage + if err := json.Unmarshal(raw, &arr); err == nil { + for _, p := range arr { + if t := rawString(p["text"]); t != "" { + parts = append(parts, t) + } + } + return + } + if t := rawString(raw); t != "" { + parts = append(parts, t) + } + } + collect(item["summary"]) + if len(parts) == 0 { + collect(item["content"]) + } + return strings.Join(parts, "\n") +} + func chatCompletionsBridgeRole(role string) string { trimmed := strings.TrimSpace(role) if trimmed == "" { @@ -448,10 +620,32 @@ type ChatCompletionsToResponsesStreamState struct { CreatedSent bool CompletedSent bool + // nextOutputIndex assigns sequential output_index values to items as they + // are opened (reasoning, message, tool calls), so the streamed indices match + // the order of items in the final response.output array. + nextOutputIndex int + + // Reasoning item lifecycle. DeepSeek-style upstreams stream all + // reasoning_content before any content, so reasoning is modeled as its own + // "reasoning" output item that must be opened (output_item.added) before any + // reasoning delta and closed before the message/tool items open. + ReasoningItemID string + ReasoningIndex int + ReasoningOpen bool + ReasoningDone bool + + // Message item + output_text content-part lifecycle. MessageItemID string - Text strings.Builder - Reasoning strings.Builder - ToolCalls map[int]*ChatToolCall + MessageIndex int + TextPartOpen bool + + Text strings.Builder + Reasoning strings.Builder + + // Tool-call lifecycle, keyed by the upstream tool_call index. + ToolCalls map[int]*ChatToolCall + ToolItemIDs map[int]string + ToolOutputIndex map[int]int FinishReason string Usage *ResponsesUsage @@ -460,13 +654,21 @@ type ChatCompletionsToResponsesStreamState struct { // NewChatCompletionsToResponsesStreamState returns an initialized stream state. func NewChatCompletionsToResponsesStreamState(model string) *ChatCompletionsToResponsesStreamState { return &ChatCompletionsToResponsesStreamState{ - ResponseID: generateResponsesID(), - Model: model, - Created: time.Now().Unix(), - ToolCalls: make(map[int]*ChatToolCall), + ResponseID: generateResponsesID(), + Model: model, + Created: time.Now().Unix(), + ToolCalls: make(map[int]*ChatToolCall), + ToolItemIDs: make(map[int]string), + ToolOutputIndex: make(map[int]int), } } +func (state *ChatCompletionsToResponsesStreamState) allocOutputIndex() int { + idx := state.nextOutputIndex + state.nextOutputIndex++ + return idx +} + // ChatCompletionsChunkToResponsesEvents converts one Chat Completions stream // chunk into zero or more Responses stream events. func ChatCompletionsChunkToResponsesEvents( @@ -490,24 +692,34 @@ func ChatCompletionsChunkToResponsesEvents( events = append(events, ensureChatToResponsesCreated(state)...) for _, choice := range chunk.Choices { - if choice.Delta.Content != nil { + // Reasoning is emitted as its own output item and must be opened + // (output_item.added + reasoning_summary_part.added) before the first + // delta, otherwise a strict client discards the delta. The leading + // empty-string reasoning delta upstreams send is filtered out. + if choice.Delta.ReasoningContent != nil && *choice.Delta.ReasoningContent != "" { + events = append(events, ensureChatReasoningItem(state)...) + _, _ = state.Reasoning.WriteString(*choice.Delta.ReasoningContent) + events = append(events, chatToResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + Delta: *choice.Delta.ReasoningContent, + ItemID: state.ReasoningItemID, + })) + } + if choice.Delta.Content != nil && *choice.Delta.Content != "" { + // First real content closes the reasoning item, then opens the + // message item and its output_text content part. + events = append(events, closeChatReasoningItem(state)...) events = append(events, ensureChatToResponsesMessageItem(state)...) + events = append(events, ensureChatToResponsesTextPart(state)...) _, _ = state.Text.WriteString(*choice.Delta.Content) events = append(events, chatToResponsesEvent(state, "response.output_text.delta", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, ContentIndex: 0, Delta: *choice.Delta.Content, ItemID: state.MessageItemID, })) } - if choice.Delta.ReasoningContent != nil { - _, _ = state.Reasoning.WriteString(*choice.Delta.ReasoningContent) - events = append(events, chatToResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ - OutputIndex: 0, - SummaryIndex: 0, - Delta: *choice.Delta.ReasoningContent, - })) - } for _, toolCall := range choice.Delta.ToolCalls { idx := 0 if toolCall.Index != nil { @@ -515,6 +727,8 @@ func ChatCompletionsChunkToResponsesEvents( } stored, ok := state.ToolCalls[idx] if !ok { + // A tool call closes any open reasoning item first. + events = append(events, closeChatReasoningItem(state)...) copyCall := toolCall if copyCall.ID == "" { copyCall.ID = generateItemID() @@ -522,11 +736,14 @@ func ChatCompletionsChunkToResponsesEvents( copyCall.Type = "function" state.ToolCalls[idx] = ©Call stored = ©Call + itemID := generateItemID() + state.ToolItemIDs[idx] = itemID + state.ToolOutputIndex[idx] = state.allocOutputIndex() events = append(events, chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ - OutputIndex: idx + 1, + OutputIndex: state.ToolOutputIndex[idx], Item: &ResponsesOutput{ Type: "function_call", - ID: generateItemID(), + ID: itemID, CallID: stored.ID, Name: stored.Function.Name, Status: "in_progress", @@ -543,7 +760,8 @@ func ChatCompletionsChunkToResponsesEvents( if toolCall.Function.Arguments != "" { stored.Function.Arguments += toolCall.Function.Arguments events = append(events, chatToResponsesEvent(state, "response.function_call_arguments.delta", &ResponsesStreamEvent{ - OutputIndex: idx + 1, + OutputIndex: state.ToolOutputIndex[idx], + ItemID: state.ToolItemIDs[idx], Delta: toolCall.Function.Arguments, CallID: stored.ID, Name: stored.Function.Name, @@ -565,24 +783,44 @@ func FinalizeChatCompletionsResponsesStream(state *ChatCompletionsToResponsesStr } var events []ResponsesStreamEvent events = append(events, ensureChatToResponsesCreated(state)...) + + // Close a reasoning item that never transitioned to content (reasoning-only + // or empty completion). + events = append(events, closeChatReasoningItem(state)...) + if state.MessageItemID != "" { - events = append(events, chatToResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ - OutputIndex: 0, - ContentIndex: 0, - Text: state.Text.String(), - ItemID: state.MessageItemID, - })) + if state.TextPartOpen { + events = append(events, chatToResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + Text: state.Text.String(), + ItemID: state.MessageItemID, + })) + events = append(events, chatToResponsesEvent(state, "response.content_part.done", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{Type: "output_text", Text: state.Text.String()}, + })) + } events = append(events, chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, Item: &ResponsesOutput{ - Type: "message", - ID: state.MessageItemID, - Role: "assistant", - Status: "completed", + Type: "message", + ID: state.MessageItemID, + Role: "assistant", + Content: []ResponsesContentPart{{Type: "output_text", Text: state.Text.String()}}, + Status: "completed", }, })) } + // Close every function_call item opened during the stream. Codex finalizes a + // tool call only after function_call_arguments.done + output_item.done for + // that item; without them the call never completes and the session wedges. + // Mirrors cc-switch's finalize_tools. + events = append(events, closeChatToolItems(state)...) + status := "completed" var incompleteDetails *ResponsesIncompleteDetails if state.FinishReason == "length" { @@ -621,22 +859,142 @@ func ensureChatToResponsesCreated(state *ChatCompletionsToResponsesStreamState) })} } +// ensureChatReasoningItem opens the reasoning output item (output_item.added + +// reasoning_summary_part.added) before the first reasoning delta. Codex renders +// streaming reasoning only when this summary-part lifecycle is present. +func ensureChatReasoningItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if state.ReasoningOpen || state.ReasoningDone { + return nil + } + state.ReasoningOpen = true + state.ReasoningItemID = generateItemID() + state.ReasoningIndex = state.allocOutputIndex() + return []ResponsesStreamEvent{ + chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + Item: &ResponsesOutput{Type: "reasoning", ID: state.ReasoningItemID, Status: "in_progress"}, + }), + chatToResponsesEvent(state, "response.reasoning_summary_part.added", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + ItemID: state.ReasoningItemID, + Part: &ResponsesContentPart{Type: "summary_text"}, + }), + } +} + +// closeChatReasoningItem emits the reasoning item's terminal events +// (reasoning_summary_text.done + reasoning_summary_part.done + output_item.done). +func closeChatReasoningItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if !state.ReasoningOpen { + return nil + } + state.ReasoningOpen = false + state.ReasoningDone = true + reasoning := state.Reasoning.String() + return []ResponsesStreamEvent{ + chatToResponsesEvent(state, "response.reasoning_summary_text.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + Text: reasoning, + ItemID: state.ReasoningItemID, + }), + chatToResponsesEvent(state, "response.reasoning_summary_part.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + ItemID: state.ReasoningItemID, + Part: &ResponsesContentPart{Type: "summary_text", Text: reasoning}, + }), + chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + Item: &ResponsesOutput{ + Type: "reasoning", + ID: state.ReasoningItemID, + Status: "completed", + Summary: []ResponsesSummary{{Type: "summary_text", Text: reasoning}}, + }, + }), + } +} + func ensureChatToResponsesMessageItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { if state.MessageItemID != "" { return nil } state.MessageItemID = generateItemID() + state.MessageIndex = state.allocOutputIndex() return []ResponsesStreamEvent{chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, Item: &ResponsesOutput{ - Type: "message", - ID: state.MessageItemID, - Role: "assistant", - Status: "in_progress", + Type: "message", + ID: state.MessageItemID, + Role: "assistant", + Status: "in_progress", + Content: []ResponsesContentPart{{Type: "output_text"}}, }, })} } +func ensureChatToResponsesTextPart(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if state.TextPartOpen { + return nil + } + state.TextPartOpen = true + return []ResponsesStreamEvent{chatToResponsesEvent(state, "response.content_part.added", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{Type: "output_text", Text: ""}, + })} +} + +// closeChatToolItems emits function_call_arguments.done + output_item.done for +// every tool call opened during the stream, carrying the full call_id/name/ +// arguments so codex can deserialize and execute the call. Mirrors cc-switch's +// finalize_tools. +func closeChatToolItems(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if len(state.ToolCalls) == 0 { + return nil + } + var events []ResponsesStreamEvent + for i := 0; i < len(state.ToolCalls); i++ { + toolCall, ok := state.ToolCalls[i] + if !ok || toolCall == nil { + continue + } + itemID, opened := state.ToolItemIDs[i] + if !opened { + continue + } + arguments := toolCall.Function.Arguments + if strings.TrimSpace(arguments) == "" { + arguments = "{}" + } + outputIndex := state.ToolOutputIndex[i] + events = append(events, + chatToResponsesEvent(state, "response.function_call_arguments.done", &ResponsesStreamEvent{ + OutputIndex: outputIndex, + ItemID: itemID, + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + }), + chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ + OutputIndex: outputIndex, + Item: &ResponsesOutput{ + Type: "function_call", + ID: itemID, + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + Status: "completed", + }, + }), + ) + } + return events +} + func (state *ChatCompletionsToResponsesStreamState) chatOutput() []ResponsesOutput { var outputs []ResponsesOutput if state.Reasoning.Len() > 0 { diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go new file mode 100644 index 00000000000..e54a453279d --- /dev/null +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go @@ -0,0 +1,187 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +// assertChatInvariants enforces the DeepSeek / OpenAI Chat Completions message +// invariants that, when violated, surface as upstream 400s. Used to validate the +// request-direction converter against golden codex request shapes. +func assertChatInvariants(t *testing.T, messages []ChatMessage) { + t.Helper() + for i, m := range messages { + // Every assistant tool_calls message must be immediately followed by one + // tool message per tool_call_id, in order. + if len(m.ToolCalls) > 0 { + for j, tc := range m.ToolCalls { + k := i + 1 + j + require.Lessf(t, k, len(messages), "tool_call %s has no following tool message", tc.ID) + require.Equalf(t, "tool", messages[k].Role, "tool_call %s not followed by a tool message", tc.ID) + require.Equalf(t, tc.ID, messages[k].ToolCallID, "tool reply order mismatch for %s", tc.ID) + } + } + // No two consecutive assistant messages. + if i > 0 && m.Role == "assistant" && messages[i-1].Role == "assistant" { + t.Fatalf("consecutive assistant messages at %d", i) + } + // No orphan tool replies. + if m.Role == "tool" { + require.NotEmptyf(t, m.ToolCallID, "tool message without tool_call_id at %d", i) + } + } +} + +func convertGolden(t *testing.T, input string) []ChatMessage { + t.Helper() + msgs, err := responsesInputToChatMessages("You are a helpful assistant.", json.RawMessage(input)) + require.NoError(t, err) + return msgs +} + +// Golden sample: a single tool-call turn (codex runs one shell/curl command), +// the shape that produced the original "no response" / 400. +func TestGolden_SingleToolCall(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"latest sha?"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"need to run curl"}]}, + {"type":"function_call","call_id":"call_a","name":"exec_command","arguments":"{\"cmd\":\"curl x\"}"}, + {"type":"function_call_output","call_id":"call_a","output":"deadbeef"} + ]`) + assertChatInvariants(t, msgs) + // reasoning_content must ride on the assistant tool-call message. + var asst *ChatMessage + for i := range msgs { + if len(msgs[i].ToolCalls) > 0 { + asst = &msgs[i] + } + } + require.NotNil(t, asst) + require.Equal(t, "need to run curl", asst.ReasoningContent) +} + +// Golden sample: parallel tool calls (codex runs git log + git tag at once). +func TestGolden_ParallelToolCalls(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"features?"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"inspect repo"}]}, + {"type":"function_call","call_id":"c0","name":"exec_command","arguments":"{\"cmd\":\"git log\"}"}, + {"type":"function_call","call_id":"c1","name":"exec_command","arguments":"{\"cmd\":\"git tag\"}"}, + {"type":"function_call_output","call_id":"c0","output":"log"}, + {"type":"function_call_output","call_id":"c1","output":"tags"} + ]`) + assertChatInvariants(t, msgs) + // Both parallel calls share ONE assistant message. + var toolMsgs int + for _, m := range msgs { + if len(m.ToolCalls) == 2 { + require.Equal(t, "c0", m.ToolCalls[0].ID) + require.Equal(t, "c1", m.ToolCalls[1].ID) + } + if m.Role == "tool" { + toolMsgs++ + } + } + require.Equal(t, 2, toolMsgs) +} + +// Golden sample: an unknown item type (web_search_call from a 联网查询) sitting +// between a function_call and its output must not break tool↔reply adjacency. +func TestGolden_UnknownItemBetweenToolCallAndOutput(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"search"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"let me search"}]}, + {"type":"function_call","call_id":"c0","name":"exec_command","arguments":"{}"}, + {"type":"web_search_call","id":"ws_1","status":"completed","action":{"type":"search","query":"x"}}, + {"type":"function_call_output","call_id":"c0","output":"result"} + ]`) + assertChatInvariants(t, msgs) +} + +// Sequential tool calls (a tool reply between two calls) must stay in distinct +// assistant messages. +func TestRequest_SequentialToolCallsStaySeparate(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"function_call","call_id":"c1","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"c1","output":"r1"}, + {"type":"function_call","call_id":"c2","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"c2","output":"r2"} + ]`) + assertChatInvariants(t, msgs) + assistants := 0 + for _, m := range msgs { + if len(m.ToolCalls) == 1 { + assistants++ + } + } + require.Equal(t, 2, assistants) +} + +// Golden sample: codex injects a message (e.g. an "Approved command prefix +// saved" notice) between a function_call and its output. The intervening message +// must be moved after the tool reply so the assistant tool_calls is immediately +// followed by its reply. +func TestGolden_MessageBetweenToolCallAndOutput(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"do it"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"run cmd"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"}, + {"type":"message","role":"developer","content":[{"type":"input_text","text":"Approved command prefix saved"}]}, + {"type":"function_call_output","call_id":"A","output":"ok"} + ]`) + assertChatInvariants(t, msgs) + // The assistant tool_calls message is immediately followed by its tool reply. + for i, m := range msgs { + if len(m.ToolCalls) > 0 { + require.Equal(t, "tool", msgs[i+1].Role) + require.Equal(t, "A", msgs[i+1].ToolCallID) + } + } +} + +// Golden sample: a parallel tool call where one sibling's output is missing +// (codex interrupted/reconnected mid-execution). The unanswered tool_call must +// be dropped so the remaining assistant tool_calls are all answered. +func TestGolden_PartialParallelDropsUnansweredCall(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"r"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"}, + {"type":"function_call","call_id":"B","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"A","output":"oa"} + ]`) + assertChatInvariants(t, msgs) + for _, m := range msgs { + for _, tc := range m.ToolCalls { + require.NotEqual(t, "B", tc.ID, "unanswered tool_call B should have been dropped") + } + } +} + +// Golden sample: a dangling tool_call at the end of the history (no output yet). +// The assistant message holding only that call must be dropped entirely. +func TestGolden_DanglingToolCallDropped(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"r"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"} + ]`) + assertChatInvariants(t, msgs) + for _, m := range msgs { + require.Empty(t, m.ToolCalls, "dangling unanswered tool_call should have been dropped") + } +} + +// normalizeChatMessages drops an orphan tool reply whose tool_call was never +// announced. +func TestNormalize_DropsOrphanToolReply(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"function_call_output","call_id":"ghost","output":"orphan"} + ]`) + for _, m := range msgs { + require.NotEqualf(t, "tool", m.Role, "orphan tool reply should have been dropped") + } +} diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go new file mode 100644 index 00000000000..beb473038bc --- /dev/null +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go @@ -0,0 +1,103 @@ +package apicompat + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func collectStreamEvents(t *testing.T, chunks []string) []ResponsesStreamEvent { + t.Helper() + state := NewChatCompletionsToResponsesStreamState("deepseek-v4-pro") + var events []ResponsesStreamEvent + for _, payload := range chunks { + var chunk ChatCompletionsChunk + require.NoError(t, json.Unmarshal([]byte(payload), &chunk)) + events = append(events, ChatCompletionsChunkToResponsesEvents(&chunk, state)...) + } + events = append(events, FinalizeChatCompletionsResponsesStream(state)...) + return events +} + +// TestStream_ReasoningOpensItemBeforeDelta guards the bug where a strict client +// (Codex) drops reasoning deltas that reference an item not yet opened. +func TestStream_ReasoningOpensItemBeforeDelta(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","content":null,"reasoning_content":""}}]}`, + `{"choices":[{"index":0,"delta":{"reasoning_content":"think"}}]}`, + `{"choices":[{"index":0,"delta":{"content":"hello"}}]}`, + `{"choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":2,"total_tokens":3}}`, + }) + + open := map[int]string{} // output_index -> item type + for _, e := range events { + switch e.Type { + case "response.output_item.added": + require.NotNil(t, e.Item) + open[e.OutputIndex] = e.Item.Type + case "response.reasoning_summary_text.delta": + require.Equalf(t, "reasoning", open[e.OutputIndex], "reasoning delta before its item was opened") + case "response.output_text.delta": + require.Equalf(t, "message", open[e.OutputIndex], "text delta before its item was opened") + } + } +} + +// TestStream_ToolCallLifecycleComplete guards that a tool call is fully closed +// (function_call_arguments.done + output_item.done with full arguments), which +// codex needs to execute the call. +func TestStream_ToolCallLifecycleComplete(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"plan"}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_a","type":"function","function":{"name":"exec","arguments":""}}]}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"cmd\":\"ls\"}"}}]}}]}`, + `{"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":1,"completion_tokens":2,"total_tokens":3}}`, + }) + + var sawAdded, sawArgsDone, sawItemDone bool + for _, e := range events { + switch e.Type { + case "response.output_item.added": + if e.Item != nil && e.Item.Type == "function_call" { + sawAdded = true + } + case "response.function_call_arguments.done": + sawArgsDone = true + require.Equal(t, `{"cmd":"ls"}`, e.Arguments) + case "response.output_item.done": + if e.Item != nil && e.Item.Type == "function_call" { + sawItemDone = true + require.Equal(t, `{"cmd":"ls"}`, e.Item.Arguments) + require.Equal(t, "call_a", e.Item.CallID) + } + } + } + require.True(t, sawAdded, "function_call output_item.added missing") + require.True(t, sawArgsDone, "function_call_arguments.done missing") + require.True(t, sawItemDone, "function_call output_item.done missing") +} + +// TestStream_SSEWireComplete drives the full stream through SSE encoding and +// asserts the function_call events carry complete fields on the wire. +func TestStream_SSEWireComplete(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"plan"}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_a","type":"function","function":{"name":"exec","arguments":"{}"}}]}}]}`, + `{"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + }) + + var addedLine string + for _, e := range events { + sse, err := ResponsesEventToSSE(e) + require.NoError(t, err) + if e.Type == "response.output_item.added" && e.Item != nil && e.Item.Type == "function_call" { + addedLine = sse + } + } + require.NotEmpty(t, addedLine) + // The function_call added event must carry arguments:"" on the wire. + require.True(t, strings.Contains(addedLine, `"arguments":""`), "added line missing arguments: %s", addedLine) + require.Contains(t, addedLine, `"call_id":"call_a"`) +} diff --git a/backend/internal/pkg/apicompat/responses_stream_event_wire.go b/backend/internal/pkg/apicompat/responses_stream_event_wire.go new file mode 100644 index 00000000000..df7a82e3937 --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_stream_event_wire.go @@ -0,0 +1,199 @@ +package apicompat + +import "encoding/json" + +// MarshalJSON renders a ResponsesStreamEvent into its wire form. +// +// The OpenAI Responses streaming protocol requires several fields to be present +// even when they hold a zero value: output_index/content_index/summary_index are +// meaningful at 0, a function_call item must always carry call_id/name/arguments +// (arguments may be ""), a message item must carry content:[] and an output_text +// part must carry text/annotations/logprobs. Go's `omitempty` drops exactly those +// zero values, and strict clients (Codex CLI) reject items/deltas whose required +// fields are missing. +// +// Rather than marshalling with omitempty and patching the JSON afterwards, every +// streamed event type is constructed explicitly here — the Go analogue of the +// reference gateways' (cc-switch, CCX) per-event object construction. This is the +// single source of truth for Responses SSE field presence and applies uniformly +// to every emitter (Chat→Responses bridge and Anthropic→Responses converter). +// +// Event types not listed fall back to the default struct marshalling, which +// bounds the blast radius of this method to the streamed item/part/text/tool +// events. +func (e ResponsesStreamEvent) MarshalJSON() ([]byte, error) { + switch e.Type { + case "response.output_text.delta", "response.output_text.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["content_index"] = e.ContentIndex + if e.Type == "response.output_text.done" { + m["text"] = e.Text + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + case "response.content_part.added", "response.content_part.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["content_index"] = e.ContentIndex + m["part"] = outputTextPartWire(e.Part) + return json.Marshal(m) + + case "response.reasoning_summary_text.delta", "response.reasoning_summary_text.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["summary_index"] = e.SummaryIndex + if e.Type == "response.reasoning_summary_text.done" { + m["text"] = e.Text + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + case "response.reasoning_summary_part.added", "response.reasoning_summary_part.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["summary_index"] = e.SummaryIndex + m["part"] = summaryTextPartWire(e.Part) + return json.Marshal(m) + + case "response.output_item.added", "response.output_item.done": + m := e.wireBase() + m["output_index"] = e.OutputIndex + m["item"] = responsesItemWire(e.Item) + return json.Marshal(m) + + case "response.function_call_arguments.delta", "response.function_call_arguments.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + if e.CallID != "" { + m["call_id"] = e.CallID + } + if e.Name != "" { + m["name"] = e.Name + } + if e.Type == "response.function_call_arguments.done" { + m["arguments"] = e.Arguments + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + default: + // response.created / completed / done / failed / incomplete and any + // event type not shaped above keep the default struct marshalling. + type alias ResponsesStreamEvent + return json.Marshal(alias(e)) + } +} + +func (e ResponsesStreamEvent) wireBase() map[string]any { + m := map[string]any{ + "type": e.Type, + "sequence_number": e.SequenceNumber, + } + return m +} + +func (e ResponsesStreamEvent) putItemID(m map[string]any) { + if e.ItemID != "" { + m["item_id"] = e.ItemID + } +} + +// outputTextPartWire renders a content part for a message's output_text, always +// carrying text/annotations/logprobs (matching cc-switch's push_text_delta). +func outputTextPartWire(part *ResponsesContentPart) map[string]any { + text := "" + if part != nil { + text = part.Text + } + return map[string]any{ + "type": "output_text", + "text": text, + "annotations": []any{}, + "logprobs": []any{}, + } +} + +// summaryTextPartWire renders a reasoning summary part. +func summaryTextPartWire(part *ResponsesContentPart) map[string]any { + text := "" + if part != nil { + text = part.Text + } + return map[string]any{ + "type": "summary_text", + "text": text, + } +} + +// responsesItemWire renders an output_item with every field the item's type +// requires to be present, including the empty arrays/strings that omitempty +// would otherwise drop. Mirrors cc-switch's response_function_call_item and the +// message/reasoning item shapes codex expects. +func responsesItemWire(item *ResponsesOutput) map[string]any { + if item == nil { + return map[string]any{} + } + m := map[string]any{ + "type": item.Type, + "id": item.ID, + } + if item.Status != "" { + m["status"] = item.Status + } + switch item.Type { + case "message": + role := item.Role + if role == "" { + role = "assistant" + } + m["role"] = role + m["content"] = messageContentWire(item.Content) + case "reasoning": + m["summary"] = reasoningSummaryWire(item.Summary) + if item.EncryptedContent != "" { + m["encrypted_content"] = item.EncryptedContent + } + case "function_call": + m["call_id"] = item.CallID + m["name"] = item.Name + m["arguments"] = item.Arguments + } + return m +} + +// messageContentWire renders a message item's content array; always an array +// (never null), with each output_text part carrying its text. +func messageContentWire(parts []ResponsesContentPart) []map[string]any { + out := make([]map[string]any, 0, len(parts)) + for _, p := range parts { + typ := p.Type + if typ == "" { + typ = "output_text" + } + out = append(out, map[string]any{"type": typ, "text": p.Text}) + } + return out +} + +// reasoningSummaryWire renders a reasoning item's summary array; always an array. +func reasoningSummaryWire(summary []ResponsesSummary) []map[string]any { + out := make([]map[string]any, 0, len(summary)) + for _, s := range summary { + typ := s.Type + if typ == "" { + typ = "summary_text" + } + out = append(out, map[string]any{"type": typ, "text": s.Text}) + } + return out +} diff --git a/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go new file mode 100644 index 00000000000..fbef45af454 --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go @@ -0,0 +1,109 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +// marshalEvent marshals through the custom MarshalJSON and returns the decoded +// object plus the set of top-level keys. +func marshalEvent(t *testing.T, e ResponsesStreamEvent) map[string]any { + t.Helper() + b, err := json.Marshal(e) + require.NoError(t, err) + var m map[string]any + require.NoError(t, json.Unmarshal(b, &m)) + return m +} + +// TestWire_IndexFieldsPresentAtZero guards the omitempty trap: output_index/ +// content_index/summary_index must serialize even when 0. +func TestWire_IndexFieldsPresentAtZero(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_text.delta", OutputIndex: 0, ContentIndex: 0, ItemID: "msg_1", Delta: "hi", + }) + require.Contains(t, m, "output_index") + require.Contains(t, m, "content_index") + require.EqualValues(t, 0, m["output_index"]) + + r := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.reasoning_summary_text.delta", OutputIndex: 0, SummaryIndex: 0, ItemID: "rs_1", Delta: "think", + }) + require.Contains(t, r, "output_index") + require.Contains(t, r, "summary_index") +} + +// TestWire_FunctionCallItemAlwaysComplete guards that a function_call item +// always carries call_id/name/arguments, including arguments:"" on .added. +func TestWire_FunctionCallItemAlwaysComplete(t *testing.T) { + added := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 1, + Item: &ResponsesOutput{Type: "function_call", ID: "fc_1", CallID: "call_a", Name: "exec", Status: "in_progress"}, + }) + item := added["item"].(map[string]any) + for _, k := range []string{"call_id", "name", "arguments"} { + require.Containsf(t, item, k, "function_call item missing %q", k) + } + require.Equal(t, "", item["arguments"]) +} + +// TestWire_MessageItemContentAlwaysArray guards content:[] presence. +func TestWire_MessageItemContentAlwaysArray(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "message", ID: "msg_1", Role: "assistant", Status: "in_progress"}, + }) + item := m["item"].(map[string]any) + require.Contains(t, item, "content") + _, ok := item["content"].([]any) + require.True(t, ok, "content must be an array") +} + +// TestWire_ReasoningItemSummaryAlwaysArray guards summary:[] presence. +func TestWire_ReasoningItemSummaryAlwaysArray(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "reasoning", ID: "rs_1", Status: "in_progress"}, + }) + item := m["item"].(map[string]any) + require.Contains(t, item, "summary") + _, ok := item["summary"].([]any) + require.True(t, ok, "summary must be an array") +} + +// TestWire_ContentPartCarriesAnnotationsLogprobs guards the output_text part shape. +func TestWire_ContentPartCarriesAnnotationsLogprobs(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.content_part.added", OutputIndex: 0, ContentIndex: 0, ItemID: "msg_1", + Part: &ResponsesContentPart{Type: "output_text", Text: ""}, + }) + part := m["part"].(map[string]any) + require.Equal(t, "output_text", part["type"]) + require.Contains(t, part, "text") + require.Contains(t, part, "annotations") + require.Contains(t, part, "logprobs") +} + +// TestWire_ArgumentsDonePresentEvenEmpty guards arguments presence on done. +func TestWire_ArgumentsDonePresentEvenEmpty(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.function_call_arguments.done", OutputIndex: 1, ItemID: "fc_1", CallID: "call_a", Name: "exec", Arguments: "", + }) + require.Contains(t, m, "arguments") + require.Equal(t, "", m["arguments"]) +} + +// TestWire_UnknownEventFallsBackToDefault ensures non-streamed event types keep +// default marshalling (the response object is preserved). +func TestWire_UnknownEventFallsBackToDefault(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ID: "resp_1", Object: "response", Status: "completed"}, + }) + require.Contains(t, m, "response") +} diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index b4451f235bb..d2937802789 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -406,6 +406,10 @@ type ResponsesStreamEvent struct { // Reuses Text/Delta fields above, SummaryIndex identifies which summary part SummaryIndex int `json:"summary_index,omitempty"` + // response.content_part.added / done and + // response.reasoning_summary_part.added / done + Part *ResponsesContentPart `json:"part,omitempty"` + // error event fields Code string `json:"code,omitempty"` Param string `json:"param,omitempty"` From 003b2786dacfd9c1c5342c499ae57590979fef8d Mon Sep 17 00:00:00 2001 From: visa2 Date: Mon, 1 Jun 2026 12:03:15 +0800 Subject: [PATCH 2/2] test(apicompat): check type assertions in responses stream wire tests errcheck (check-type-assertions) flagged unchecked single-value type assertions; switch to the comma-ok form so golangci-lint passes. Co-Authored-By: Claude Opus 4.8 --- .../responses_stream_event_wire_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go index fbef45af454..b4f6871d5f4 100644 --- a/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go +++ b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go @@ -43,7 +43,8 @@ func TestWire_FunctionCallItemAlwaysComplete(t *testing.T) { OutputIndex: 1, Item: &ResponsesOutput{Type: "function_call", ID: "fc_1", CallID: "call_a", Name: "exec", Status: "in_progress"}, }) - item := added["item"].(map[string]any) + item, ok := added["item"].(map[string]any) + require.True(t, ok, "item must be an object") for _, k := range []string{"call_id", "name", "arguments"} { require.Containsf(t, item, k, "function_call item missing %q", k) } @@ -57,9 +58,10 @@ func TestWire_MessageItemContentAlwaysArray(t *testing.T) { OutputIndex: 0, Item: &ResponsesOutput{Type: "message", ID: "msg_1", Role: "assistant", Status: "in_progress"}, }) - item := m["item"].(map[string]any) + item, ok := m["item"].(map[string]any) + require.True(t, ok, "item must be an object") require.Contains(t, item, "content") - _, ok := item["content"].([]any) + _, ok = item["content"].([]any) require.True(t, ok, "content must be an array") } @@ -70,9 +72,10 @@ func TestWire_ReasoningItemSummaryAlwaysArray(t *testing.T) { OutputIndex: 0, Item: &ResponsesOutput{Type: "reasoning", ID: "rs_1", Status: "in_progress"}, }) - item := m["item"].(map[string]any) + item, ok := m["item"].(map[string]any) + require.True(t, ok, "item must be an object") require.Contains(t, item, "summary") - _, ok := item["summary"].([]any) + _, ok = item["summary"].([]any) require.True(t, ok, "summary must be an array") } @@ -82,7 +85,8 @@ func TestWire_ContentPartCarriesAnnotationsLogprobs(t *testing.T) { Type: "response.content_part.added", OutputIndex: 0, ContentIndex: 0, ItemID: "msg_1", Part: &ResponsesContentPart{Type: "output_text", Text: ""}, }) - part := m["part"].(map[string]any) + part, ok := m["part"].(map[string]any) + require.True(t, ok, "part must be an object") require.Equal(t, "output_text", part["type"]) require.Contains(t, part, "text") require.Contains(t, part, "annotations")