diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go index 09b680c7c73..cc51cba2d21 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go @@ -42,14 +42,24 @@ func ResponsesToChatCompletionsRequest(req *ResponsesRequest) (*ChatCompletionsR return out, nil } +// responsesInputToChatMessages converts a Responses request's instructions + +// input[] into Chat Completions messages. It is a three-stage pipeline: +// +// parse — instructions become a system message; input[] is split into items +// build — buildChatMessagesFromItems walks items, attaching reasoning to the +// assistant message that produced a tool call, merging parallel tool +// calls into one assistant message, and skipping item types that have +// no Chat equivalent +// normalize — normalizeChatMessages enforces the invariants DeepSeek requires +// +// The build + normalize split keeps every protocol rule in one place rather than +// scattered across per-item cases, and makes unknown future codex item types +// fail safe instead of leaking into the upstream request. func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) ([]ChatMessage, error) { var messages []ChatMessage if strings.TrimSpace(instructions) != "" { content, _ := json.Marshal(instructions) - messages = append(messages, ChatMessage{ - Role: "system", - Content: content, - }) + messages = append(messages, ChatMessage{Role: "system", Content: content}) } inputRaw = bytesTrimSpace(inputRaw) @@ -57,13 +67,11 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return messages, nil } + // Bare string input is a single user turn. var inputText string if err := json.Unmarshal(inputRaw, &inputText); err == nil { content, _ := json.Marshal(inputText) - messages = append(messages, ChatMessage{ - Role: "user", - Content: content, - }) + messages = append(messages, ChatMessage{Role: "user", Content: content}) return messages, nil } @@ -72,6 +80,24 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return nil, fmt.Errorf("parse responses input: %w", err) } + built, err := buildChatMessagesFromItems(messages, rawItems) + if err != nil { + return nil, err + } + return normalizeChatMessages(built), nil +} + +// buildChatMessagesFromItems walks the Responses input items and appends the +// corresponding Chat messages. +func buildChatMessagesFromItems(messages []ChatMessage, rawItems []json.RawMessage) ([]ChatMessage, error) { + // pendingReasoning holds the reasoning text from a reasoning item until the + // assistant message it belongs to is emitted. DeepSeek's thinking mode + // requires the reasoning_content that produced a tool call to be passed back + // on that assistant message; dropping it yields a 400. It only survives + // across an assistant message (so a following tool call in the same turn + // still receives it); any other role ends the thinking span. + var pendingReasoning string + for _, raw := range rawItems { raw = bytesTrimSpace(raw) if len(raw) == 0 || string(raw) == "null" { @@ -84,6 +110,7 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) if textErr := json.Unmarshal(raw, &text); textErr == nil { content, _ := json.Marshal(text) messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" continue } return nil, fmt.Errorf("parse responses input item: %w", err) @@ -92,22 +119,40 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) role := chatCompletionsBridgeRole(rawString(item["role"])) itemType := rawString(item["type"]) switch itemType { + case "reasoning": + if txt := extractResponsesReasoningText(item); txt != "" { + pendingReasoning = txt + } + continue case "function_call": arguments := rawString(item["arguments"]) if strings.TrimSpace(arguments) == "" { arguments = "{}" } - messages = append(messages, ChatMessage{ - Role: "assistant", - ToolCalls: []ChatToolCall{{ - ID: rawString(item["call_id"]), - Type: "function", - Function: ChatFunctionCall{ - Name: rawString(item["name"]), - Arguments: arguments, - }, - }}, - }) + toolCall := ChatToolCall{ + ID: rawString(item["call_id"]), + Type: "function", + Function: ChatFunctionCall{ + Name: rawString(item["name"]), + Arguments: arguments, + }, + } + // Parallel tool calls arrive as consecutive function_call items and + // must share one assistant message; the matching tool replies then + // follow it. Merge into the immediately preceding assistant message. + if n := len(messages); n > 0 && messages[n-1].Role == "assistant" { + messages[n-1].ToolCalls = append(messages[n-1].ToolCalls, toolCall) + if messages[n-1].ReasoningContent == "" { + messages[n-1].ReasoningContent = pendingReasoning + } + } else { + messages = append(messages, ChatMessage{ + Role: "assistant", + ToolCalls: []ChatToolCall{toolCall}, + ReasoningContent: pendingReasoning, + }) + } + pendingReasoning = "" continue case "function_call_output": content, _ := json.Marshal(rawString(item["output"])) @@ -116,10 +161,12 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) ToolCallID: rawString(item["call_id"]), Content: content, }) + pendingReasoning = "" continue case "input_text", "text": content, _ := json.Marshal(rawString(item["text"])) messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" continue case "input_image": content, err := chatContentFromSingleResponsesPart(itemType, item) @@ -127,6 +174,18 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) return nil, err } messages = append(messages, ChatMessage{Role: "user", Content: content}) + pendingReasoning = "" + continue + } + + // Only genuine message items become chat messages. Codex emits other + // Responses item types with no Chat equivalent (web_search_call, + // local_shell_call, custom tool calls, file_search_call, ...). Converting + // them via the generic path would insert a spurious message between an + // assistant tool_calls message and its tool reply, which DeepSeek rejects + // ("insufficient tool messages following tool_calls message"). Skip them. + if itemType != "" && itemType != "message" { + pendingReasoning = "" continue } @@ -140,15 +199,128 @@ func responsesInputToChatMessages(instructions string, inputRaw json.RawMessage) if err != nil { return nil, err } - messages = append(messages, ChatMessage{ - Role: role, - Content: chatContent, - }) + messages = append(messages, ChatMessage{Role: role, Content: chatContent}) + // Reasoning only survives across an assistant text message. + if role != "assistant" { + pendingReasoning = "" + } } return messages, nil } +// normalizeChatMessages is the single place that enforces the tool-call +// invariant the DeepSeek / OpenAI Chat Completions schema requires: an assistant +// message with tool_calls must be immediately followed by one tool message per +// tool_call_id, in order, with nothing in between. +// +// Codex histories violate this in several ways that the builder alone can't fix: +// - a non-tool message lands between an assistant tool_calls message and its +// tool replies (e.g. an "Approved command prefix saved" system notice codex +// injects mid tool-execution); +// - a parallel tool_call's sibling output never arrives, or a call is left +// dangling by a mid-execution reconnect (unanswered tool_call); +// - a tool reply has no announcing assistant tool_call (orphan). +// +// It rebuilds the sequence so each assistant's answered tool_calls are followed +// directly by their replies (in call order); unanswered tool_calls are dropped +// (and an assistant left with neither tool_calls nor content is dropped); orphan +// tool replies and intervening messages are emitted in their natural position +// but never between an assistant tool_calls message and its replies. +func normalizeChatMessages(messages []ChatMessage) []ChatMessage { + // Index every tool reply by its tool_call_id (last wins on duplicates). + replies := make(map[string]ChatMessage) + for _, m := range messages { + if m.Role == "tool" && m.ToolCallID != "" { + replies[m.ToolCallID] = m + } + } + + out := make([]ChatMessage, 0, len(messages)) + for _, m := range messages { + switch { + case m.Role == "tool": + // A bare tool message with no tool_call_id is a direct Chat + // Completions passthrough; keep it in place. A tool reply whose id is + // announced by an assistant is emitted right after that assistant + // (skip the standalone occurrence). Any other tool reply is an orphan + // and is dropped. + if m.ToolCallID == "" { + out = append(out, m) + } + continue + case len(m.ToolCalls) > 0: + kept := make([]ChatToolCall, 0, len(m.ToolCalls)) + for _, tc := range m.ToolCalls { + if tc.ID == "" { + continue + } + if _, ok := replies[tc.ID]; ok { + kept = append(kept, tc) + } + } + if len(kept) == 0 { + // No answered tool_calls left: keep as a plain message if it has + // content, otherwise drop it entirely. + if isBlankChatContent(m.Content) { + continue + } + m.ToolCalls = nil + out = append(out, m) + continue + } + m.ToolCalls = kept + out = append(out, m) + for _, tc := range kept { + out = append(out, replies[tc.ID]) + } + default: + out = append(out, m) + } + } + return out +} + +// isBlankChatContent reports whether a chat message content holds no usable text. +func isBlankChatContent(raw json.RawMessage) bool { + raw = bytesTrimSpace(raw) + if len(raw) == 0 || string(raw) == "null" || string(raw) == `""` { + return true + } + return chatMessageContentText(raw) == "" +} + +// extractResponsesReasoningText pulls the reasoning text out of a Responses +// reasoning item. The Chat→Responses bridge writes the upstream reasoning_content +// verbatim into the summary_text parts (see closeChatReasoningItem), so codex +// round-trips it there; prefer summary[].text and fall back to content. +func extractResponsesReasoningText(item map[string]json.RawMessage) string { + var parts []string + collect := func(raw json.RawMessage) { + raw = bytesTrimSpace(raw) + if len(raw) == 0 || string(raw) == "null" { + return + } + var arr []map[string]json.RawMessage + if err := json.Unmarshal(raw, &arr); err == nil { + for _, p := range arr { + if t := rawString(p["text"]); t != "" { + parts = append(parts, t) + } + } + return + } + if t := rawString(raw); t != "" { + parts = append(parts, t) + } + } + collect(item["summary"]) + if len(parts) == 0 { + collect(item["content"]) + } + return strings.Join(parts, "\n") +} + func chatCompletionsBridgeRole(role string) string { trimmed := strings.TrimSpace(role) if trimmed == "" { @@ -448,10 +620,32 @@ type ChatCompletionsToResponsesStreamState struct { CreatedSent bool CompletedSent bool + // nextOutputIndex assigns sequential output_index values to items as they + // are opened (reasoning, message, tool calls), so the streamed indices match + // the order of items in the final response.output array. + nextOutputIndex int + + // Reasoning item lifecycle. DeepSeek-style upstreams stream all + // reasoning_content before any content, so reasoning is modeled as its own + // "reasoning" output item that must be opened (output_item.added) before any + // reasoning delta and closed before the message/tool items open. + ReasoningItemID string + ReasoningIndex int + ReasoningOpen bool + ReasoningDone bool + + // Message item + output_text content-part lifecycle. MessageItemID string - Text strings.Builder - Reasoning strings.Builder - ToolCalls map[int]*ChatToolCall + MessageIndex int + TextPartOpen bool + + Text strings.Builder + Reasoning strings.Builder + + // Tool-call lifecycle, keyed by the upstream tool_call index. + ToolCalls map[int]*ChatToolCall + ToolItemIDs map[int]string + ToolOutputIndex map[int]int FinishReason string Usage *ResponsesUsage @@ -460,13 +654,21 @@ type ChatCompletionsToResponsesStreamState struct { // NewChatCompletionsToResponsesStreamState returns an initialized stream state. func NewChatCompletionsToResponsesStreamState(model string) *ChatCompletionsToResponsesStreamState { return &ChatCompletionsToResponsesStreamState{ - ResponseID: generateResponsesID(), - Model: model, - Created: time.Now().Unix(), - ToolCalls: make(map[int]*ChatToolCall), + ResponseID: generateResponsesID(), + Model: model, + Created: time.Now().Unix(), + ToolCalls: make(map[int]*ChatToolCall), + ToolItemIDs: make(map[int]string), + ToolOutputIndex: make(map[int]int), } } +func (state *ChatCompletionsToResponsesStreamState) allocOutputIndex() int { + idx := state.nextOutputIndex + state.nextOutputIndex++ + return idx +} + // ChatCompletionsChunkToResponsesEvents converts one Chat Completions stream // chunk into zero or more Responses stream events. func ChatCompletionsChunkToResponsesEvents( @@ -490,24 +692,34 @@ func ChatCompletionsChunkToResponsesEvents( events = append(events, ensureChatToResponsesCreated(state)...) for _, choice := range chunk.Choices { - if choice.Delta.Content != nil { + // Reasoning is emitted as its own output item and must be opened + // (output_item.added + reasoning_summary_part.added) before the first + // delta, otherwise a strict client discards the delta. The leading + // empty-string reasoning delta upstreams send is filtered out. + if choice.Delta.ReasoningContent != nil && *choice.Delta.ReasoningContent != "" { + events = append(events, ensureChatReasoningItem(state)...) + _, _ = state.Reasoning.WriteString(*choice.Delta.ReasoningContent) + events = append(events, chatToResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + Delta: *choice.Delta.ReasoningContent, + ItemID: state.ReasoningItemID, + })) + } + if choice.Delta.Content != nil && *choice.Delta.Content != "" { + // First real content closes the reasoning item, then opens the + // message item and its output_text content part. + events = append(events, closeChatReasoningItem(state)...) events = append(events, ensureChatToResponsesMessageItem(state)...) + events = append(events, ensureChatToResponsesTextPart(state)...) _, _ = state.Text.WriteString(*choice.Delta.Content) events = append(events, chatToResponsesEvent(state, "response.output_text.delta", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, ContentIndex: 0, Delta: *choice.Delta.Content, ItemID: state.MessageItemID, })) } - if choice.Delta.ReasoningContent != nil { - _, _ = state.Reasoning.WriteString(*choice.Delta.ReasoningContent) - events = append(events, chatToResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ - OutputIndex: 0, - SummaryIndex: 0, - Delta: *choice.Delta.ReasoningContent, - })) - } for _, toolCall := range choice.Delta.ToolCalls { idx := 0 if toolCall.Index != nil { @@ -515,6 +727,8 @@ func ChatCompletionsChunkToResponsesEvents( } stored, ok := state.ToolCalls[idx] if !ok { + // A tool call closes any open reasoning item first. + events = append(events, closeChatReasoningItem(state)...) copyCall := toolCall if copyCall.ID == "" { copyCall.ID = generateItemID() @@ -522,11 +736,14 @@ func ChatCompletionsChunkToResponsesEvents( copyCall.Type = "function" state.ToolCalls[idx] = ©Call stored = ©Call + itemID := generateItemID() + state.ToolItemIDs[idx] = itemID + state.ToolOutputIndex[idx] = state.allocOutputIndex() events = append(events, chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ - OutputIndex: idx + 1, + OutputIndex: state.ToolOutputIndex[idx], Item: &ResponsesOutput{ Type: "function_call", - ID: generateItemID(), + ID: itemID, CallID: stored.ID, Name: stored.Function.Name, Status: "in_progress", @@ -543,7 +760,8 @@ func ChatCompletionsChunkToResponsesEvents( if toolCall.Function.Arguments != "" { stored.Function.Arguments += toolCall.Function.Arguments events = append(events, chatToResponsesEvent(state, "response.function_call_arguments.delta", &ResponsesStreamEvent{ - OutputIndex: idx + 1, + OutputIndex: state.ToolOutputIndex[idx], + ItemID: state.ToolItemIDs[idx], Delta: toolCall.Function.Arguments, CallID: stored.ID, Name: stored.Function.Name, @@ -565,24 +783,44 @@ func FinalizeChatCompletionsResponsesStream(state *ChatCompletionsToResponsesStr } var events []ResponsesStreamEvent events = append(events, ensureChatToResponsesCreated(state)...) + + // Close a reasoning item that never transitioned to content (reasoning-only + // or empty completion). + events = append(events, closeChatReasoningItem(state)...) + if state.MessageItemID != "" { - events = append(events, chatToResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ - OutputIndex: 0, - ContentIndex: 0, - Text: state.Text.String(), - ItemID: state.MessageItemID, - })) + if state.TextPartOpen { + events = append(events, chatToResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + Text: state.Text.String(), + ItemID: state.MessageItemID, + })) + events = append(events, chatToResponsesEvent(state, "response.content_part.done", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{Type: "output_text", Text: state.Text.String()}, + })) + } events = append(events, chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, Item: &ResponsesOutput{ - Type: "message", - ID: state.MessageItemID, - Role: "assistant", - Status: "completed", + Type: "message", + ID: state.MessageItemID, + Role: "assistant", + Content: []ResponsesContentPart{{Type: "output_text", Text: state.Text.String()}}, + Status: "completed", }, })) } + // Close every function_call item opened during the stream. Codex finalizes a + // tool call only after function_call_arguments.done + output_item.done for + // that item; without them the call never completes and the session wedges. + // Mirrors cc-switch's finalize_tools. + events = append(events, closeChatToolItems(state)...) + status := "completed" var incompleteDetails *ResponsesIncompleteDetails if state.FinishReason == "length" { @@ -621,22 +859,142 @@ func ensureChatToResponsesCreated(state *ChatCompletionsToResponsesStreamState) })} } +// ensureChatReasoningItem opens the reasoning output item (output_item.added + +// reasoning_summary_part.added) before the first reasoning delta. Codex renders +// streaming reasoning only when this summary-part lifecycle is present. +func ensureChatReasoningItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if state.ReasoningOpen || state.ReasoningDone { + return nil + } + state.ReasoningOpen = true + state.ReasoningItemID = generateItemID() + state.ReasoningIndex = state.allocOutputIndex() + return []ResponsesStreamEvent{ + chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + Item: &ResponsesOutput{Type: "reasoning", ID: state.ReasoningItemID, Status: "in_progress"}, + }), + chatToResponsesEvent(state, "response.reasoning_summary_part.added", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + ItemID: state.ReasoningItemID, + Part: &ResponsesContentPart{Type: "summary_text"}, + }), + } +} + +// closeChatReasoningItem emits the reasoning item's terminal events +// (reasoning_summary_text.done + reasoning_summary_part.done + output_item.done). +func closeChatReasoningItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if !state.ReasoningOpen { + return nil + } + state.ReasoningOpen = false + state.ReasoningDone = true + reasoning := state.Reasoning.String() + return []ResponsesStreamEvent{ + chatToResponsesEvent(state, "response.reasoning_summary_text.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + Text: reasoning, + ItemID: state.ReasoningItemID, + }), + chatToResponsesEvent(state, "response.reasoning_summary_part.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + SummaryIndex: 0, + ItemID: state.ReasoningItemID, + Part: &ResponsesContentPart{Type: "summary_text", Text: reasoning}, + }), + chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ + OutputIndex: state.ReasoningIndex, + Item: &ResponsesOutput{ + Type: "reasoning", + ID: state.ReasoningItemID, + Status: "completed", + Summary: []ResponsesSummary{{Type: "summary_text", Text: reasoning}}, + }, + }), + } +} + func ensureChatToResponsesMessageItem(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { if state.MessageItemID != "" { return nil } state.MessageItemID = generateItemID() + state.MessageIndex = state.allocOutputIndex() return []ResponsesStreamEvent{chatToResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ - OutputIndex: 0, + OutputIndex: state.MessageIndex, Item: &ResponsesOutput{ - Type: "message", - ID: state.MessageItemID, - Role: "assistant", - Status: "in_progress", + Type: "message", + ID: state.MessageItemID, + Role: "assistant", + Status: "in_progress", + Content: []ResponsesContentPart{{Type: "output_text"}}, }, })} } +func ensureChatToResponsesTextPart(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if state.TextPartOpen { + return nil + } + state.TextPartOpen = true + return []ResponsesStreamEvent{chatToResponsesEvent(state, "response.content_part.added", &ResponsesStreamEvent{ + OutputIndex: state.MessageIndex, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{Type: "output_text", Text: ""}, + })} +} + +// closeChatToolItems emits function_call_arguments.done + output_item.done for +// every tool call opened during the stream, carrying the full call_id/name/ +// arguments so codex can deserialize and execute the call. Mirrors cc-switch's +// finalize_tools. +func closeChatToolItems(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if len(state.ToolCalls) == 0 { + return nil + } + var events []ResponsesStreamEvent + for i := 0; i < len(state.ToolCalls); i++ { + toolCall, ok := state.ToolCalls[i] + if !ok || toolCall == nil { + continue + } + itemID, opened := state.ToolItemIDs[i] + if !opened { + continue + } + arguments := toolCall.Function.Arguments + if strings.TrimSpace(arguments) == "" { + arguments = "{}" + } + outputIndex := state.ToolOutputIndex[i] + events = append(events, + chatToResponsesEvent(state, "response.function_call_arguments.done", &ResponsesStreamEvent{ + OutputIndex: outputIndex, + ItemID: itemID, + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + }), + chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ + OutputIndex: outputIndex, + Item: &ResponsesOutput{ + Type: "function_call", + ID: itemID, + CallID: toolCall.ID, + Name: toolCall.Function.Name, + Arguments: arguments, + Status: "completed", + }, + }), + ) + } + return events +} + func (state *ChatCompletionsToResponsesStreamState) chatOutput() []ResponsesOutput { var outputs []ResponsesOutput if state.Reasoning.Len() > 0 { diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go new file mode 100644 index 00000000000..e54a453279d --- /dev/null +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_request_invariants_test.go @@ -0,0 +1,187 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +// assertChatInvariants enforces the DeepSeek / OpenAI Chat Completions message +// invariants that, when violated, surface as upstream 400s. Used to validate the +// request-direction converter against golden codex request shapes. +func assertChatInvariants(t *testing.T, messages []ChatMessage) { + t.Helper() + for i, m := range messages { + // Every assistant tool_calls message must be immediately followed by one + // tool message per tool_call_id, in order. + if len(m.ToolCalls) > 0 { + for j, tc := range m.ToolCalls { + k := i + 1 + j + require.Lessf(t, k, len(messages), "tool_call %s has no following tool message", tc.ID) + require.Equalf(t, "tool", messages[k].Role, "tool_call %s not followed by a tool message", tc.ID) + require.Equalf(t, tc.ID, messages[k].ToolCallID, "tool reply order mismatch for %s", tc.ID) + } + } + // No two consecutive assistant messages. + if i > 0 && m.Role == "assistant" && messages[i-1].Role == "assistant" { + t.Fatalf("consecutive assistant messages at %d", i) + } + // No orphan tool replies. + if m.Role == "tool" { + require.NotEmptyf(t, m.ToolCallID, "tool message without tool_call_id at %d", i) + } + } +} + +func convertGolden(t *testing.T, input string) []ChatMessage { + t.Helper() + msgs, err := responsesInputToChatMessages("You are a helpful assistant.", json.RawMessage(input)) + require.NoError(t, err) + return msgs +} + +// Golden sample: a single tool-call turn (codex runs one shell/curl command), +// the shape that produced the original "no response" / 400. +func TestGolden_SingleToolCall(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"latest sha?"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"need to run curl"}]}, + {"type":"function_call","call_id":"call_a","name":"exec_command","arguments":"{\"cmd\":\"curl x\"}"}, + {"type":"function_call_output","call_id":"call_a","output":"deadbeef"} + ]`) + assertChatInvariants(t, msgs) + // reasoning_content must ride on the assistant tool-call message. + var asst *ChatMessage + for i := range msgs { + if len(msgs[i].ToolCalls) > 0 { + asst = &msgs[i] + } + } + require.NotNil(t, asst) + require.Equal(t, "need to run curl", asst.ReasoningContent) +} + +// Golden sample: parallel tool calls (codex runs git log + git tag at once). +func TestGolden_ParallelToolCalls(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"features?"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"inspect repo"}]}, + {"type":"function_call","call_id":"c0","name":"exec_command","arguments":"{\"cmd\":\"git log\"}"}, + {"type":"function_call","call_id":"c1","name":"exec_command","arguments":"{\"cmd\":\"git tag\"}"}, + {"type":"function_call_output","call_id":"c0","output":"log"}, + {"type":"function_call_output","call_id":"c1","output":"tags"} + ]`) + assertChatInvariants(t, msgs) + // Both parallel calls share ONE assistant message. + var toolMsgs int + for _, m := range msgs { + if len(m.ToolCalls) == 2 { + require.Equal(t, "c0", m.ToolCalls[0].ID) + require.Equal(t, "c1", m.ToolCalls[1].ID) + } + if m.Role == "tool" { + toolMsgs++ + } + } + require.Equal(t, 2, toolMsgs) +} + +// Golden sample: an unknown item type (web_search_call from a 联网查询) sitting +// between a function_call and its output must not break tool↔reply adjacency. +func TestGolden_UnknownItemBetweenToolCallAndOutput(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"search"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"let me search"}]}, + {"type":"function_call","call_id":"c0","name":"exec_command","arguments":"{}"}, + {"type":"web_search_call","id":"ws_1","status":"completed","action":{"type":"search","query":"x"}}, + {"type":"function_call_output","call_id":"c0","output":"result"} + ]`) + assertChatInvariants(t, msgs) +} + +// Sequential tool calls (a tool reply between two calls) must stay in distinct +// assistant messages. +func TestRequest_SequentialToolCallsStaySeparate(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"function_call","call_id":"c1","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"c1","output":"r1"}, + {"type":"function_call","call_id":"c2","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"c2","output":"r2"} + ]`) + assertChatInvariants(t, msgs) + assistants := 0 + for _, m := range msgs { + if len(m.ToolCalls) == 1 { + assistants++ + } + } + require.Equal(t, 2, assistants) +} + +// Golden sample: codex injects a message (e.g. an "Approved command prefix +// saved" notice) between a function_call and its output. The intervening message +// must be moved after the tool reply so the assistant tool_calls is immediately +// followed by its reply. +func TestGolden_MessageBetweenToolCallAndOutput(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"do it"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"run cmd"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"}, + {"type":"message","role":"developer","content":[{"type":"input_text","text":"Approved command prefix saved"}]}, + {"type":"function_call_output","call_id":"A","output":"ok"} + ]`) + assertChatInvariants(t, msgs) + // The assistant tool_calls message is immediately followed by its tool reply. + for i, m := range msgs { + if len(m.ToolCalls) > 0 { + require.Equal(t, "tool", msgs[i+1].Role) + require.Equal(t, "A", msgs[i+1].ToolCallID) + } + } +} + +// Golden sample: a parallel tool call where one sibling's output is missing +// (codex interrupted/reconnected mid-execution). The unanswered tool_call must +// be dropped so the remaining assistant tool_calls are all answered. +func TestGolden_PartialParallelDropsUnansweredCall(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"r"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"}, + {"type":"function_call","call_id":"B","name":"exec","arguments":"{}"}, + {"type":"function_call_output","call_id":"A","output":"oa"} + ]`) + assertChatInvariants(t, msgs) + for _, m := range msgs { + for _, tc := range m.ToolCalls { + require.NotEqual(t, "B", tc.ID, "unanswered tool_call B should have been dropped") + } + } +} + +// Golden sample: a dangling tool_call at the end of the history (no output yet). +// The assistant message holding only that call must be dropped entirely. +func TestGolden_DanglingToolCallDropped(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"reasoning","summary":[{"type":"summary_text","text":"r"}]}, + {"type":"function_call","call_id":"A","name":"exec","arguments":"{}"} + ]`) + assertChatInvariants(t, msgs) + for _, m := range msgs { + require.Empty(t, m.ToolCalls, "dangling unanswered tool_call should have been dropped") + } +} + +// normalizeChatMessages drops an orphan tool reply whose tool_call was never +// announced. +func TestNormalize_DropsOrphanToolReply(t *testing.T) { + msgs := convertGolden(t, `[ + {"type":"message","role":"user","content":[{"type":"input_text","text":"q"}]}, + {"type":"function_call_output","call_id":"ghost","output":"orphan"} + ]`) + for _, m := range msgs { + require.NotEqualf(t, "tool", m.Role, "orphan tool reply should have been dropped") + } +} diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go new file mode 100644 index 00000000000..beb473038bc --- /dev/null +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_stream_lifecycle_test.go @@ -0,0 +1,103 @@ +package apicompat + +import ( + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func collectStreamEvents(t *testing.T, chunks []string) []ResponsesStreamEvent { + t.Helper() + state := NewChatCompletionsToResponsesStreamState("deepseek-v4-pro") + var events []ResponsesStreamEvent + for _, payload := range chunks { + var chunk ChatCompletionsChunk + require.NoError(t, json.Unmarshal([]byte(payload), &chunk)) + events = append(events, ChatCompletionsChunkToResponsesEvents(&chunk, state)...) + } + events = append(events, FinalizeChatCompletionsResponsesStream(state)...) + return events +} + +// TestStream_ReasoningOpensItemBeforeDelta guards the bug where a strict client +// (Codex) drops reasoning deltas that reference an item not yet opened. +func TestStream_ReasoningOpensItemBeforeDelta(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","content":null,"reasoning_content":""}}]}`, + `{"choices":[{"index":0,"delta":{"reasoning_content":"think"}}]}`, + `{"choices":[{"index":0,"delta":{"content":"hello"}}]}`, + `{"choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":2,"total_tokens":3}}`, + }) + + open := map[int]string{} // output_index -> item type + for _, e := range events { + switch e.Type { + case "response.output_item.added": + require.NotNil(t, e.Item) + open[e.OutputIndex] = e.Item.Type + case "response.reasoning_summary_text.delta": + require.Equalf(t, "reasoning", open[e.OutputIndex], "reasoning delta before its item was opened") + case "response.output_text.delta": + require.Equalf(t, "message", open[e.OutputIndex], "text delta before its item was opened") + } + } +} + +// TestStream_ToolCallLifecycleComplete guards that a tool call is fully closed +// (function_call_arguments.done + output_item.done with full arguments), which +// codex needs to execute the call. +func TestStream_ToolCallLifecycleComplete(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"plan"}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_a","type":"function","function":{"name":"exec","arguments":""}}]}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"function":{"arguments":"{\"cmd\":\"ls\"}"}}]}}]}`, + `{"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}],"usage":{"prompt_tokens":1,"completion_tokens":2,"total_tokens":3}}`, + }) + + var sawAdded, sawArgsDone, sawItemDone bool + for _, e := range events { + switch e.Type { + case "response.output_item.added": + if e.Item != nil && e.Item.Type == "function_call" { + sawAdded = true + } + case "response.function_call_arguments.done": + sawArgsDone = true + require.Equal(t, `{"cmd":"ls"}`, e.Arguments) + case "response.output_item.done": + if e.Item != nil && e.Item.Type == "function_call" { + sawItemDone = true + require.Equal(t, `{"cmd":"ls"}`, e.Item.Arguments) + require.Equal(t, "call_a", e.Item.CallID) + } + } + } + require.True(t, sawAdded, "function_call output_item.added missing") + require.True(t, sawArgsDone, "function_call_arguments.done missing") + require.True(t, sawItemDone, "function_call output_item.done missing") +} + +// TestStream_SSEWireComplete drives the full stream through SSE encoding and +// asserts the function_call events carry complete fields on the wire. +func TestStream_SSEWireComplete(t *testing.T) { + events := collectStreamEvents(t, []string{ + `{"choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"plan"}}]}`, + `{"choices":[{"index":0,"delta":{"tool_calls":[{"index":0,"id":"call_a","type":"function","function":{"name":"exec","arguments":"{}"}}]}}]}`, + `{"choices":[{"index":0,"delta":{},"finish_reason":"tool_calls"}]}`, + }) + + var addedLine string + for _, e := range events { + sse, err := ResponsesEventToSSE(e) + require.NoError(t, err) + if e.Type == "response.output_item.added" && e.Item != nil && e.Item.Type == "function_call" { + addedLine = sse + } + } + require.NotEmpty(t, addedLine) + // The function_call added event must carry arguments:"" on the wire. + require.True(t, strings.Contains(addedLine, `"arguments":""`), "added line missing arguments: %s", addedLine) + require.Contains(t, addedLine, `"call_id":"call_a"`) +} diff --git a/backend/internal/pkg/apicompat/responses_stream_event_wire.go b/backend/internal/pkg/apicompat/responses_stream_event_wire.go new file mode 100644 index 00000000000..df7a82e3937 --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_stream_event_wire.go @@ -0,0 +1,199 @@ +package apicompat + +import "encoding/json" + +// MarshalJSON renders a ResponsesStreamEvent into its wire form. +// +// The OpenAI Responses streaming protocol requires several fields to be present +// even when they hold a zero value: output_index/content_index/summary_index are +// meaningful at 0, a function_call item must always carry call_id/name/arguments +// (arguments may be ""), a message item must carry content:[] and an output_text +// part must carry text/annotations/logprobs. Go's `omitempty` drops exactly those +// zero values, and strict clients (Codex CLI) reject items/deltas whose required +// fields are missing. +// +// Rather than marshalling with omitempty and patching the JSON afterwards, every +// streamed event type is constructed explicitly here — the Go analogue of the +// reference gateways' (cc-switch, CCX) per-event object construction. This is the +// single source of truth for Responses SSE field presence and applies uniformly +// to every emitter (Chat→Responses bridge and Anthropic→Responses converter). +// +// Event types not listed fall back to the default struct marshalling, which +// bounds the blast radius of this method to the streamed item/part/text/tool +// events. +func (e ResponsesStreamEvent) MarshalJSON() ([]byte, error) { + switch e.Type { + case "response.output_text.delta", "response.output_text.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["content_index"] = e.ContentIndex + if e.Type == "response.output_text.done" { + m["text"] = e.Text + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + case "response.content_part.added", "response.content_part.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["content_index"] = e.ContentIndex + m["part"] = outputTextPartWire(e.Part) + return json.Marshal(m) + + case "response.reasoning_summary_text.delta", "response.reasoning_summary_text.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["summary_index"] = e.SummaryIndex + if e.Type == "response.reasoning_summary_text.done" { + m["text"] = e.Text + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + case "response.reasoning_summary_part.added", "response.reasoning_summary_part.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + m["summary_index"] = e.SummaryIndex + m["part"] = summaryTextPartWire(e.Part) + return json.Marshal(m) + + case "response.output_item.added", "response.output_item.done": + m := e.wireBase() + m["output_index"] = e.OutputIndex + m["item"] = responsesItemWire(e.Item) + return json.Marshal(m) + + case "response.function_call_arguments.delta", "response.function_call_arguments.done": + m := e.wireBase() + e.putItemID(m) + m["output_index"] = e.OutputIndex + if e.CallID != "" { + m["call_id"] = e.CallID + } + if e.Name != "" { + m["name"] = e.Name + } + if e.Type == "response.function_call_arguments.done" { + m["arguments"] = e.Arguments + } else { + m["delta"] = e.Delta + } + return json.Marshal(m) + + default: + // response.created / completed / done / failed / incomplete and any + // event type not shaped above keep the default struct marshalling. + type alias ResponsesStreamEvent + return json.Marshal(alias(e)) + } +} + +func (e ResponsesStreamEvent) wireBase() map[string]any { + m := map[string]any{ + "type": e.Type, + "sequence_number": e.SequenceNumber, + } + return m +} + +func (e ResponsesStreamEvent) putItemID(m map[string]any) { + if e.ItemID != "" { + m["item_id"] = e.ItemID + } +} + +// outputTextPartWire renders a content part for a message's output_text, always +// carrying text/annotations/logprobs (matching cc-switch's push_text_delta). +func outputTextPartWire(part *ResponsesContentPart) map[string]any { + text := "" + if part != nil { + text = part.Text + } + return map[string]any{ + "type": "output_text", + "text": text, + "annotations": []any{}, + "logprobs": []any{}, + } +} + +// summaryTextPartWire renders a reasoning summary part. +func summaryTextPartWire(part *ResponsesContentPart) map[string]any { + text := "" + if part != nil { + text = part.Text + } + return map[string]any{ + "type": "summary_text", + "text": text, + } +} + +// responsesItemWire renders an output_item with every field the item's type +// requires to be present, including the empty arrays/strings that omitempty +// would otherwise drop. Mirrors cc-switch's response_function_call_item and the +// message/reasoning item shapes codex expects. +func responsesItemWire(item *ResponsesOutput) map[string]any { + if item == nil { + return map[string]any{} + } + m := map[string]any{ + "type": item.Type, + "id": item.ID, + } + if item.Status != "" { + m["status"] = item.Status + } + switch item.Type { + case "message": + role := item.Role + if role == "" { + role = "assistant" + } + m["role"] = role + m["content"] = messageContentWire(item.Content) + case "reasoning": + m["summary"] = reasoningSummaryWire(item.Summary) + if item.EncryptedContent != "" { + m["encrypted_content"] = item.EncryptedContent + } + case "function_call": + m["call_id"] = item.CallID + m["name"] = item.Name + m["arguments"] = item.Arguments + } + return m +} + +// messageContentWire renders a message item's content array; always an array +// (never null), with each output_text part carrying its text. +func messageContentWire(parts []ResponsesContentPart) []map[string]any { + out := make([]map[string]any, 0, len(parts)) + for _, p := range parts { + typ := p.Type + if typ == "" { + typ = "output_text" + } + out = append(out, map[string]any{"type": typ, "text": p.Text}) + } + return out +} + +// reasoningSummaryWire renders a reasoning item's summary array; always an array. +func reasoningSummaryWire(summary []ResponsesSummary) []map[string]any { + out := make([]map[string]any, 0, len(summary)) + for _, s := range summary { + typ := s.Type + if typ == "" { + typ = "summary_text" + } + out = append(out, map[string]any{"type": typ, "text": s.Text}) + } + return out +} diff --git a/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go new file mode 100644 index 00000000000..b4f6871d5f4 --- /dev/null +++ b/backend/internal/pkg/apicompat/responses_stream_event_wire_test.go @@ -0,0 +1,113 @@ +package apicompat + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +// marshalEvent marshals through the custom MarshalJSON and returns the decoded +// object plus the set of top-level keys. +func marshalEvent(t *testing.T, e ResponsesStreamEvent) map[string]any { + t.Helper() + b, err := json.Marshal(e) + require.NoError(t, err) + var m map[string]any + require.NoError(t, json.Unmarshal(b, &m)) + return m +} + +// TestWire_IndexFieldsPresentAtZero guards the omitempty trap: output_index/ +// content_index/summary_index must serialize even when 0. +func TestWire_IndexFieldsPresentAtZero(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_text.delta", OutputIndex: 0, ContentIndex: 0, ItemID: "msg_1", Delta: "hi", + }) + require.Contains(t, m, "output_index") + require.Contains(t, m, "content_index") + require.EqualValues(t, 0, m["output_index"]) + + r := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.reasoning_summary_text.delta", OutputIndex: 0, SummaryIndex: 0, ItemID: "rs_1", Delta: "think", + }) + require.Contains(t, r, "output_index") + require.Contains(t, r, "summary_index") +} + +// TestWire_FunctionCallItemAlwaysComplete guards that a function_call item +// always carries call_id/name/arguments, including arguments:"" on .added. +func TestWire_FunctionCallItemAlwaysComplete(t *testing.T) { + added := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 1, + Item: &ResponsesOutput{Type: "function_call", ID: "fc_1", CallID: "call_a", Name: "exec", Status: "in_progress"}, + }) + item, ok := added["item"].(map[string]any) + require.True(t, ok, "item must be an object") + for _, k := range []string{"call_id", "name", "arguments"} { + require.Containsf(t, item, k, "function_call item missing %q", k) + } + require.Equal(t, "", item["arguments"]) +} + +// TestWire_MessageItemContentAlwaysArray guards content:[] presence. +func TestWire_MessageItemContentAlwaysArray(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "message", ID: "msg_1", Role: "assistant", Status: "in_progress"}, + }) + item, ok := m["item"].(map[string]any) + require.True(t, ok, "item must be an object") + require.Contains(t, item, "content") + _, ok = item["content"].([]any) + require.True(t, ok, "content must be an array") +} + +// TestWire_ReasoningItemSummaryAlwaysArray guards summary:[] presence. +func TestWire_ReasoningItemSummaryAlwaysArray(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.output_item.added", + OutputIndex: 0, + Item: &ResponsesOutput{Type: "reasoning", ID: "rs_1", Status: "in_progress"}, + }) + item, ok := m["item"].(map[string]any) + require.True(t, ok, "item must be an object") + require.Contains(t, item, "summary") + _, ok = item["summary"].([]any) + require.True(t, ok, "summary must be an array") +} + +// TestWire_ContentPartCarriesAnnotationsLogprobs guards the output_text part shape. +func TestWire_ContentPartCarriesAnnotationsLogprobs(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.content_part.added", OutputIndex: 0, ContentIndex: 0, ItemID: "msg_1", + Part: &ResponsesContentPart{Type: "output_text", Text: ""}, + }) + part, ok := m["part"].(map[string]any) + require.True(t, ok, "part must be an object") + require.Equal(t, "output_text", part["type"]) + require.Contains(t, part, "text") + require.Contains(t, part, "annotations") + require.Contains(t, part, "logprobs") +} + +// TestWire_ArgumentsDonePresentEvenEmpty guards arguments presence on done. +func TestWire_ArgumentsDonePresentEvenEmpty(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.function_call_arguments.done", OutputIndex: 1, ItemID: "fc_1", CallID: "call_a", Name: "exec", Arguments: "", + }) + require.Contains(t, m, "arguments") + require.Equal(t, "", m["arguments"]) +} + +// TestWire_UnknownEventFallsBackToDefault ensures non-streamed event types keep +// default marshalling (the response object is preserved). +func TestWire_UnknownEventFallsBackToDefault(t *testing.T) { + m := marshalEvent(t, ResponsesStreamEvent{ + Type: "response.completed", + Response: &ResponsesResponse{ID: "resp_1", Object: "response", Status: "completed"}, + }) + require.Contains(t, m, "response") +} diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index b4451f235bb..d2937802789 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -406,6 +406,10 @@ type ResponsesStreamEvent struct { // Reuses Text/Delta fields above, SummaryIndex identifies which summary part SummaryIndex int `json:"summary_index,omitempty"` + // response.content_part.added / done and + // response.reasoning_summary_part.added / done + Part *ResponsesContentPart `json:"part,omitempty"` + // error event fields Code string `json:"code,omitempty"` Param string `json:"param,omitempty"`