From 4402e24185bcf49716a9303e3374a2e5fb984871 Mon Sep 17 00:00:00 2001 From: "zhibin.an" Date: Fri, 29 May 2026 13:49:42 +0800 Subject: [PATCH 1/2] fix: emit responses content part lifecycle events --- .../pkg/apicompat/anthropic_responses_test.go | 49 +++++++++++++++++++ .../anthropic_to_responses_response.go | 40 ++++++++++++++- .../chatcompletions_responses_bridge.go | 30 ++++++++++++ .../chatcompletions_responses_test.go | 38 ++++++++++++++ backend/internal/pkg/apicompat/types.go | 3 ++ ...ai_gateway_responses_chat_fallback_test.go | 2 + 6 files changed, 161 insertions(+), 1 deletion(-) diff --git a/backend/internal/pkg/apicompat/anthropic_responses_test.go b/backend/internal/pkg/apicompat/anthropic_responses_test.go index 8997835c2aa..adf4174679e 100644 --- a/backend/internal/pkg/apicompat/anthropic_responses_test.go +++ b/backend/internal/pkg/apicompat/anthropic_responses_test.go @@ -497,6 +497,55 @@ func TestStreamingTextOnly(t *testing.T) { assert.Equal(t, "message_stop", events[1].Type) } +func TestAnthropicToResponsesEvents_TextStreamingIncludesContentPartLifecycle(t *testing.T) { + state := NewAnthropicEventToResponsesState() + + created := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "message_start", + Message: &AnthropicResponse{ + ID: "msg_stream", + Model: "claude-sonnet-4-5", + }, + }, state) + require.Len(t, created, 1) + assert.Equal(t, "response.created", created[0].Type) + + events := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_start", + ContentBlock: &AnthropicContentBlock{ + Type: "text", + }, + }, state) + require.Len(t, events, 2) + assert.Equal(t, "response.output_item.added", events[0].Type) + assert.Equal(t, "response.content_part.added", events[1].Type) + require.NotNil(t, events[1].Part) + assert.Equal(t, "output_text", events[1].Part.Type) + assert.Equal(t, "", events[1].Part.Text) + + delta := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_delta", + Delta: &AnthropicDelta{ + Type: "text_delta", + Text: "Hello", + }, + }, state) + require.Len(t, delta, 1) + assert.Equal(t, "response.output_text.delta", delta[0].Type) + assert.Equal(t, events[1].ItemID, delta[0].ItemID) + + done := AnthropicEventToResponsesEvents(&AnthropicStreamEvent{ + Type: "content_block_stop", + }, state) + require.Len(t, done, 2) + assert.Equal(t, "response.output_text.done", done[0].Type) + assert.Equal(t, "response.content_part.done", done[1].Type) + require.NotNil(t, done[1].Part) + assert.Equal(t, "output_text", done[1].Part.Type) + assert.Equal(t, "Hello", done[1].Part.Text) + assert.Equal(t, events[1].ItemID, done[1].ItemID) +} + func TestResponsesEventToAnthropicEvents_ResponseDone(t *testing.T) { state := NewResponsesEventToAnthropicState() state.Model = "gpt-4o" diff --git a/backend/internal/pkg/apicompat/anthropic_to_responses_response.go b/backend/internal/pkg/apicompat/anthropic_to_responses_response.go index de8ab78df89..f2351dc677d 100644 --- a/backend/internal/pkg/apicompat/anthropic_to_responses_response.go +++ b/backend/internal/pkg/apicompat/anthropic_to_responses_response.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "strings" "time" ) @@ -151,6 +152,8 @@ type AnthropicEventToResponsesState struct { // For message output: accumulate text parts ContentIndex int + TextBuffer strings.Builder + TextPartOpen bool // For function_call: track per-output info CurrentCallID string @@ -278,6 +281,8 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi state.CurrentItemID = generateItemID() state.CurrentItemType = "message" state.ContentIndex = 0 + state.TextBuffer.Reset() + state.TextPartOpen = false events = append(events, makeResponsesEvent(state, "response.output_item.added", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, @@ -290,6 +295,19 @@ func anthToResHandleContentBlockStart(evt *AnthropicStreamEvent, state *Anthropi })) } + if !state.TextPartOpen { + state.TextPartOpen = true + events = append(events, makeResponsesEvent(state, "response.content_part.added", &ResponsesStreamEvent{ + OutputIndex: state.OutputIndex, + ContentIndex: state.ContentIndex, + ItemID: state.CurrentItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: "", + }, + })) + } + case "tool_use": // Close previous item if any events = append(events, closeCurrentResponsesItem(state)...) @@ -324,6 +342,9 @@ func anthToResHandleContentBlockDelta(evt *AnthropicStreamEvent, state *Anthropi if evt.Delta.Text == "" { return nil } + if state.CurrentItemType == "message" { + _, _ = state.TextBuffer.WriteString(evt.Delta.Text) + } return []ResponsesStreamEvent{makeResponsesEvent(state, "response.output_text.delta", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, ContentIndex: state.ContentIndex, @@ -391,13 +412,28 @@ func anthToResHandleContentBlockStop(evt *AnthropicStreamEvent, state *Anthropic case "message": // Emit output_text.done (text block is done, but message item stays open for potential more blocks) - return []ResponsesStreamEvent{ + events := []ResponsesStreamEvent{ makeResponsesEvent(state, "response.output_text.done", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex, ContentIndex: state.ContentIndex, + Text: state.TextBuffer.String(), ItemID: state.CurrentItemID, }), } + if state.TextPartOpen { + events = append(events, makeResponsesEvent(state, "response.content_part.done", &ResponsesStreamEvent{ + OutputIndex: state.OutputIndex, + ContentIndex: state.ContentIndex, + ItemID: state.CurrentItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: state.TextBuffer.String(), + }, + })) + state.TextPartOpen = false + } + state.TextBuffer.Reset() + return events } return nil @@ -458,6 +494,8 @@ func closeCurrentResponsesItem(state *AnthropicEventToResponsesState) []Response state.CurrentName = "" state.OutputIndex++ state.ContentIndex = 0 + state.TextBuffer.Reset() + state.TextPartOpen = false return []ResponsesStreamEvent{makeResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ OutputIndex: state.OutputIndex - 1, // Use the index before increment diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go index 09b680c7c73..e37ec1e30cb 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go @@ -449,6 +449,7 @@ type ChatCompletionsToResponsesStreamState struct { CompletedSent bool MessageItemID string + TextPartOpen bool Text strings.Builder Reasoning strings.Builder ToolCalls map[int]*ChatToolCall @@ -492,6 +493,7 @@ func ChatCompletionsChunkToResponsesEvents( for _, choice := range chunk.Choices { if choice.Delta.Content != nil { events = append(events, ensureChatToResponsesMessageItem(state)...) + events = append(events, ensureChatToResponsesTextPart(state)...) _, _ = state.Text.WriteString(*choice.Delta.Content) events = append(events, chatToResponsesEvent(state, "response.output_text.delta", &ResponsesStreamEvent{ OutputIndex: 0, @@ -572,6 +574,18 @@ func FinalizeChatCompletionsResponsesStream(state *ChatCompletionsToResponsesStr Text: state.Text.String(), ItemID: state.MessageItemID, })) + if state.TextPartOpen { + events = append(events, chatToResponsesEvent(state, "response.content_part.done", &ResponsesStreamEvent{ + OutputIndex: 0, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: state.Text.String(), + }, + })) + state.TextPartOpen = false + } events = append(events, chatToResponsesEvent(state, "response.output_item.done", &ResponsesStreamEvent{ OutputIndex: 0, Item: &ResponsesOutput{ @@ -637,6 +651,22 @@ func ensureChatToResponsesMessageItem(state *ChatCompletionsToResponsesStreamSta })} } +func ensureChatToResponsesTextPart(state *ChatCompletionsToResponsesStreamState) []ResponsesStreamEvent { + if state.TextPartOpen || state.MessageItemID == "" { + return nil + } + state.TextPartOpen = true + return []ResponsesStreamEvent{chatToResponsesEvent(state, "response.content_part.added", &ResponsesStreamEvent{ + OutputIndex: 0, + ContentIndex: 0, + ItemID: state.MessageItemID, + Part: &ResponsesContentPart{ + Type: "output_text", + Text: "", + }, + })} +} + func (state *ChatCompletionsToResponsesStreamState) chatOutput() []ResponsesOutput { var outputs []ResponsesOutput if state.Reasoning.Len() > 0 { diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go index b03b012fc7a..9d1d5ac3262 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go @@ -826,6 +826,44 @@ func TestResponsesEventToChatChunks_TextDelta(t *testing.T) { assert.Equal(t, "Hello", *chunks[0].Choices[0].Delta.Content) } +func TestChatCompletionsToResponsesEvents_TextStreamingIncludesContentPartLifecycle(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("gpt-4o") + content := "Hello" + chunk := &ChatCompletionsChunk{ + ID: "chatcmpl_stream", + Model: "gpt-4o", + Choices: []ChatChunkChoice{ + { + Delta: ChatDelta{Content: &content}, + }, + }, + } + + events := ChatCompletionsChunkToResponsesEvents(chunk, state) + require.Len(t, events, 4) + assert.Equal(t, "response.created", events[0].Type) + assert.Equal(t, "response.output_item.added", events[1].Type) + assert.Equal(t, "response.content_part.added", events[2].Type) + assert.Equal(t, "response.output_text.delta", events[3].Type) + require.NotNil(t, events[2].Part) + assert.Equal(t, "output_text", events[2].Part.Type) + assert.Equal(t, "", events[2].Part.Text) + assert.Equal(t, events[1].Item.ID, events[2].ItemID) + assert.Equal(t, events[2].ItemID, events[3].ItemID) + assert.Equal(t, 0, events[2].ContentIndex) + + done := FinalizeChatCompletionsResponsesStream(state) + require.Len(t, done, 4) + assert.Equal(t, "response.output_text.done", done[0].Type) + assert.Equal(t, "response.content_part.done", done[1].Type) + assert.Equal(t, "response.output_item.done", done[2].Type) + assert.Equal(t, "response.completed", done[3].Type) + require.NotNil(t, done[1].Part) + assert.Equal(t, "output_text", done[1].Part.Type) + assert.Equal(t, "Hello", done[1].Part.Text) + assert.Equal(t, events[2].ItemID, done[1].ItemID) +} + func TestResponsesEventToChatChunks_ToolCallDelta(t *testing.T) { state := NewResponsesEventToChatState() state.Model = "gpt-4o" diff --git a/backend/internal/pkg/apicompat/types.go b/backend/internal/pkg/apicompat/types.go index b4451f235bb..1dc8918c92f 100644 --- a/backend/internal/pkg/apicompat/types.go +++ b/backend/internal/pkg/apicompat/types.go @@ -390,6 +390,9 @@ type ResponsesStreamEvent struct { // response.output_item.added / response.output_item.done Item *ResponsesOutput `json:"item,omitempty"` + // response.content_part.added / response.content_part.done + Part *ResponsesContentPart `json:"part,omitempty"` + // response.output_text.delta / response.output_text.done OutputIndex int `json:"output_index,omitempty"` ContentIndex int `json:"content_index,omitempty"` diff --git a/backend/internal/service/openai_gateway_responses_chat_fallback_test.go b/backend/internal/service/openai_gateway_responses_chat_fallback_test.go index abb645e8338..79eb9357913 100644 --- a/backend/internal/service/openai_gateway_responses_chat_fallback_test.go +++ b/backend/internal/service/openai_gateway_responses_chat_fallback_test.go @@ -91,7 +91,9 @@ func TestForwardResponses_ForceChatCompletionsRoutesStreamingToChatCompletions(t require.NotNil(t, result) require.Equal(t, "http://upstream.example/v1/chat/completions", upstream.lastReq.URL.String()) require.True(t, gjson.GetBytes(upstream.lastBody, "stream_options.include_usage").Bool()) + require.Contains(t, rec.Body.String(), "event: response.content_part.added") require.Contains(t, rec.Body.String(), "event: response.output_text.delta") + require.Contains(t, rec.Body.String(), "event: response.content_part.done") require.Contains(t, rec.Body.String(), `"delta":"he"`) require.Contains(t, rec.Body.String(), "event: response.completed") require.Contains(t, rec.Body.String(), `"input_tokens":4`) From 871a98cba61a4cafe3c458f4ba514a056a981b20 Mon Sep 17 00:00:00 2001 From: "zhibin.an" Date: Mon, 1 Jun 2026 10:55:12 +0800 Subject: [PATCH 2/2] Fix reasoning stream ordering for chat completions --- .../chatcompletions_responses_bridge.go | 3 +- .../chatcompletions_responses_test.go | 41 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go index e37ec1e30cb..01f0f4040f4 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_bridge.go @@ -502,7 +502,8 @@ func ChatCompletionsChunkToResponsesEvents( ItemID: state.MessageItemID, })) } - if choice.Delta.ReasoningContent != nil { + if choice.Delta.ReasoningContent != nil && *choice.Delta.ReasoningContent != "" { + events = append(events, ensureChatToResponsesMessageItem(state)...) _, _ = state.Reasoning.WriteString(*choice.Delta.ReasoningContent) events = append(events, chatToResponsesEvent(state, "response.reasoning_summary_text.delta", &ResponsesStreamEvent{ OutputIndex: 0, diff --git a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go index 9d1d5ac3262..218449b6191 100644 --- a/backend/internal/pkg/apicompat/chatcompletions_responses_test.go +++ b/backend/internal/pkg/apicompat/chatcompletions_responses_test.go @@ -864,6 +864,47 @@ func TestChatCompletionsToResponsesEvents_TextStreamingIncludesContentPartLifecy assert.Equal(t, events[2].ItemID, done[1].ItemID) } +func TestChatCompletionsToResponsesEvents_ReasoningStreamingCreatesMessageItemBeforeDelta(t *testing.T) { + state := NewChatCompletionsToResponsesStreamState("deepseek-v4-pro") + emptyReasoning := "" + + emptyChunk := &ChatCompletionsChunk{ + ID: "chatcmpl_reasoning", + Model: "deepseek-v4-pro", + Choices: []ChatChunkChoice{ + { + Delta: ChatDelta{ReasoningContent: &emptyReasoning}, + }, + }, + } + + events := ChatCompletionsChunkToResponsesEvents(emptyChunk, state) + require.Len(t, events, 1) + assert.Equal(t, "response.created", events[0].Type) + + reasoning := "internal plan" + reasoningChunk := &ChatCompletionsChunk{ + ID: "chatcmpl_reasoning", + Model: "deepseek-v4-pro", + Choices: []ChatChunkChoice{ + { + Delta: ChatDelta{ReasoningContent: &reasoning}, + }, + }, + } + + events = ChatCompletionsChunkToResponsesEvents(reasoningChunk, state) + require.Len(t, events, 2) + assert.Equal(t, "response.output_item.added", events[0].Type) + assert.Equal(t, "response.reasoning_summary_text.delta", events[1].Type) + require.NotNil(t, events[0].Item) + assert.Equal(t, "message", events[0].Item.Type) + assert.Equal(t, 0, events[1].OutputIndex) + assert.Equal(t, 0, events[1].SummaryIndex) + assert.Equal(t, "internal plan", events[1].Delta) + assert.Equal(t, events[0].Item.ID, state.MessageItemID) +} + func TestResponsesEventToChatChunks_ToolCallDelta(t *testing.T) { state := NewResponsesEventToChatState() state.Model = "gpt-4o"