Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions foreign/go/binary_serialization/binary_response_deserializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func DeserializeOffset(payload []byte) *iggcon.ConsumerOffsetInfo {
}

func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) {
stream, pos := DeserializeToStream(payload, 0)
stream, pos, err := DeserializeToStream(payload, 0)
if err != nil {
return nil, err
}
topics := make([]iggcon.Topic, 0)
for pos < len(payload) {
topic, readBytes, err := DeserializeToTopic(payload, pos)
Expand All @@ -74,33 +77,47 @@ func DeserializeStream(payload []byte) (*iggcon.StreamDetails, error) {
}, nil
}

func DeserializeStreams(payload []byte) []iggcon.Stream {
func DeserializeStreams(payload []byte) ([]iggcon.Stream, error) {
streams := make([]iggcon.Stream, 0)
position := 0

//TODO there's a deserialization bug, investigate this
//it occurs only with payload greater than 2 pow 16
for position < len(payload) {
stream, readBytes := DeserializeToStream(payload, position)
stream, readBytes, err := DeserializeToStream(payload, position)
if err != nil {
return nil, fmt.Errorf("failed to deserialize stream at offset %d: %w", position, err)
}
streams = append(streams, stream)
position += readBytes
}

return streams
return streams, nil
}

func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
const streamFixedSize = 4 + 8 + 4 + 8 + 8 + 1 // 33 bytes: id + created_at + topics_count + size_bytes + messages_count + name_len

func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int, error) {
remaining := len(payload) - position
if remaining < streamFixedSize {
return iggcon.Stream{}, 0, fmt.Errorf(
"not enough data to read stream header: need %d bytes, got %d",
streamFixedSize, remaining)
}

id := binary.LittleEndian.Uint32(payload[position : position+4])
createdAt := binary.LittleEndian.Uint64(payload[position+4 : position+12])
topicsCount := binary.LittleEndian.Uint32(payload[position+12 : position+16])
sizeBytes := binary.LittleEndian.Uint64(payload[position+16 : position+24])
messagesCount := binary.LittleEndian.Uint64(payload[position+24 : position+32])
nameLength := int(payload[position+32])

nameBytes := payload[position+33 : position+33+nameLength]
name := string(nameBytes)
totalSize := streamFixedSize + nameLength
if remaining < totalSize {
return iggcon.Stream{}, 0, fmt.Errorf(
"not enough data to read stream name: need %d bytes, got %d",
totalSize, remaining)
}

readBytes := 4 + 8 + 4 + 8 + 8 + 1 + nameLength
name := string(payload[position+33 : position+33+nameLength])

return iggcon.Stream{
Id: id,
Expand All @@ -109,7 +126,7 @@ func DeserializeToStream(payload []byte, position int) (iggcon.Stream, int) {
SizeBytes: sizeBytes,
MessagesCount: messagesCount,
CreatedAt: createdAt,
}, readBytes
}, totalSize, nil
}

func DeserializeFetchMessagesResponse(payload []byte, compression iggcon.IggyMessageCompression) (*iggcon.PolledMessage, error) {
Expand Down
160 changes: 160 additions & 0 deletions foreign/go/binary_serialization/binary_response_deserializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package binaryserialization

import (
"encoding/binary"
"fmt"
"strings"
"testing"

Expand Down Expand Up @@ -86,3 +87,162 @@ func TestDeserializeFetchMessages_EmptyPayload(t *testing.T) {
t.Fatalf("expected 0 messages, got %d", len(result.Messages))
}
}

func encodeStream(id uint32, createdAt uint64, topicsCount uint32, sizeBytes, messagesCount uint64, name string) []byte {
nameBytes := []byte(name)
buf := make([]byte, streamFixedSize+len(nameBytes))
binary.LittleEndian.PutUint32(buf[0:4], id)
binary.LittleEndian.PutUint64(buf[4:12], createdAt)
binary.LittleEndian.PutUint32(buf[12:16], topicsCount)
binary.LittleEndian.PutUint64(buf[16:24], sizeBytes)
binary.LittleEndian.PutUint64(buf[24:32], messagesCount)
buf[32] = byte(len(nameBytes))
copy(buf[33:], nameBytes)
return buf
}

func assertStream(t *testing.T, label string, got iggcon.Stream, wantId uint32, wantCreatedAt uint64, wantTopicsCount uint32, wantSizeBytes, wantMessagesCount uint64, wantName string) {
t.Helper()
if got.Id != wantId {
t.Errorf("%s.Id = %d, want %d", label, got.Id, wantId)
}
if got.CreatedAt != wantCreatedAt {
t.Errorf("%s.CreatedAt = %d, want %d", label, got.CreatedAt, wantCreatedAt)
}
if got.TopicsCount != wantTopicsCount {
t.Errorf("%s.TopicsCount = %d, want %d", label, got.TopicsCount, wantTopicsCount)
}
if got.SizeBytes != wantSizeBytes {
t.Errorf("%s.SizeBytes = %d, want %d", label, got.SizeBytes, wantSizeBytes)
}
if got.MessagesCount != wantMessagesCount {
t.Errorf("%s.MessagesCount = %d, want %d", label, got.MessagesCount, wantMessagesCount)
}
if got.Name != wantName {
t.Errorf("%s.Name = %q, want %q", label, got.Name, wantName)
}
}

func TestDeserializeToStream_SingleStream(t *testing.T) {
payload := encodeStream(42, 1_710_000_000, 5, 2048, 100, "my-stream")

stream, readBytes, err := DeserializeToStream(payload, 0)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if readBytes != len(payload) {
t.Fatalf("readBytes = %d, want %d", readBytes, len(payload))
}
assertStream(t, "stream", stream, 42, 1_710_000_000, 5, 2048, 100, "my-stream")
}

func TestDeserializeToStream_TruncatedHeader(t *testing.T) {
payload := make([]byte, streamFixedSize-1)
_, _, err := DeserializeToStream(payload, 0)
if err == nil {
t.Fatal("expected error for truncated header, got nil")
}
}

func TestDeserializeToStream_TruncatedName(t *testing.T) {
buf := make([]byte, streamFixedSize)
buf[32] = 10
_, _, err := DeserializeToStream(buf, 0)
if err == nil {
t.Fatal("expected error for truncated name, got nil")
}
}

func TestDeserializeStreams_Empty(t *testing.T) {
streams, err := DeserializeStreams([]byte{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(streams) != 0 {
t.Fatalf("expected 0 streams, got %d", len(streams))
}
}

func TestDeserializeStreams_MultipleStreams(t *testing.T) {
var payload []byte
payload = append(payload, encodeStream(1, 100, 2, 512, 50, "stream-one")...)
payload = append(payload, encodeStream(2, 200, 0, 0, 0, "s2")...)
payload = append(payload, encodeStream(3, 300, 1, 1024, 10, "third")...)

streams, err := DeserializeStreams(payload)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(streams) != 3 {
t.Fatalf("expected 3 streams, got %d", len(streams))
}

assertStream(t, "stream[0]", streams[0], 1, 100, 2, 512, 50, "stream-one")
assertStream(t, "stream[1]", streams[1], 2, 200, 0, 0, 0, "s2")
assertStream(t, "stream[2]", streams[2], 3, 300, 1, 1024, 10, "third")
}

func TestDeserializeStreams_CorruptedPayload(t *testing.T) {
good := encodeStream(1, 100, 0, 0, 0, "ok")
truncated := make([]byte, streamFixedSize-5)
payload := append(good, truncated...)

_, err := DeserializeStreams(payload)
if err == nil {
t.Fatal("expected error for corrupted payload, got nil")
}
}

// Regression test for issue #3130: payloads > 64KB produced corrupted
// stream lists because no bounds checking was performed.
func TestDeserializeStreams_LargePayloadOver64KB(t *testing.T) {
const targetSize = 70_000
var payload []byte
var id uint32

for len(payload) < targetSize {
id++
name := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", id)
payload = append(payload, encodeStream(id, uint64(id)*1000, id%10, uint64(id)*512, uint64(id)*5, name)...)
}

if len(payload) <= 1<<16 {
t.Fatalf("payload size %d is not > 64KB; increase stream count or name length", len(payload))
}

streams, err := DeserializeStreams(payload)
if err != nil {
t.Fatalf("unexpected error deserializing %d-byte payload: %v", len(payload), err)
}

if uint32(len(streams)) != id {
t.Fatalf("expected %d streams, got %d", id, len(streams))
}

for i, s := range streams {
expectedId := uint32(i + 1)
expectedName := fmt.Sprintf("stream-with-a-longer-name-for-padding-%d", expectedId)
assertStream(t, fmt.Sprintf("stream[%d]", i), s,
expectedId, uint64(expectedId)*1000, expectedId%10,
uint64(expectedId)*512, uint64(expectedId)*5, expectedName)
}
}

func TestDeserializeStreams_MaxLengthName(t *testing.T) {
name := make([]byte, 255)
for i := range name {
name[i] = 'a' + byte(i%26)
}
payload := encodeStream(1, 999, 3, 4096, 200, string(name))

streams, err := DeserializeStreams(payload)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(streams) != 1 {
t.Fatalf("expected 1 stream, got %d", len(streams))
}
if streams[0].Name != string(name) {
t.Errorf("Name length = %d, want 255", len(streams[0].Name))
}
}
2 changes: 1 addition & 1 deletion foreign/go/client/tcp/tcp_stream_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (c *IggyTcpClient) GetStreams() ([]iggcon.Stream, error) {
return nil, err
}

return binaryserialization.DeserializeStreams(buffer), nil
return binaryserialization.DeserializeStreams(buffer)
}

func (c *IggyTcpClient) GetStream(streamId iggcon.Identifier) (*iggcon.StreamDetails, error) {
Expand Down
Loading