diff --git a/internal/converter/coverage_misc_sse_test.go b/internal/converter/coverage_misc_sse_test.go index 683f9afa..8aa37edd 100644 --- a/internal/converter/coverage_misc_sse_test.go +++ b/internal/converter/coverage_misc_sse_test.go @@ -42,3 +42,26 @@ func TestSSE_FormatStringData(t *testing.T) { t.Fatalf("expected string data") } } + +func TestSSE_FormatMultilineDataPrefixesEveryLine(t *testing.T) { + out := FormatSSE("response.completed", []byte("{\n \"type\": \"response.completed\"\n}")) + outStr := string(out) + for _, want := range []string{ + "event: response.completed", + "data: {", + "data: \"type\": \"response.completed\"", + "data: }", + } { + if !strings.Contains(outStr, want) { + t.Fatalf("expected %q in %q", want, outStr) + } + } + + events, remaining := ParseSSE(outStr) + if remaining != "" || len(events) != 1 { + t.Fatalf("expected one complete parsed event, remaining=%q events=%d", remaining, len(events)) + } + if events[0].Event != "response.completed" || !strings.Contains(string(events[0].Data), "response.completed") { + t.Fatalf("unexpected parsed event: %+v", events[0]) + } +} diff --git a/internal/converter/coverage_openai_stream_test.go b/internal/converter/coverage_openai_stream_test.go index fd8e2710..6631497e 100644 --- a/internal/converter/coverage_openai_stream_test.go +++ b/internal/converter/coverage_openai_stream_test.go @@ -180,6 +180,219 @@ func TestOpenAIToCodexRequestAndStream(t *testing.T) { if !strings.Contains(string(streamOut), "response.completed") { t.Fatalf("missing completed") } + if !strings.Contains(string(streamOut), "data: [DONE]") { + t.Fatalf("missing terminal done sentinel") + } +} + +func TestOpenAIToCodexStreamForwardsDoneWhenSplitFromCompletion(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + chunk := OpenAIStreamChunk{ID: "chat_1", Model: "gpt", Choices: []OpenAIChoice{{ + Delta: &OpenAIMessage{Content: "hi"}, + FinishReason: "stop", + }}} + chunkBody, _ := json.Marshal(chunk) + firstOut, err := respConv.TransformChunk(FormatSSE("", json.RawMessage(chunkBody)), state) + if err != nil { + t.Fatalf("TransformChunk first: %v", err) + } + if !strings.Contains(string(firstOut), "response.completed") { + t.Fatalf("missing completed before done: %s", string(firstOut)) + } + doneOut, err := respConv.TransformChunk(FormatDone(), state) + if err != nil { + t.Fatalf("TransformChunk done: %v", err) + } + if string(doneOut) != string(FormatDone()) { + t.Fatalf("expected terminal done sentinel, got: %q", string(doneOut)) + } + duplicateOut, err := respConv.TransformChunk(FormatDone(), state) + if err != nil { + t.Fatalf("TransformChunk duplicate done: %v", err) + } + if len(duplicateOut) != 0 { + t.Fatalf("expected duplicate done to be suppressed, got: %q", string(duplicateOut)) + } +} + +func TestOpenAIToCodexStreamForwardsNativeResponsesEvents(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + created := map[string]interface{}{ + "type": "response.created", + "response": map[string]interface{}{ + "id": "resp_native", + "object": "response", + "created_at": int64(1), + "status": "in_progress", + }, + } + delta := map[string]interface{}{ + "type": "response.output_text.delta", + "item_id": "msg_resp_native_0", + "output_index": 0, + "content_index": 0, + "delta": "hi", + } + completed := map[string]interface{}{ + "type": "response.completed", + "response": map[string]interface{}{ + "id": "resp_native", + "object": "response", + "status": "completed", + }, + } + + stream := append(FormatSSE("", created), FormatSSE("", delta)...) + stream = append(stream, FormatSSE("", completed)...) + stream = append(stream, FormatDone()...) + + out, err := respConv.TransformChunk(stream, state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + for _, want := range []string{"event: response.created", "event: response.output_text.delta", "event: response.completed", "data: [DONE]"} { + if !strings.Contains(outStr, want) { + t.Fatalf("missing %s in forwarded responses stream: %s", want, outStr) + } + } + if strings.Count(outStr, "response.completed") != 2 { + t.Fatalf("expected one completed event/data pair, got: %s", outStr) + } +} + +func TestOpenAIToCodexStreamNormalizesNativeResponsesEventName(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + completed := map[string]interface{}{ + "type": "response.completed", + "response": map[string]interface{}{ + "id": "resp_native", + "object": "response", + "status": "completed", + }, + } + + out, err := respConv.TransformChunk(FormatSSE("unexpected.event", completed), state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + if !strings.Contains(outStr, "event: response.completed") { + t.Fatalf("expected event name normalized from data.type, got: %s", outStr) + } + if strings.Contains(outStr, "event: unexpected.event") { + t.Fatalf("unexpected mismatched event name preserved: %s", outStr) + } +} + +func TestOpenAIToCodexStreamForwardsErrorEvents(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + errorEvent := map[string]interface{}{ + "error": map[string]interface{}{ + "message": "boom", + }, + } + typedError := map[string]interface{}{ + "type": "error", + "message": "typed boom", + } + + stream := append(FormatSSE("error", errorEvent), FormatSSE("", typedError)...) + out, err := respConv.TransformChunk(stream, state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + if strings.Count(outStr, "event: error") != 2 { + t.Fatalf("expected both error events forwarded, got: %s", outStr) + } + if !strings.Contains(outStr, "boom") || !strings.Contains(outStr, "typed boom") { + t.Fatalf("expected error payloads preserved, got: %s", outStr) + } +} + +func TestOpenAIToCodexStreamForwardsEventOnlyNativeResponsesEvent(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + completed := map[string]interface{}{ + "response": map[string]interface{}{ + "id": "resp_event_only", + "object": "response", + "status": "completed", + }, + } + + out, err := respConv.TransformChunk(FormatSSE("response.completed", completed), state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + if !strings.Contains(outStr, "event: response.completed") || !strings.Contains(outStr, `"type":"response.completed"`) { + t.Fatalf("expected event-only response.completed to be forwarded with data.type injected, got: %s", outStr) + } +} + +func TestOpenAIToCodexStreamErrorEventWinsOverResponseType(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + errorEvent := map[string]interface{}{ + "type": "response.completed", + "error": map[string]interface{}{ + "message": "boom", + }, + } + + out, err := respConv.TransformChunk(FormatSSE("error", errorEvent), state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + if !strings.Contains(outStr, "event: error") || !strings.Contains(outStr, `"type":"error"`) { + t.Fatalf("expected error event to keep error semantics, got: %s", outStr) + } + if strings.Contains(outStr, `"type":"response.completed"`) { + t.Fatalf("error event leaked response.completed type: %s", outStr) + } +} + +func TestOpenAIToCodexStreamSynthesizesCompletedOnDoneWithoutFinishReason(t *testing.T) { + state := NewTransformState() + respConv := &openaiToCodexResponse{} + + chunk := OpenAIStreamChunk{ID: "chat_done_only", Model: "gpt", Choices: []OpenAIChoice{{ + Delta: &OpenAIMessage{Content: "hi"}, + }}} + chunkBody, _ := json.Marshal(chunk) + stream := append(FormatSSE("", json.RawMessage(chunkBody)), FormatDone()...) + + out, err := respConv.TransformChunk(stream, state) + if err != nil { + t.Fatalf("TransformChunk: %v", err) + } + outStr := string(out) + if !strings.Contains(outStr, "response.output_text.done") || !strings.Contains(outStr, "response.completed") { + t.Fatalf("expected synthetic completion before done, got: %s", outStr) + } + if strings.Index(outStr, "response.completed") > strings.Index(outStr, "data: [DONE]") { + t.Fatalf("response.completed should be emitted before [DONE], got: %s", outStr) + } + + duplicateOut, err := respConv.TransformChunk(FormatDone(), state) + if err != nil { + t.Fatalf("TransformChunk duplicate done: %v", err) + } + if len(duplicateOut) != 0 { + t.Fatalf("expected duplicate done to be suppressed, got: %q", string(duplicateOut)) + } } func TestClaudeToOpenAIStreamToolCalls(t *testing.T) { diff --git a/internal/converter/openai_to_codex.go b/internal/converter/openai_to_codex.go index acb0149f..ef04a5df 100644 --- a/internal/converter/openai_to_codex.go +++ b/internal/converter/openai_to_codex.go @@ -356,8 +356,24 @@ func (c *openaiToCodexResponse) TransformChunk(chunk []byte, state *TransformSta var output []byte for _, event := range events { if event.Event == "done" { + st := ensureOpenAIToResponsesState(state) + if st.Started && !st.CompletedSent { + for _, item := range finalizeOpenAIToResponsesStream(st, state) { + output = append(output, item...) + } + } + if !st.DoneSent { + output = append(output, FormatDone()...) + st.DoneSent = true + } continue } + + if forwarded := forwardOpenAIResponsesStreamEvent(event); len(forwarded) > 0 { + output = append(output, forwarded...) + continue + } + for _, item := range convertOpenAIChatCompletionsChunkToResponses(event.Data, state) { output = append(output, item...) } @@ -395,14 +411,68 @@ type openaiToResponsesState struct { TotalTokens int64 ReasoningTokens int64 UsageSeen bool - NextOutputIndex int // global counter for unique output_index across messages and function calls - MsgOutputIndex map[int]int // choice idx -> assigned output_index - FuncOutputIndex map[int]int // callIndex -> assigned output_index - CompletedSent bool // guards against duplicate response.completed + NextOutputIndex int // global counter for unique output_index across messages and function calls + MsgOutputIndex map[int]int // choice idx -> assigned output_index + FuncOutputIndex map[int]int // callIndex -> assigned output_index + CompletedSent bool // guards against duplicate response.completed + DoneSent bool // guards against duplicate terminal [DONE] forwarding } var responseIDCounter uint64 +func ensureOpenAIToResponsesState(state *TransformState) *openaiToResponsesState { + st, ok := state.Custom.(*openaiToResponsesState) + if ok && st != nil { + return st + } + st = &openaiToResponsesState{ + FuncArgsBuf: make(map[int]*strings.Builder), + FuncNames: make(map[int]string), + FuncCallIDs: make(map[int]string), + MsgTextBuf: make(map[int]*strings.Builder), + MsgItemAdded: make(map[int]bool), + MsgContentAdded: make(map[int]bool), + MsgItemDone: make(map[int]bool), + FuncArgsDone: make(map[int]bool), + FuncItemDone: make(map[int]bool), + Reasonings: make([]openaiToResponsesStateReasoning, 0), + MsgOutputIndex: make(map[int]int), + FuncOutputIndex: make(map[int]int), + } + state.Custom = st + return st +} + +func forwardOpenAIResponsesStreamEvent(event SSEEvent) []byte { + root := gjson.ParseBytes(event.Data) + if !root.Exists() { + return nil + } + eventType := root.Get("type").String() + if event.Event == "error" || eventType == "error" || (!root.Get("choices").Exists() && root.Get("error").Exists()) { + return FormatSSE("error", normalizeSSEEventType(event.Data, "error")) + } + if strings.HasPrefix(eventType, "response.") { + return FormatSSE(eventType, event.Data) + } + if strings.HasPrefix(event.Event, "response.") { + return FormatSSE(event.Event, normalizeSSEEventType(event.Data, event.Event)) + } + return nil +} + +func normalizeSSEEventType(data []byte, eventType string) []byte { + root := gjson.ParseBytes(data) + if !root.IsObject() { + return data + } + normalized, err := sjson.SetBytes(data, "type", eventType) + if err != nil { + return data + } + return normalized +} + func synthesizeResponseID() string { return fmt.Sprintf("resp_%x_%d", time.Now().UnixNano(), atomic.AddUint64(&responseIDCounter, 1)) } @@ -427,28 +497,170 @@ func (st *openaiToResponsesState) funcOutIdx(callIndex int) int { return oi } +func finalizeOpenAIToResponsesStream(st *openaiToResponsesState, state *TransformState) [][]byte { + if st == nil || !st.Started || st.CompletedSent { + return nil + } + nextSeq := func() int { st.Seq++; return st.Seq } + var out [][]byte + + if st.ReasoningID != "" { + text := st.ReasoningBuf.String() + + textDone := `{"type":"response.reasoning_summary_text.done","sequence_number":0,"item_id":"","output_index":0,"summary_index":0,"text":""}` + textDone, _ = sjson.Set(textDone, "sequence_number", nextSeq()) + textDone, _ = sjson.Set(textDone, "item_id", st.ReasoningID) + textDone, _ = sjson.Set(textDone, "output_index", st.ReasoningIndex) + textDone, _ = sjson.Set(textDone, "text", text) + out = append(out, FormatSSE("response.reasoning_summary_text.done", []byte(textDone))) + + partDone := `{"type":"response.reasoning_summary_part.done","sequence_number":0,"item_id":"","output_index":0,"summary_index":0,"part":{"type":"summary_text","text":""}}` + partDone, _ = sjson.Set(partDone, "sequence_number", nextSeq()) + partDone, _ = sjson.Set(partDone, "item_id", st.ReasoningID) + partDone, _ = sjson.Set(partDone, "output_index", st.ReasoningIndex) + partDone, _ = sjson.Set(partDone, "part.text", text) + out = append(out, FormatSSE("response.reasoning_summary_part.done", []byte(partDone))) + + outputItemDone := `{"type":"response.output_item.done","item":{"id":"","type":"reasoning","encrypted_content":"","summary":[{"type":"summary_text","text":""}]},"output_index":0,"sequence_number":0}` + outputItemDone, _ = sjson.Set(outputItemDone, "sequence_number", nextSeq()) + outputItemDone, _ = sjson.Set(outputItemDone, "item.id", st.ReasoningID) + outputItemDone, _ = sjson.Set(outputItemDone, "output_index", st.ReasoningIndex) + outputItemDone, _ = sjson.Set(outputItemDone, "item.summary.0.text", text) + out = append(out, FormatSSE("response.output_item.done", []byte(outputItemDone))) + + st.Reasonings = append(st.Reasonings, openaiToResponsesStateReasoning{ReasoningID: st.ReasoningID, ReasoningData: text}) + st.ReasoningID = "" + st.ReasoningBuf.Reset() + } + + for _, i := range sortedKeys(st.MsgItemAdded) { + if !st.MsgItemAdded[i] || st.MsgItemDone[i] { + continue + } + fullText := "" + if b := st.MsgTextBuf[i]; b != nil { + fullText = b.String() + } + done := `{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}` + done, _ = sjson.Set(done, "sequence_number", nextSeq()) + done, _ = sjson.Set(done, "item_id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) + done, _ = sjson.Set(done, "output_index", st.msgOutIdx(i)) + done, _ = sjson.Set(done, "content_index", 0) + done, _ = sjson.Set(done, "text", fullText) + out = append(out, FormatSSE("response.output_text.done", []byte(done))) + + partDone := `{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}` + partDone, _ = sjson.Set(partDone, "sequence_number", nextSeq()) + partDone, _ = sjson.Set(partDone, "item_id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) + partDone, _ = sjson.Set(partDone, "output_index", st.msgOutIdx(i)) + partDone, _ = sjson.Set(partDone, "content_index", 0) + partDone, _ = sjson.Set(partDone, "part.text", fullText) + out = append(out, FormatSSE("response.content_part.done", []byte(partDone))) + + itemDone := `{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}}` + itemDone, _ = sjson.Set(itemDone, "sequence_number", nextSeq()) + itemDone, _ = sjson.Set(itemDone, "output_index", st.msgOutIdx(i)) + itemDone, _ = sjson.Set(itemDone, "item.id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) + itemDone, _ = sjson.Set(itemDone, "item.content.0.text", fullText) + out = append(out, FormatSSE("response.output_item.done", []byte(itemDone))) + st.MsgItemDone[i] = true + } + + for _, i := range sortedKeys(st.FuncCallIDs) { + callID := st.FuncCallIDs[i] + if callID == "" || st.FuncItemDone[i] { + continue + } + args := "{}" + if b := st.FuncArgsBuf[i]; b != nil && b.Len() > 0 { + args = b.String() + } + fcDone := `{"type":"response.function_call_arguments.done","sequence_number":0,"item_id":"","output_index":0,"arguments":""}` + fcDone, _ = sjson.Set(fcDone, "sequence_number", nextSeq()) + fcDone, _ = sjson.Set(fcDone, "item_id", fmt.Sprintf("fc_%s", callID)) + fcDone, _ = sjson.Set(fcDone, "output_index", st.funcOutIdx(i)) + fcDone, _ = sjson.Set(fcDone, "arguments", args) + out = append(out, FormatSSE("response.function_call_arguments.done", []byte(fcDone))) + + itemDone := `{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"function_call","status":"completed","arguments":"","call_id":"","name":""}}` + itemDone, _ = sjson.Set(itemDone, "sequence_number", nextSeq()) + itemDone, _ = sjson.Set(itemDone, "output_index", st.funcOutIdx(i)) + itemDone, _ = sjson.Set(itemDone, "item.id", fmt.Sprintf("fc_%s", callID)) + itemDone, _ = sjson.Set(itemDone, "item.arguments", args) + itemDone, _ = sjson.Set(itemDone, "item.call_id", callID) + itemDone, _ = sjson.Set(itemDone, "item.name", st.FuncNames[i]) + out = append(out, FormatSSE("response.output_item.done", []byte(itemDone))) + st.FuncItemDone[i] = true + st.FuncArgsDone[i] = true + } + + st.CompletedSent = true + completed := `{"type":"response.completed","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"completed","background":false,"error":null}}` + completed, _ = sjson.Set(completed, "sequence_number", nextSeq()) + completed, _ = sjson.Set(completed, "response.id", st.ResponseID) + completed, _ = sjson.Set(completed, "response.created_at", st.Created) + + outputsWrapper := `{"arr":[]}` + for _, r := range st.Reasonings { + item := `{"id":"","type":"reasoning","summary":[{"type":"summary_text","text":""}]}` + item, _ = sjson.Set(item, "id", r.ReasoningID) + item, _ = sjson.Set(item, "summary.0.text", r.ReasoningData) + outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) + } + for _, i := range sortedKeys(st.MsgItemAdded) { + txt := "" + if b := st.MsgTextBuf[i]; b != nil { + txt = b.String() + } + item := `{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}` + item, _ = sjson.Set(item, "id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) + item, _ = sjson.Set(item, "content.0.text", txt) + outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) + } + for _, i := range sortedKeys(st.FuncCallIDs) { + callID := st.FuncCallIDs[i] + if callID == "" { + continue + } + args := "" + if b := st.FuncArgsBuf[i]; b != nil { + args = b.String() + } + item := `{"id":"","type":"function_call","status":"completed","arguments":"","call_id":"","name":""}` + item, _ = sjson.Set(item, "id", fmt.Sprintf("fc_%s", callID)) + item, _ = sjson.Set(item, "arguments", args) + item, _ = sjson.Set(item, "call_id", callID) + item, _ = sjson.Set(item, "name", st.FuncNames[i]) + outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) + } + if gjson.Get(outputsWrapper, "arr.#").Int() > 0 { + completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) + } + if st.UsageSeen { + completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) + completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) + if st.ReasoningTokens > 0 { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) + } + total := st.TotalTokens + if total == 0 { + total = st.PromptTokens + st.CompletionTokens + } + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) + } + if state != nil && len(state.OriginalRequestBody) > 0 { + completed = applyRequestEchoToResponse(completed, "response.", state.OriginalRequestBody) + } + out = append(out, FormatSSE("response.completed", []byte(completed))) + return out +} + func convertOpenAIChatCompletionsChunkToResponses(rawJSON []byte, state *TransformState) [][]byte { if state == nil { return nil } - st, ok := state.Custom.(*openaiToResponsesState) - if !ok || st == nil { - st = &openaiToResponsesState{ - FuncArgsBuf: make(map[int]*strings.Builder), - FuncNames: make(map[int]string), - FuncCallIDs: make(map[int]string), - MsgTextBuf: make(map[int]*strings.Builder), - MsgItemAdded: make(map[int]bool), - MsgContentAdded: make(map[int]bool), - MsgItemDone: make(map[int]bool), - FuncArgsDone: make(map[int]bool), - FuncItemDone: make(map[int]bool), - Reasonings: make([]openaiToResponsesStateReasoning, 0), - MsgOutputIndex: make(map[int]int), - FuncOutputIndex: make(map[int]int), - } - state.Custom = st - } + st := ensureOpenAIToResponsesState(state) root := gjson.ParseBytes(rawJSON) obj := root.Get("object") @@ -487,6 +699,7 @@ func convertOpenAIChatCompletionsChunkToResponses(rawJSON []byte, state *Transfo st.FuncOutputIndex = make(map[int]int) st.NextOutputIndex = 0 st.CompletedSent = false + st.DoneSent = false st.PromptTokens = 0 st.CachedTokens = 0 st.CompletionTokens = 0 @@ -608,7 +821,7 @@ func convertOpenAIChatCompletionsChunkToResponses(rawJSON []byte, state *Transfo if st.ReasoningID == "" { st.ReasoningID = fmt.Sprintf("rs_%s_%d", st.ResponseID, idx) st.ReasoningIndex = st.NextOutputIndex - st.NextOutputIndex++ + st.NextOutputIndex++ item := `{"type":"response.output_item.added","sequence_number":0,"output_index":0,"item":{"id":"","type":"reasoning","status":"in_progress","summary":[]}}` item, _ = sjson.Set(item, "sequence_number", nextSeq()) item, _ = sjson.Set(item, "output_index", st.ReasoningIndex) @@ -716,83 +929,12 @@ func convertOpenAIChatCompletionsChunkToResponses(rawJSON []byte, state *Transfo } } - if fr := choice.Get("finish_reason"); fr.Exists() && fr.String() != "" { - if len(st.MsgItemAdded) > 0 { - for _, i := range sortedKeys(st.MsgItemAdded) { - if st.MsgItemAdded[i] && !st.MsgItemDone[i] { - fullText := "" - if b := st.MsgTextBuf[i]; b != nil { - fullText = b.String() - } - done := `{"type":"response.output_text.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"text":"","logprobs":[]}` - done, _ = sjson.Set(done, "sequence_number", nextSeq()) - done, _ = sjson.Set(done, "item_id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) - done, _ = sjson.Set(done, "output_index", st.msgOutIdx(i)) - done, _ = sjson.Set(done, "content_index", 0) - done, _ = sjson.Set(done, "text", fullText) - out = append(out, FormatSSE("response.output_text.done", []byte(done))) - - partDone := `{"type":"response.content_part.done","sequence_number":0,"item_id":"","output_index":0,"content_index":0,"part":{"type":"output_text","annotations":[],"logprobs":[],"text":""}}` - partDone, _ = sjson.Set(partDone, "sequence_number", nextSeq()) - partDone, _ = sjson.Set(partDone, "item_id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) - partDone, _ = sjson.Set(partDone, "output_index", st.msgOutIdx(i)) - partDone, _ = sjson.Set(partDone, "content_index", 0) - partDone, _ = sjson.Set(partDone, "part.text", fullText) - out = append(out, FormatSSE("response.content_part.done", []byte(partDone))) - - itemDone := `{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}}` - itemDone, _ = sjson.Set(itemDone, "sequence_number", nextSeq()) - itemDone, _ = sjson.Set(itemDone, "output_index", st.msgOutIdx(i)) - itemDone, _ = sjson.Set(itemDone, "item.id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) - itemDone, _ = sjson.Set(itemDone, "item.content.0.text", fullText) - out = append(out, FormatSSE("response.output_item.done", []byte(itemDone))) - st.MsgItemDone[i] = true - } - } - } - - if st.ReasoningID != "" { - stopReasoning(st.ReasoningBuf.String()) - st.ReasoningBuf.Reset() - } - - if len(st.FuncCallIDs) > 0 { - for _, i := range sortedKeys(st.FuncCallIDs) { - callID := st.FuncCallIDs[i] - if callID == "" || st.FuncItemDone[i] { - continue - } - args := "{}" - if b := st.FuncArgsBuf[i]; b != nil && b.Len() > 0 { - args = b.String() - } - fcDone := `{"type":"response.function_call_arguments.done","sequence_number":0,"item_id":"","output_index":0,"arguments":""}` - fcDone, _ = sjson.Set(fcDone, "sequence_number", nextSeq()) - fcDone, _ = sjson.Set(fcDone, "item_id", fmt.Sprintf("fc_%s", callID)) - fcDone, _ = sjson.Set(fcDone, "output_index", st.funcOutIdx(i)) - fcDone, _ = sjson.Set(fcDone, "arguments", args) - out = append(out, FormatSSE("response.function_call_arguments.done", []byte(fcDone))) - - itemDone := `{"type":"response.output_item.done","sequence_number":0,"output_index":0,"item":{"id":"","type":"function_call","status":"completed","arguments":"","call_id":"","name":""}}` - itemDone, _ = sjson.Set(itemDone, "sequence_number", nextSeq()) - itemDone, _ = sjson.Set(itemDone, "output_index", st.funcOutIdx(i)) - itemDone, _ = sjson.Set(itemDone, "item.id", fmt.Sprintf("fc_%s", callID)) - itemDone, _ = sjson.Set(itemDone, "item.arguments", args) - itemDone, _ = sjson.Set(itemDone, "item.call_id", callID) - itemDone, _ = sjson.Set(itemDone, "item.name", st.FuncNames[i]) - out = append(out, FormatSSE("response.output_item.done", []byte(itemDone))) - st.FuncItemDone[i] = true - st.FuncArgsDone[i] = true - } - } - } return true }) } - // Emit response.completed once after all choices have been processed + // Emit response.completed once after all choices have been processed. if !st.CompletedSent { - // Check if any choice had a finish_reason hasFinish := false if choices := root.Get("choices"); choices.Exists() && choices.IsArray() { choices.ForEach(func(_, choice gjson.Result) bool { @@ -804,69 +946,7 @@ func convertOpenAIChatCompletionsChunkToResponses(rawJSON []byte, state *Transfo }) } if hasFinish { - st.CompletedSent = true - completed := `{"type":"response.completed","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"completed","background":false,"error":null}}` - completed, _ = sjson.Set(completed, "sequence_number", nextSeq()) - completed, _ = sjson.Set(completed, "response.id", st.ResponseID) - completed, _ = sjson.Set(completed, "response.created_at", st.Created) - - outputsWrapper := `{"arr":[]}` - if len(st.Reasonings) > 0 { - for _, r := range st.Reasonings { - item := `{"id":"","type":"reasoning","summary":[{"type":"summary_text","text":""}]}` - item, _ = sjson.Set(item, "id", r.ReasoningID) - item, _ = sjson.Set(item, "summary.0.text", r.ReasoningData) - outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) - } - } - if len(st.MsgItemAdded) > 0 { - for _, i := range sortedKeys(st.MsgItemAdded) { - txt := "" - if b := st.MsgTextBuf[i]; b != nil { - txt = b.String() - } - item := `{"id":"","type":"message","status":"completed","content":[{"type":"output_text","annotations":[],"logprobs":[],"text":""}],"role":"assistant"}` - item, _ = sjson.Set(item, "id", fmt.Sprintf("msg_%s_%d", st.ResponseID, i)) - item, _ = sjson.Set(item, "content.0.text", txt) - outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) - } - } - if len(st.FuncCallIDs) > 0 { - for _, i := range sortedKeys(st.FuncCallIDs) { - args := "" - if b := st.FuncArgsBuf[i]; b != nil { - args = b.String() - } - callID := st.FuncCallIDs[i] - name := st.FuncNames[i] - item := `{"id":"","type":"function_call","status":"completed","arguments":"","call_id":"","name":""}` - item, _ = sjson.Set(item, "id", fmt.Sprintf("fc_%s", callID)) - item, _ = sjson.Set(item, "arguments", args) - item, _ = sjson.Set(item, "call_id", callID) - item, _ = sjson.Set(item, "name", name) - outputsWrapper, _ = sjson.SetRaw(outputsWrapper, "arr.-1", item) - } - } - if gjson.Get(outputsWrapper, "arr.#").Int() > 0 { - completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) - } - if st.UsageSeen { - completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) - if st.ReasoningTokens > 0 { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) - } - total := st.TotalTokens - if total == 0 { - total = st.PromptTokens + st.CompletionTokens - } - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) - } - if len(state.OriginalRequestBody) > 0 { - completed = applyRequestEchoToResponse(completed, "response.", state.OriginalRequestBody) - } - out = append(out, FormatSSE("response.completed", []byte(completed))) + out = append(out, finalizeOpenAIToResponsesStream(st, state)...) } } diff --git a/internal/converter/sse.go b/internal/converter/sse.go index edfb666c..023c3bf7 100644 --- a/internal/converter/sse.go +++ b/internal/converter/sse.go @@ -98,9 +98,16 @@ func FormatSSE(event string, data interface{}) []byte { dataBytes, _ = json.Marshal(v) } - sb.WriteString("data: ") - sb.Write(dataBytes) - sb.WriteString("\n\n") + dataLines := strings.Split(string(dataBytes), "\n") + if len(dataLines) == 0 { + dataLines = []string{""} + } + for _, line := range dataLines { + sb.WriteString("data: ") + sb.WriteString(line) + sb.WriteString("\n") + } + sb.WriteString("\n") return []byte(sb.String()) }