fix: preserve SSE event boundaries for Responses streams#2398
fix: preserve SSE event boundaries for Responses streams#23987RPH wants to merge 4 commits intorouter-for-me:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a helper function, writeResponsesSSEChunk, to standardize the formatting of Server-Sent Events (SSE) by ensuring proper newline separation between chunks. This logic is integrated into the streaming response handlers to replace inline formatting. A new test case, TestForwardResponsesStreamSeparatesDataOnlySSEChunks, verifies that consecutive data chunks are correctly delimited. Review feedback suggests improving the robustness of the new helper by handling write errors and recommends moving the new test to a more suitable file to maintain clear separation between success and error test scenarios.
| _, _ = w.Write(chunk) | ||
| switch { | ||
| case bytes.HasSuffix(chunk, []byte("\n\n")): | ||
| return | ||
| case bytes.HasSuffix(chunk, []byte("\n")): | ||
| _, _ = w.Write([]byte("\n")) | ||
| default: | ||
| _, _ = w.Write([]byte("\n\n")) | ||
| } |
There was a problem hiding this comment.
This function ignores errors from w.Write. While the surrounding code may handle connection closures, it's best practice to check for write errors and stop processing to make the function more robust and prevent further writes on a broken connection. The logic can also be slightly simplified by replacing the switch with if/else.
if _, err := w.Write(chunk); err != nil {
return
}
if bytes.HasSuffix(chunk, []byte("\n\n")) {
return
}
if bytes.HasSuffix(chunk, []byte("\n")) {
_, _ = w.Write([]byte("\n"))
} else {
_, _ = w.Write([]byte("\n\n"))
}| func TestForwardResponsesStreamSeparatesDataOnlySSEChunks(t *testing.T) { | ||
| gin.SetMode(gin.TestMode) | ||
| base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, nil) | ||
| h := NewOpenAIResponsesAPIHandler(base) | ||
|
|
||
| recorder := httptest.NewRecorder() | ||
| c, _ := gin.CreateTestContext(recorder) | ||
| c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", nil) | ||
|
|
||
| flusher, ok := c.Writer.(http.Flusher) | ||
| if !ok { | ||
| t.Fatalf("expected gin writer to implement http.Flusher") | ||
| } | ||
|
|
||
| data := make(chan []byte, 2) | ||
| errs := make(chan *interfaces.ErrorMessage) | ||
| data <- []byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"function_call\",\"arguments\":\"{}\"}}") | ||
| data <- []byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}") | ||
| close(data) | ||
| close(errs) | ||
|
|
||
| h.forwardResponsesStream(c, flusher, func(error) {}, data, errs) | ||
| body := recorder.Body.String() | ||
|
|
||
| if !strings.Contains(body, "data: {\"type\":\"response.output_item.done\"") { | ||
| t.Fatalf("expected first SSE data chunk, got: %q", body) | ||
| } | ||
| if !strings.Contains(body, "\n\ndata: {\"type\":\"response.completed\"") { | ||
| t.Fatalf("expected blank-line separation before second SSE event, got: %q", body) | ||
| } | ||
| if strings.Contains(body, "arguments\":\"{}\"}}data: {\"type\":\"response.completed\"") { | ||
| t.Fatalf("second SSE event was concatenated onto first event body: %q", body) | ||
| } | ||
| } |
There was a problem hiding this comment.
For better test organization, consider moving TestForwardResponsesStreamSeparatesDataOnlySSEChunks to a test file that handles non-error streaming cases, such as openai_responses_handlers_stream_test.go. The current file openai_responses_handlers_stream_error_test.go seems dedicated to error scenarios, and mixing in success cases could reduce clarity for future maintenance.
|
Updated per review.
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a writeResponsesSSEChunk helper function to standardize SSE chunk formatting and adds a unit test to verify correct event separation. Feedback suggests improving the robustness of the helper function by consistently handling all write errors and refining the test assertions to validate the exact response structure rather than relying on loose string containment.
| if bytes.HasSuffix(chunk, []byte("\n")) { | ||
| _, _ = w.Write([]byte("\n")) | ||
| } else { | ||
| _, _ = w.Write([]byte("\n\n")) | ||
| } |
There was a problem hiding this comment.
There's inconsistent error handling for w.Write calls. The first write on line 29 checks for an error, but the writes on lines 36 and 38 ignore potential errors. It's better to handle all write errors consistently to improve robustness. If any write fails (e.g., due to a closed client connection), we should stop trying to write more data.
if bytes.HasSuffix(chunk, []byte("\n")) {
if _, err := w.Write([]byte("\n")); err != nil {
return
}
} else {
if _, err := w.Write([]byte("\n\n")); err != nil {
return
}
}| if !strings.Contains(body, "data: {\"type\":\"response.output_item.done\"") { | ||
| t.Fatalf("expected first SSE data chunk, got: %q", body) | ||
| } | ||
| if !strings.Contains(body, "\n\ndata: {\"type\":\"response.completed\"") { | ||
| t.Fatalf("expected blank-line separation before second SSE event, got: %q", body) | ||
| } | ||
| if strings.Contains(body, "arguments\":\"{}\"}}data: {\"type\":\"response.completed\"") { | ||
| t.Fatalf("second SSE event was concatenated onto first event body: %q", body) | ||
| } |
There was a problem hiding this comment.
The test assertions using strings.Contains are a bit loose and could potentially pass even if the output is not perfectly correct. For better robustness and to prevent future regressions, it's recommended to use more specific assertions. You could split the response body into individual SSE events and verify each one against the expected content.
parts := strings.Split(strings.TrimSpace(body), "\n\n")
if len(parts) != 2 {
t.Fatalf("expected 2 SSE events, got %d. Body: %q", len(parts), body)
}
expectedPart1 := "data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"function_call\",\"arguments\":\"{}\"}}"
if parts[0] != expectedPart1 {
t.Errorf("unexpected first event.\nGot: %q\nWant: %q", parts[0], expectedPart1)
}
expectedPart2 := "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}"
if parts[1] != expectedPart2 {
t.Errorf("unexpected second event.\nGot: %q\nWant: %q", parts[1], expectedPart2)
}|
Updated per follow-up review.
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a helper function writeResponsesSSEChunk to standardize the formatting of Server-Sent Events (SSE) chunks, ensuring they are correctly terminated with newlines. The handleStreamingResponse and forwardResponsesStream methods were refactored to use this new utility, and a unit test was added to verify the separation of SSE events. A review comment suggests simplifying the logic within writeResponsesSSEChunk to improve readability by consolidating the suffix writing logic.
| func writeResponsesSSEChunk(w io.Writer, chunk []byte) { | ||
| if w == nil || len(chunk) == 0 { | ||
| return | ||
| } | ||
| if _, err := w.Write(chunk); err != nil { | ||
| return | ||
| } | ||
| if bytes.HasSuffix(chunk, []byte("\n\n")) { | ||
| return | ||
| } | ||
| if bytes.HasSuffix(chunk, []byte("\n")) { | ||
| if _, err := w.Write([]byte("\n")); err != nil { | ||
| return | ||
| } | ||
| } else { | ||
| if _, err := w.Write([]byte("\n\n")); err != nil { | ||
| return | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The logic in writeResponsesSSEChunk is correct, but it can be simplified to improve readability and maintainability. The current nested if/else structure can be streamlined by determining the required suffix (\n or \n\n) and then performing a single write operation.
| func writeResponsesSSEChunk(w io.Writer, chunk []byte) { | |
| if w == nil || len(chunk) == 0 { | |
| return | |
| } | |
| if _, err := w.Write(chunk); err != nil { | |
| return | |
| } | |
| if bytes.HasSuffix(chunk, []byte("\n\n")) { | |
| return | |
| } | |
| if bytes.HasSuffix(chunk, []byte("\n")) { | |
| if _, err := w.Write([]byte("\n")); err != nil { | |
| return | |
| } | |
| } else { | |
| if _, err := w.Write([]byte("\n\n")); err != nil { | |
| return | |
| } | |
| } | |
| } | |
| func writeResponsesSSEChunk(w io.Writer, chunk []byte) { | |
| if w == nil || len(chunk) == 0 { | |
| return | |
| } | |
| if _, err := w.Write(chunk); err != nil { | |
| return | |
| } | |
| if bytes.HasSuffix(chunk, []byte("\n\n")) { | |
| return | |
| } | |
| suffix := []byte("\n\n") | |
| if bytes.HasSuffix(chunk, []byte("\n")) { | |
| suffix = []byte("\n") | |
| } | |
| if _, err := w.Write(suffix); err != nil { | |
| return | |
| } | |
| } |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a helper function, writeResponsesSSEChunk, to standardize the writing of Server-Sent Events (SSE) chunks within the OpenAI responses handler. This refactoring replaces duplicated inline logic in handleStreamingResponse and forwardResponsesStream, ensuring consistent application of newline delimiters for SSE events. A new unit test, TestForwardResponsesStreamSeparatesDataOnlySSEChunks, has also been added to verify that chunks are correctly separated. I have no feedback to provide as there were no review comments.
Summary
/v1/responsesstreaming when chunks contain onlydata:linesresponse.output_item.doneandresponse.completedchunks from being concatenated into one client parse unitTest Plan
go test ./sdk/api/handlers/openai -run TestForwardResponsesStreamSeparatesDataOnlySSEChunks -count=1go test ./sdk/api/handlers/openai -run "TestForwardResponsesStream" -count=1go test ./internal/translator/openai/openai/responses -count=1