From 5c5bbf1d868cdb8ddc96176f72e7be9e3f64f51d Mon Sep 17 00:00:00 2001 From: Camilo Aguilar Date: Thu, 30 Oct 2025 21:27:08 -0400 Subject: [PATCH] fix(ssestream): skip events with empty data to prevent JSON unmarshal errors Fix crash when parsing SSE streams that contain empty events from retry: directives or comment lines. ## Problem The eventStreamDecoder creates events with empty Data when it encounters empty lines after non-data SSE fields (like "retry: 3000"). Stream.Next() then attempts json.Unmarshal on empty bytes, causing "unexpected end of JSON input" error. This breaks streaming with any SSE server using the retry directive. ## Root Cause Per the SSE specification [1], events are dispatched when empty lines are encountered, regardless of whether data was present. The spec states for empty line handling: > "If the line is empty (a blank line) [Dispatch the event], as defined below." And for the retry field: > "If the field value consists of only ASCII digits, then interpret the field > value as an integer in base ten, and set the event stream's reconnection time > to that integer. Otherwise, ignore the field." For empty data handling: > "If the data buffer is an empty string, set the data buffer and the event > type buffer to the empty string and return." This means that a sequence like: ``` retry: 3000 ``` Creates a valid empty event according to the spec. Servers commonly send this for reconnection configuration, but the SDK assumed all events contain JSON data. [1] https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events ## Solution Check if event.Data is empty before attempting to unmarshal. Skip empty events and continue processing the stream. This maintains compatibility with OpenAI API while supporting standard SSE practices per spec. ## Tests Added - TestStream_EmptyEvents: Verifies handling of retry directive with empty event - TestStream_OnlyRetryDirective: Tests stream with only retry (no data) - TestStream_MultipleEmptyEvents: Tests multiple empty events interspersed with data All tests pass: ``` === RUN TestStream_EmptyEvents --- PASS: TestStream_EmptyEvents (0.00s) === RUN TestStream_OnlyRetryDirective --- PASS: TestStream_OnlyRetryDirective (0.00s) === RUN TestStream_MultipleEmptyEvents --- PASS: TestStream_MultipleEmptyEvents (0.00s) PASS ``` ## Impact - Enables compatibility with SSE servers using retry: directive (common practice) - No breaking changes - only adds resilience to spec-compliant edge case - Verified with streaming function calling through Anthropic API gateway ## Real-World Testing Tested with Anthropic Claude 3.5 streaming API via AI Gateway: - Before: "Stream error: unexpected end of JSON input" - After: Successfully receives and processes all streaming chunks Fixes stream crashes with "unexpected end of JSON input" when encountering SSE streams with retry directives or comment lines. --- packages/ssestream/ssestream.go | 5 + packages/ssestream/ssestream_test.go | 154 +++++++++++++++++++++++++++ 2 files changed, 159 insertions(+) create mode 100644 packages/ssestream/ssestream_test.go diff --git a/packages/ssestream/ssestream.go b/packages/ssestream/ssestream.go index 3534aed8..a595a3f2 100644 --- a/packages/ssestream/ssestream.go +++ b/packages/ssestream/ssestream.go @@ -163,6 +163,11 @@ func (s *Stream[T]) Next() bool { continue } + // Skip events with empty data (e.g., from SSE retry: or comment lines) + if len(s.decoder.Event().Data) == 0 { + continue + } + var nxt T if s.decoder.Event().Type == "" || !strings.HasPrefix(s.decoder.Event().Type, "thread.") { diff --git a/packages/ssestream/ssestream_test.go b/packages/ssestream/ssestream_test.go new file mode 100644 index 00000000..2e6ea81b --- /dev/null +++ b/packages/ssestream/ssestream_test.go @@ -0,0 +1,154 @@ +package ssestream + +import ( + "bytes" + "net/http" + "testing" +) + +type mockReadCloser struct { + *bytes.Reader +} + +func (m mockReadCloser) Close() error { + return nil +} + +// TestStream_EmptyEvents tests that the stream correctly handles empty SSE events +// (e.g., from retry: directives or comment lines) without crashing on JSON unmarshal +func TestStream_EmptyEvents(t *testing.T) { + // Simulate SSE stream with retry directive that creates empty event + sseData := `retry: 3000 + +data: {"id":"msg_01ABC","type":"content_block_delta","delta":{"type":"text","text":"Hello"}} + +data: [DONE] + +` + + resp := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: mockReadCloser{bytes.NewReader([]byte(sseData))}, + } + + decoder := NewDecoder(resp) + if decoder == nil { + t.Fatal("Expected decoder to be created, got nil") + } + + type testMsg struct { + ID string `json:"id"` + Type string `json:"type"` + Delta struct { + Type string `json:"type"` + Text string `json:"text"` + } `json:"delta"` + } + + stream := NewStream[testMsg](decoder, nil) + + // Should successfully iterate without crashing on empty event + var receivedMessages int + for stream.Next() { + msg := stream.Current() + receivedMessages++ + + if msg.ID != "msg_01ABC" { + t.Errorf("Expected ID 'msg_01ABC', got '%s'", msg.ID) + } + if msg.Delta.Text != "Hello" { + t.Errorf("Expected text 'Hello', got '%s'", msg.Delta.Text) + } + } + + if err := stream.Err(); err != nil { + t.Errorf("Expected no error, got: %v", err) + } + + if receivedMessages != 1 { + t.Errorf("Expected 1 message, got %d", receivedMessages) + } +} + +// TestStream_OnlyRetryDirective tests stream with only retry directive (no data events) +func TestStream_OnlyRetryDirective(t *testing.T) { + sseData := `retry: 3000 + +` + + resp := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: mockReadCloser{bytes.NewReader([]byte(sseData))}, + } + + decoder := NewDecoder(resp) + type testMsg struct { + ID string `json:"id"` + } + stream := NewStream[testMsg](decoder, nil) + + // Should handle gracefully without any messages + var count int + for stream.Next() { + count++ + } + + if err := stream.Err(); err != nil { + t.Errorf("Expected no error, got: %v", err) + } + + if count != 0 { + t.Errorf("Expected 0 messages, got %d", count) + } +} + +// TestStream_MultipleEmptyEvents tests handling of multiple empty events +func TestStream_MultipleEmptyEvents(t *testing.T) { + sseData := `retry: 3000 + +: comment line + +data: {"id":"1","text":"first"} + +retry: 5000 + +data: {"id":"2","text":"second"} + +` + + resp := &http.Response{ + StatusCode: 200, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: mockReadCloser{bytes.NewReader([]byte(sseData))}, + } + + decoder := NewDecoder(resp) + type testMsg struct { + ID string `json:"id"` + Text string `json:"text"` + } + stream := NewStream[testMsg](decoder, nil) + + messages := []testMsg{} + for stream.Next() { + messages = append(messages, stream.Current()) + } + + if err := stream.Err(); err != nil { + t.Errorf("Expected no error, got: %v", err) + } + + if len(messages) != 2 { + t.Fatalf("Expected 2 messages, got %d", len(messages)) + } + + if messages[0].ID != "1" || messages[0].Text != "first" { + t.Errorf("First message incorrect: %+v", messages[0]) + } + + if messages[1].ID != "2" || messages[1].Text != "second" { + t.Errorf("Second message incorrect: %+v", messages[1]) + } +}