From 91e6df0a41ae5737c556570a83b90f3d268eea40 Mon Sep 17 00:00:00 2001 From: Atharva Lade Date: Fri, 24 Apr 2026 10:52:12 -0500 Subject: [PATCH] fix(go-sdk): add bounds checking to DeserializeStreams for payloads > 64KB --- .../binary_response_deserializer.go | 39 +++-- .../binary_response_deserializer_test.go | 160 ++++++++++++++++++ .../go/client/tcp/tcp_stream_management.go | 2 +- 3 files changed, 189 insertions(+), 12 deletions(-) diff --git a/foreign/go/binary_serialization/binary_response_deserializer.go b/foreign/go/binary_serialization/binary_response_deserializer.go index 3de147010f..fa2bb938b3 100644 --- a/foreign/go/binary_serialization/binary_response_deserializer.go +++ b/foreign/go/binary_serialization/binary_response_deserializer.go @@ -53,7 +53,10 @@ func DeserializeOffset(payload []byte) *iggcon.ConsumerOffsetInfo { } func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) { - stream, pos := DeserializeToStream(payload, 0) + stream, pos, err := DeserializeToStream(payload, 0) + if err != nil { + return nil, err + } topics := make([]iggcon.Topic, 0) for pos < len(payload) { topic, readBytes, err := DeserializeToTopic(payload, pos) @@ -74,22 +77,32 @@ func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) { }, nil } -func DeserializeStreams(payload []byte) []iggcon.Stream { +func DeserializeStreams(payload []byte) ([]iggcon.Stream, error) { streams := make([]iggcon.Stream, 0) position := 0 - //TODO there's a deserialization bug, investigate this - //it occurs only with payload greater than 2 pow 16 for position < len(payload) { - stream, readBytes := DeserializeToStream(payload, position) + stream, readBytes, err := DeserializeToStream(payload, position) + if err != nil { + return nil, fmt.Errorf("failed to deserialize stream at offset %d: %w", position, err) + } streams = append(streams, stream) position += readBytes } - return streams + return streams, nil } -func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) { +const streamFixedSize = 4 + 8 + 4 + 8 + 8 + 1 // 33 bytes: id + created_at + topics_count + size_bytes + messages_count + name_len + +func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int, error) { + remaining := len(payload) - position + if remaining < streamFixedSize { + return iggcon.Stream{}, 0, fmt.Errorf( + "not enough data to read stream header: need %d bytes, got %d", + streamFixedSize, remaining) + } + id := binary.LittleEndian.Uint32(payload[position : position+4]) createdAt := binary.LittleEndian.Uint64(payload[position+4 : position+12]) topicsCount := binary.LittleEndian.Uint32(payload[position+12 : position+16]) @@ -97,10 +110,14 @@ func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) { messagesCount := binary.LittleEndian.Uint64(payload[position+24 : position+32]) nameLength := int(payload[position+32]) - nameBytes := payload[position+33 : position+33+nameLength] - name := string(nameBytes) + totalSize := streamFixedSize + nameLength + if remaining < totalSize { + return iggcon.Stream{}, 0, fmt.Errorf( + "not enough data to read stream name: need %d bytes, got %d", + totalSize, remaining) + } - readBytes := 4 + 8 + 4 + 8 + 8 + 1 + nameLength + name := string(payload[position+33 : position+33+nameLength]) return iggcon.Stream{ Id: id, @@ -109,7 +126,7 @@ func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) { SizeBytes: sizeBytes, MessagesCount: messagesCount, CreatedAt: createdAt, - }, readBytes + }, totalSize, nil } func DeserializeFetchMessagesResponse(payload []byte, compression iggcon.IggyMessageCompression) (*iggcon.PolledMessage, error) { diff --git a/foreign/go/binary_serialization/binary_response_deserializer_test.go b/foreign/go/binary_serialization/binary_response_deserializer_test.go index 1b4c0c0230..9165c9946a 100644 --- a/foreign/go/binary_serialization/binary_response_deserializer_test.go +++ b/foreign/go/binary_serialization/binary_response_deserializer_test.go @@ -19,6 +19,7 @@ package binaryserialization import ( "encoding/binary" + "fmt" "strings" "testing" @@ -86,3 +87,162 @@ func TestDeserializeFetchMessages_EmptyPayload(t *testing.T) { t.Fatalf("expected 0 messages, got %d", len(result.Messages)) } } + +func encodeStream(id uint32, createdAt uint64, topicsCount uint32, sizeBytes, messagesCount uint64, name string) []byte { + nameBytes := []byte(name) + buf := make([]byte, streamFixedSize+len(nameBytes)) + binary.LittleEndian.PutUint32(buf[0:4], id) + binary.LittleEndian.PutUint64(buf[4:12], createdAt) + binary.LittleEndian.PutUint32(buf[12:16], topicsCount) + binary.LittleEndian.PutUint64(buf[16:24], sizeBytes) + binary.LittleEndian.PutUint64(buf[24:32], messagesCount) + buf[32] = byte(len(nameBytes)) + copy(buf[33:], nameBytes) + return buf +} + +func assertStream(t *testing.T, label string, got iggcon.Stream, wantId uint32, wantCreatedAt uint64, wantTopicsCount uint32, wantSizeBytes, wantMessagesCount uint64, wantName string) { + t.Helper() + if got.Id != wantId { + t.Errorf("%s.Id = %d, want %d", label, got.Id, wantId) + } + if got.CreatedAt != wantCreatedAt { + t.Errorf("%s.CreatedAt = %d, want %d", label, got.CreatedAt, wantCreatedAt) + } + if got.TopicsCount != wantTopicsCount { + t.Errorf("%s.TopicsCount = %d, want %d", label, got.TopicsCount, wantTopicsCount) + } + if got.SizeBytes != wantSizeBytes { + t.Errorf("%s.SizeBytes = %d, want %d", label, got.SizeBytes, wantSizeBytes) + } + if got.MessagesCount != wantMessagesCount { + t.Errorf("%s.MessagesCount = %d, want %d", label, got.MessagesCount, wantMessagesCount) + } + if got.Name != wantName { + t.Errorf("%s.Name = %q, want %q", label, got.Name, wantName) + } +} + +func TestDeserializeToStream_SingleStream(t *testing.T) { + payload := encodeStream(42, 1_710_000_000, 5, 2048, 100, "my-stream") + + stream, readBytes, err := DeserializeToStream(payload, 0) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if readBytes != len(payload) { + t.Fatalf("readBytes = %d, want %d", readBytes, len(payload)) + } + assertStream(t, "stream", stream, 42, 1_710_000_000, 5, 2048, 100, "my-stream") +} + +func TestDeserializeToStream_TruncatedHeader(t *testing.T) { + payload := make([]byte, streamFixedSize-1) + _, _, err := DeserializeToStream(payload, 0) + if err == nil { + t.Fatal("expected error for truncated header, got nil") + } +} + +func TestDeserializeToStream_TruncatedName(t *testing.T) { + buf := make([]byte, streamFixedSize) + buf[32] = 10 + _, _, err := DeserializeToStream(buf, 0) + if err == nil { + t.Fatal("expected error for truncated name, got nil") + } +} + +func TestDeserializeStreams_Empty(t *testing.T) { + streams, err := DeserializeStreams([]byte{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(streams) != 0 { + t.Fatalf("expected 0 streams, got %d", len(streams)) + } +} + +func TestDeserializeStreams_MultipleStreams(t *testing.T) { + var payload []byte + payload = append(payload, encodeStream(1, 100, 2, 512, 50, "stream-one")...) + payload = append(payload, encodeStream(2, 200, 0, 0, 0, "s2")...) + payload = append(payload, encodeStream(3, 300, 1, 1024, 10, "third")...) + + streams, err := DeserializeStreams(payload) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(streams) != 3 { + t.Fatalf("expected 3 streams, got %d", len(streams)) + } + + assertStream(t, "stream[0]", streams[0], 1, 100, 2, 512, 50, "stream-one") + assertStream(t, "stream[1]", streams[1], 2, 200, 0, 0, 0, "s2") + assertStream(t, "stream[2]", streams[2], 3, 300, 1, 1024, 10, "third") +} + +func TestDeserializeStreams_CorruptedPayload(t *testing.T) { + good := encodeStream(1, 100, 0, 0, 0, "ok") + truncated := make([]byte, streamFixedSize-5) + payload := append(good, truncated...) + + _, err := DeserializeStreams(payload) + if err == nil { + t.Fatal("expected error for corrupted payload, got nil") + } +} + +// Regression test for issue #3130: payloads > 64KB produced corrupted +// stream lists because no bounds checking was performed. +func TestDeserializeStreams_LargePayloadOver64KB(t *testing.T) { + const targetSize = 70_000 + var payload []byte + var id uint32 + + for len(payload) < targetSize { + id++ + name := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", id) + payload = append(payload, encodeStream(id, uint64(id)*1000, id%10, uint64(id)*512, uint64(id)*5, name)...) + } + + if len(payload) <= 1<<16 { + t.Fatalf("payload size %d is not > 64KB; increase stream count or name length", len(payload)) + } + + streams, err := DeserializeStreams(payload) + if err != nil { + t.Fatalf("unexpected error deserializing %d-byte payload: %v", len(payload), err) + } + + if uint32(len(streams)) != id { + t.Fatalf("expected %d streams, got %d", id, len(streams)) + } + + for i, s := range streams { + expectedId := uint32(i + 1) + expectedName := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", expectedId) + assertStream(t, fmt.Sprintf("stream[%d]", i), s, + expectedId, uint64(expectedId)*1000, expectedId%10, + uint64(expectedId)*512, uint64(expectedId)*5, expectedName) + } +} + +func TestDeserializeStreams_MaxLengthName(t *testing.T) { + name := make([]byte, 255) + for i := range name { + name[i] = 'a' + byte(i%26) + } + payload := encodeStream(1, 999, 3, 4096, 200, string(name)) + + streams, err := DeserializeStreams(payload) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(streams) != 1 { + t.Fatalf("expected 1 stream, got %d", len(streams)) + } + if streams[0].Name != string(name) { + t.Errorf("Name length = %d, want 255", len(streams[0].Name)) + } +} diff --git a/foreign/go/client/tcp/tcp_stream_management.go b/foreign/go/client/tcp/tcp_stream_management.go index 9fbba5b4cc..932c6ac4ee 100644 --- a/foreign/go/client/tcp/tcp_stream_management.go +++ b/foreign/go/client/tcp/tcp_stream_management.go @@ -30,7 +30,7 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) { return nil, err } - return binaryserialization.DeserializeStreams(buffer), nil + return binaryserialization.DeserializeStreams(buffer) } func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier) (*iggcon.StreamDetails, error) {