From 097595c4cd0b9802608cb5e8ba6c2fb30f89b328 Mon Sep 17 00:00:00 2001 From: chengxi Date: Wed, 25 Mar 2026 20:40:34 -0400 Subject: [PATCH 1/3] fix(go): update stats field to align with rust SDK --- examples/go/go.sum | 2 + .../binary_serialization/stats_serializer.go | 90 -------- .../stats_serializer_test.go | 127 ----------- foreign/go/client/tcp/tcp_utilities.go | 7 +- foreign/go/contracts/stats.go | 159 ++++++++++++-- foreign/go/contracts/stats_test.go | 206 ++++++++++++++++++ foreign/go/go.mod | 1 + 7 files changed, 349 insertions(+), 243 deletions(-) delete mode 100644 foreign/go/binary_serialization/stats_serializer.go delete mode 100644 foreign/go/binary_serialization/stats_serializer_test.go create mode 100644 foreign/go/contracts/stats_test.go diff --git a/examples/go/go.sum b/examples/go/go.sum index 689bd0ab24..c85130ef52 100644 --- a/examples/go/go.sum +++ b/examples/go/go.sum @@ -2,6 +2,8 @@ github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= diff --git a/foreign/go/binary_serialization/stats_serializer.go b/foreign/go/binary_serialization/stats_serializer.go deleted file mode 100644 index 101cc0730d..0000000000 --- a/foreign/go/binary_serialization/stats_serializer.go +++ /dev/null @@ -1,90 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package binaryserialization - -import ( - "encoding/binary" - "math" - - iggcon "github.com/apache/iggy/foreign/go/contracts" -) - -type TcpStats struct { - iggcon.Stats -} - -// Constants for byte positions and lengths in the payload. -const ( - processIDPos = 0 - cpuUsagePos = 4 - totalCpuUsagePos = 8 - memoryUsagePos = 12 - totalMemoryPos = 20 - availableMemoryPos = 28 - runTimePos = 36 - startTimePos = 44 - readBytesPos = 52 - writtenBytesPos = 60 - messagesSizeBytesPos = 68 - streamsCountPos = 76 - topicsCountPos = 80 - partitionsCountPos = 84 - segmentsCountPos = 88 - messagesCountPos = 92 - clientsCountPos = 100 - consumerGroupsCountPos = 104 -) - -func (stats *TcpStats) Deserialize(payload []byte) error { - stats.ProcessId = binary.LittleEndian.Uint32(payload[processIDPos : processIDPos+4]) - stats.CpuUsage = math.Float32frombits(binary.LittleEndian.Uint32(payload[cpuUsagePos : cpuUsagePos+4])) - stats.TotalCpuUsage = math.Float32frombits(binary.LittleEndian.Uint32(payload[totalCpuUsagePos : totalCpuUsagePos+4])) - stats.MemoryUsage = binary.LittleEndian.Uint64(payload[memoryUsagePos : memoryUsagePos+8]) - stats.TotalMemory = binary.LittleEndian.Uint64(payload[totalMemoryPos : totalMemoryPos+8]) - stats.AvailableMemory = binary.LittleEndian.Uint64(payload[availableMemoryPos : availableMemoryPos+8]) - stats.RunTime = binary.LittleEndian.Uint64(payload[runTimePos : runTimePos+8]) - stats.StartTime = binary.LittleEndian.Uint64(payload[startTimePos : startTimePos+8]) - stats.ReadBytes = binary.LittleEndian.Uint64(payload[readBytesPos : readBytesPos+8]) - stats.WrittenBytes = binary.LittleEndian.Uint64(payload[writtenBytesPos : writtenBytesPos+8]) - stats.MessagesSizeBytes = binary.LittleEndian.Uint64(payload[messagesSizeBytesPos : messagesSizeBytesPos+8]) - stats.StreamsCount = binary.LittleEndian.Uint32(payload[streamsCountPos : streamsCountPos+4]) - stats.TopicsCount = binary.LittleEndian.Uint32(payload[topicsCountPos : topicsCountPos+4]) - stats.PartitionsCount = binary.LittleEndian.Uint32(payload[partitionsCountPos : partitionsCountPos+4]) - stats.SegmentsCount = binary.LittleEndian.Uint32(payload[segmentsCountPos : segmentsCountPos+4]) - stats.MessagesCount = binary.LittleEndian.Uint64(payload[messagesCountPos : messagesCountPos+8]) - stats.ClientsCount = binary.LittleEndian.Uint32(payload[clientsCountPos : clientsCountPos+4]) - stats.ConsumerGroupsCount = binary.LittleEndian.Uint32(payload[consumerGroupsCountPos : consumerGroupsCountPos+4]) - - position := consumerGroupsCountPos + 4 - hostnameLength := int(binary.LittleEndian.Uint32(payload[position : position+4])) - stats.Hostname = string(payload[position+4 : position+4+hostnameLength]) - position += 4 + hostnameLength - - osNameLength := int(binary.LittleEndian.Uint32(payload[position : position+4])) - stats.OsName = string(payload[position+4 : position+4+osNameLength]) - position += 4 + osNameLength - - osVersionLength := int(binary.LittleEndian.Uint32(payload[position : position+4])) - stats.OsVersion = string(payload[position+4 : position+4+osVersionLength]) - position += 4 + osVersionLength - - kernelVersionLength := int(binary.LittleEndian.Uint32(payload[position : position+4])) - stats.KernelVersion = string(payload[position+4 : position+4+kernelVersionLength]) - - return nil -} diff --git a/foreign/go/binary_serialization/stats_serializer_test.go b/foreign/go/binary_serialization/stats_serializer_test.go deleted file mode 100644 index 2e5004ce2b..0000000000 --- a/foreign/go/binary_serialization/stats_serializer_test.go +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package binaryserialization - -import ( - "testing" -) - -func TestDeserialize(t *testing.T) { - payload2 := []byte{ - 240, 68, 0, 0, //"process_id": 17648, - 124, 202, 146, 58, //"cpu_usage": 0.0011199261, - 124, 202, 146, 58, //"total_cpu_usage": 0.0011199261, - 0, 224, 66, 3, 0, 0, 0, 0, //"memory_usage": 54714368, - 0, 32, 123, 208, 7, 0, 0, 0, //"total_memory": 33562501120, - 0, 224, 28, 253, 4, 0, 0, 0, //"available_memory": 21426397184, - 169, 13, 0, 0, 0, 0, 0, 0, //"run_time": 3497, - 239, 201, 6, 101, 0, 0, 0, 0, //"start_time": 1694943727, - 0, 0, 0, 0, 0, 0, 0, 0, //"read_bytes": 0, - 0, 128, 8, 0, 0, 0, 0, 0, //"written_bytes": 557056, - 138, 2, 0, 0, 0, 0, 0, 0, //"messages_size_bytes": 650, - 1, 0, 0, 0, //"streams_count": 1, - 1, 0, 0, 0, //"topics_count": 1, - 12, 0, 0, 0, //"partitions_count": 12, - 12, 0, 0, 0, //"segments_count": 12, - 4, 0, 0, 0, 0, 0, 0, 0, //"messages_count": 4, - 11, 0, 0, 0, //"clients_count": 11, - 0, 0, 0, 0, //"consumer_groups_count": 0, - 6, 0, 0, 0, //hostname length: 6 - 112, 111, 112, 45, 111, 115, //"hostname": "pop-os", - 7, 0, 0, 0, //os name length :7 - 80, 111, 112, 33, 95, 79, 83, //"os_name": "Pop!_OS", - 19, 0, 0, 0, //os version length: 19 - 76, 105, 110, 117, 120, 32, 50, 50, 46, 48, 52, 32, 80, 111, 112, 33, 95, 79, 83, //"os_version": "Linux 22.04 Pop!_OS", - 22, 0, 0, 0, //kernel version length: 22 - 54, 46, 52, 46, 54, 45, 55, 54, 48, 54, 48, 52, 48, 54, 45, 103, 101, 110, 101, 114, 105, 99, //"kernel_version": "6.4.6-76060406-generic" - } - - // Create a TcpStats object and deserialize the payload - var stats TcpStats - err := stats.Deserialize(payload2) - - // Check if there was an error during deserialization - if err != nil { - t.Errorf("Deserialization error: %v", err) - } - - // Verify the deserialized values - if stats.ProcessId != 17648 { - t.Errorf("ProcessId is incorrect. Expected: 17648, Got: %d", stats.ProcessId) - } - if stats.CpuUsage != 0.0011199261 { - t.Errorf("CPUUsage is incorrect. Expected: 0.0011199261, Got: %f", stats.CpuUsage) - } - if stats.MemoryUsage != 54714368 { - t.Errorf("MemoryUsage is incorrect. Expected: 54714368, Got: %d", stats.MemoryUsage) - } - if stats.TotalMemory != 33562501120 { - t.Errorf("TotalMemory is incorrect. Expected: 33562501120, Got: %d", stats.TotalMemory) - } - if stats.AvailableMemory != 21426397184 { - t.Errorf("AvailableMemory is incorrect. Expected: 21426397184, Got: %d", stats.AvailableMemory) - } - if stats.RunTime != 3497 { - t.Errorf("RunTime is incorrect. Expected: 3497, Got: %d", stats.RunTime) - } - if stats.StartTime != 1694943727 { - t.Errorf("StartTime is incorrect. Expected: 1694943727, Got: %d", stats.StartTime) - } - if stats.ReadBytes != 0 { - t.Errorf("ReadBytes is incorrect. Expected: 0, Got: %d", stats.ReadBytes) - } - if stats.WrittenBytes != 557056 { - t.Errorf("WrittenBytes is incorrect. Expected: 557056, Got: %d", stats.WrittenBytes) - } - if stats.MessagesSizeBytes != 650 { - t.Errorf("MessagesSizeBytes is incorrect. Expected: 650, Got: %d", stats.MessagesSizeBytes) - } - if stats.StreamsCount != 1 { - t.Errorf("StreamsCount is incorrect. Expected: 1, Got: %d", stats.StreamsCount) - } - if stats.TopicsCount != 1 { - t.Errorf("TopicsCount is incorrect. Expected: 1, Got: %d", stats.TopicsCount) - } - if stats.PartitionsCount != 12 { - t.Errorf("PartitionsCount is incorrect. Expected: 12, Got: %d", stats.PartitionsCount) - } - if stats.SegmentsCount != 12 { - t.Errorf("SegmentsCount is incorrect. Expected: 12, Got: %d", stats.SegmentsCount) - } - if stats.MessagesCount != 4 { - t.Errorf("MessagesCount is incorrect. Expected: 4, Got: %d", stats.MessagesCount) - } - if stats.ClientsCount != 11 { - t.Errorf("ClientsCount is incorrect. Expected: 11, Got: %d", stats.ClientsCount) - } - if stats.ConsumerGroupsCount != 0 { - t.Errorf("ConsumerGroupsCount is incorrect. Expected: 0, Got: %d", stats.ConsumerGroupsCount) - } - if stats.Hostname != "pop-os" { - t.Errorf("Hostname is incorrect. Expected: \"pop-os\", Got: \"%s\"", stats.Hostname) - } - if stats.OsName != "Pop!_OS" { - t.Errorf("OsName is incorrect. Expected: \"Pop!_OS\", Got: \"%s\"", stats.OsName) - } - if stats.OsVersion != "Linux 22.04 Pop!_OS" { - t.Errorf("OsVersion is incorrect. Expected: \"Linux 22.04 Pop!_OS\", Got: \"%s\"", stats.OsVersion) - } - if stats.KernelVersion != "6.4.6-76060406-generic" { - t.Errorf("KernelVersion is incorrect. Expected: \"6.4.6-76060406-generic\", Got: \"%s\"", stats.KernelVersion) - } -} diff --git a/foreign/go/client/tcp/tcp_utilities.go b/foreign/go/client/tcp/tcp_utilities.go index 0da1784cb1..f3338adece 100644 --- a/foreign/go/client/tcp/tcp_utilities.go +++ b/foreign/go/client/tcp/tcp_utilities.go @@ -18,7 +18,6 @@ package tcp import ( - binaryserialization "github.com/apache/iggy/foreign/go/binary_serialization" iggcon "github.com/apache/iggy/foreign/go/contracts" "github.com/apache/iggy/foreign/go/internal/command" ) @@ -29,10 +28,10 @@ func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) { return nil, err } - stats := &binaryserialization.TcpStats{} - err = stats.Deserialize(buffer) + s := &iggcon.Stats{} + err = s.UnmarshalBinary(buffer) - return &stats.Stats, err + return s, err } func (c *IggyTcpClient) Ping() error { diff --git a/foreign/go/contracts/stats.go b/foreign/go/contracts/stats.go index 1409b01e58..06eaf2ce06 100644 --- a/foreign/go/contracts/stats.go +++ b/foreign/go/contracts/stats.go @@ -17,27 +17,142 @@ package iggcon +import ( + "fmt" + + "github.com/apache/iggy/foreign/go/internal/codec" +) + +// CacheMetrics holds cache hit/miss statistics for a single partition. +type CacheMetrics struct { + StreamId uint32 `json:"stream_id"` + TopicId uint32 `json:"topic_id"` + PartitionId uint32 `json:"partition_id"` + Hits uint64 `json:"hits"` + Misses uint64 `json:"misses"` + HitRatio float32 `json:"hit_ratio"` +} + +// cacheMetricsWireSize is the fixed size of a CacheMetrics entry on the wire: +// stream_id(4) + topic_id(4) + partition_id(4) + hits(8) + misses(8) + hit_ratio(4) = 32. +const cacheMetricsWireSize = 4 + 4 + 4 + 8 + 8 + 4 + type Stats struct { - ProcessId uint32 `json:"process_id"` - CpuUsage float32 `json:"cpu_usage"` - TotalCpuUsage float32 `json:"total_cpu_usage"` - MemoryUsage uint64 `json:"memory_usage"` - TotalMemory uint64 `json:"total_memory"` - AvailableMemory uint64 `json:"available_memory"` - RunTime uint64 `json:"run_time"` - StartTime uint64 `json:"start_time"` - ReadBytes uint64 `json:"read_bytes"` - WrittenBytes uint64 `json:"written_bytes"` - MessagesSizeBytes uint64 `json:"messages_size_bytes"` - StreamsCount uint32 `json:"streams_count"` - TopicsCount uint32 `json:"topics_count"` - PartitionsCount uint32 `json:"partitions_count"` - SegmentsCount uint32 `json:"segments_count"` - MessagesCount uint64 `json:"messages_count"` - ClientsCount uint32 `json:"clients_count"` - ConsumerGroupsCount uint32 `json:"consumer_groups_count"` - Hostname string `json:"hostname"` - OsName string `json:"os_name"` - OsVersion string `json:"os_version"` - KernelVersion string `json:"kernel_version"` + ProcessId uint32 `json:"process_id"` + CpuUsage float32 `json:"cpu_usage"` + TotalCpuUsage float32 `json:"total_cpu_usage"` + MemoryUsage uint64 `json:"memory_usage"` + TotalMemory uint64 `json:"total_memory"` + AvailableMemory uint64 `json:"available_memory"` + RunTime uint64 `json:"run_time"` + StartTime uint64 `json:"start_time"` + ReadBytes uint64 `json:"read_bytes"` + WrittenBytes uint64 `json:"written_bytes"` + MessagesSizeBytes uint64 `json:"messages_size_bytes"` + StreamsCount uint32 `json:"streams_count"` + TopicsCount uint32 `json:"topics_count"` + PartitionsCount uint32 `json:"partitions_count"` + SegmentsCount uint32 `json:"segments_count"` + MessagesCount uint64 `json:"messages_count"` + ClientsCount uint32 `json:"clients_count"` + ConsumerGroupsCount uint32 `json:"consumer_groups_count"` + Hostname string `json:"hostname"` + OsName string `json:"os_name"` + OsVersion string `json:"os_version"` + KernelVersion string `json:"kernel_version"` + IggyServerVersion string `json:"iggy_server_version"` + IggyServerSemver *uint32 `json:"iggy_server_semver,omitempty"` + CacheMetrics []CacheMetrics `json:"cache_metrics"` + ThreadsCount uint32 `json:"threads_count"` + FreeDiskSpace uint64 `json:"free_disk_space"` + TotalDiskSpace uint64 `json:"total_disk_space"` +} + +func (cm *CacheMetrics) MarshalBinary() ([]byte, error) { + w := codec.NewWriterCap(32) + w.U32(cm.StreamId) + w.U32(cm.TopicId) + w.U32(cm.PartitionId) + w.U64(cm.Hits) + w.U64(cm.Misses) + w.F32(cm.HitRatio) + return w.Bytes(), w.Err() +} + +func (cm *CacheMetrics) UnmarshalBinary(data []byte) error { + r := codec.NewReader(data) + cm.StreamId = r.U32() + cm.TopicId = r.U32() + cm.PartitionId = r.U32() + cm.Hits = r.U64() + cm.Misses = r.U64() + cm.HitRatio = r.F32() + return r.Err() +} + +func (s *Stats) UnmarshalBinary(payload []byte) error { + r := codec.NewReader(payload) + s.ProcessId = r.U32() + s.CpuUsage = r.F32() + s.TotalCpuUsage = r.F32() + s.MemoryUsage = r.U64() + s.TotalMemory = r.U64() + s.AvailableMemory = r.U64() + s.RunTime = r.U64() + s.StartTime = r.U64() + s.ReadBytes = r.U64() + s.WrittenBytes = r.U64() + s.MessagesSizeBytes = r.U64() + s.StreamsCount = r.U32() + s.TopicsCount = r.U32() + s.PartitionsCount = r.U32() + s.SegmentsCount = r.U32() + s.MessagesCount = r.U64() + s.ClientsCount = r.U32() + s.ConsumerGroupsCount = r.U32() + s.Hostname = r.U32LenStr() + s.OsName = r.U32LenStr() + s.OsVersion = r.U32LenStr() + s.KernelVersion = r.U32LenStr() + s.IggyServerVersion = r.U32LenStr() + + if r.Err() != nil { + return r.Err() + } + + // iggy_server_semver is optional. If at least 8 bytes remain, the next 4 + // bytes are the semver and the following 4 are cache_metrics_count. + // If only 4 bytes remain, there is no semver and those 4 bytes are cache_metrics_count. + if r.Remaining() >= 8 { + v := r.U32() + s.IggyServerSemver = &v + } + + cacheCount := int(r.U32()) + if r.Err() != nil { + return r.Err() + } + + if cacheCount > r.Remaining()/cacheMetricsWireSize { + return fmt.Errorf("stats: cache metrics count %d exceeds remaining bytes %d", cacheCount, r.Remaining()) + } + + if cacheCount > 0 { + s.CacheMetrics = make([]CacheMetrics, cacheCount) + for i := range s.CacheMetrics { + r.Obj(cacheMetricsWireSize, &s.CacheMetrics[i]) + } + } + + if r.Err() == nil && r.Remaining() >= 4 { + s.ThreadsCount = r.U32() + } + if r.Err() == nil && r.Remaining() >= 8 { + s.FreeDiskSpace = r.U64() + } + if r.Err() == nil && r.Remaining() >= 8 { + s.TotalDiskSpace = r.U64() + } + + return r.Err() } diff --git a/foreign/go/contracts/stats_test.go b/foreign/go/contracts/stats_test.go new file mode 100644 index 0000000000..e9d22dc5fc --- /dev/null +++ b/foreign/go/contracts/stats_test.go @@ -0,0 +1,206 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package iggcon + +import ( + "encoding/binary" + "math" + "strings" + "testing" + + "github.com/apache/iggy/foreign/go/internal/codec" + "github.com/google/go-cmp/cmp" +) + +// buildStatsPayload constructs a wire payload for Stats using the same +// layout as the Rust binary_protocol StatsResponse encoder. +// The semver field controls whether tail fields (threads, disk) are encoded. +func buildStatsPayload(t *testing.T, s Stats) []byte { + t.Helper() + w := codec.NewWriter() + w.U32(s.ProcessId) + w.F32(s.CpuUsage) + w.F32(s.TotalCpuUsage) + w.U64(s.MemoryUsage) + w.U64(s.TotalMemory) + w.U64(s.AvailableMemory) + w.U64(s.RunTime) + w.U64(s.StartTime) + w.U64(s.ReadBytes) + w.U64(s.WrittenBytes) + w.U64(s.MessagesSizeBytes) + w.U32(s.StreamsCount) + w.U32(s.TopicsCount) + w.U32(s.PartitionsCount) + w.U32(s.SegmentsCount) + w.U64(s.MessagesCount) + w.U32(s.ClientsCount) + w.U32(s.ConsumerGroupsCount) + w.U32LenStr(s.Hostname) + w.U32LenStr(s.OsName) + w.U32LenStr(s.OsVersion) + w.U32LenStr(s.KernelVersion) + w.U32LenStr(s.IggyServerVersion) + + if s.IggyServerSemver != nil { + w.U32(*s.IggyServerSemver) + } + + w.U32(uint32(len(s.CacheMetrics))) + for i := range s.CacheMetrics { + w.Obj(&s.CacheMetrics[i]) + } + + if s.IggyServerSemver != nil { + w.U32(s.ThreadsCount) + w.U64(s.FreeDiskSpace) + w.U64(s.TotalDiskSpace) + } + + if err := w.Err(); err != nil { + t.Fatalf("buildStatsPayload: %v", err) + } + return w.Bytes() +} + +func sampleStats() Stats { + iggyServerSemver := uint32(600) + return Stats{ + ProcessId: 1234, + CpuUsage: 25.5, + TotalCpuUsage: 50.0, + MemoryUsage: 1_073_741_824, + TotalMemory: 8_589_934_592, + AvailableMemory: 4_294_967_296, + RunTime: 3600, + StartTime: 1_710_000_000_000, + ReadBytes: 1_000_000, + WrittenBytes: 500_000, + MessagesSizeBytes: 2_000_000, + StreamsCount: 3, + TopicsCount: 10, + PartitionsCount: 30, + SegmentsCount: 90, + MessagesCount: 50_000, + ClientsCount: 5, + ConsumerGroupsCount: 2, + Hostname: "node-1", + OsName: "Linux", + OsVersion: "6.1", + KernelVersion: "6.1.0", + IggyServerVersion: "0.6.0", + IggyServerSemver: &iggyServerSemver, + ThreadsCount: 16, + FreeDiskSpace: 107_374_182_400, + TotalDiskSpace: 512_110_190_592, + } +} + +func assertStatsEqual(t *testing.T, got, want Stats) { + t.Helper() + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Stats mismatch (-want +got):\n%s", diff) + } +} + +func TestUnmarshalBinary(t *testing.T) { + want := sampleStats() + payload := buildStatsPayload(t, want) + + var got Stats + if err := got.UnmarshalBinary(payload); err != nil { + t.Fatalf("UnmarshalBinary error: %v", err) + } + + assertStatsEqual(t, got, want) +} + +func TestUnmarshalBinary_maliciousCacheCount(t *testing.T) { + s := Stats{ + Hostname: "h", + OsName: "o", + OsVersion: "s", + KernelVersion: "t", + IggyServerVersion: "v", + } + payload := buildStatsPayload(t, s) + binary.LittleEndian.PutUint32(payload[len(payload)-4:], math.MaxUint32) + + var stats Stats + err := stats.UnmarshalBinary(payload) + if err == nil { + t.Fatal("expected error for oversized cache metrics count, got nil") + } + if !strings.Contains(err.Error(), "cache metrics count") { + t.Errorf("unexpected error: got %v, want error mentioning cache metrics count", err) + } +} + +func TestUnmarshalBinary_withCacheMetrics(t *testing.T) { + want := sampleStats() + want.CacheMetrics = []CacheMetrics{ + {StreamId: 1, TopicId: 1, PartitionId: 0, Hits: 1000, Misses: 50, HitRatio: 0.95238095}, + {StreamId: 2, TopicId: 3, PartitionId: 1, Hits: 0, Misses: 100, HitRatio: 0.0}, + } + payload := buildStatsPayload(t, want) + + var got Stats + if err := got.UnmarshalBinary(payload); err != nil { + t.Fatalf("UnmarshalBinary error: %v", err) + } + + assertStatsEqual(t, got, want) +} + +func TestUnmarshalBinary_noSemver(t *testing.T) { + want := sampleStats() + want.IggyServerSemver = nil + want.ThreadsCount = 0 + want.FreeDiskSpace = 0 + want.TotalDiskSpace = 0 + payload := buildStatsPayload(t, want) + + var got Stats + if err := got.UnmarshalBinary(payload); err != nil { + t.Fatalf("UnmarshalBinary error: %v", err) + } + + assertStatsEqual(t, got, want) +} + +func TestUnmarshalBinary_emptyStrings(t *testing.T) { + want := sampleStats() + want.Hostname = "" + want.OsName = "" + want.OsVersion = "" + want.KernelVersion = "" + want.IggyServerVersion = "" + want.IggyServerSemver = nil + want.ThreadsCount = 0 + want.FreeDiskSpace = 0 + want.TotalDiskSpace = 0 + payload := buildStatsPayload(t, want) + + var got Stats + if err := got.UnmarshalBinary(payload); err != nil { + t.Fatalf("UnmarshalBinary error: %v", err) + } + + assertStatsEqual(t, got, want) +} diff --git a/foreign/go/go.mod b/foreign/go/go.mod index 5ce8a47cdb..a712054fbd 100644 --- a/foreign/go/go.mod +++ b/foreign/go/go.mod @@ -4,6 +4,7 @@ go 1.25.0 require ( github.com/avast/retry-go/v5 v5.0.0 + github.com/google/go-cmp v0.7.0 github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.18.0 github.com/stretchr/testify v1.11.1 From c7ced344a3b69ff7ec93ad1ba0a912d1d33c3cf1 Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Mar 2026 00:49:39 -0400 Subject: [PATCH 2/3] chore(go): improve test coverage --- foreign/go/contracts/stats_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/foreign/go/contracts/stats_test.go b/foreign/go/contracts/stats_test.go index e9d22dc5fc..28085e7d2d 100644 --- a/foreign/go/contracts/stats_test.go +++ b/foreign/go/contracts/stats_test.go @@ -184,6 +184,35 @@ func TestUnmarshalBinary_noSemver(t *testing.T) { assertStatsEqual(t, got, want) } +func TestUnmarshalBinary_truncatedFixedFields(t *testing.T) { + // Payload too short to contain all fixed fields. + payload := make([]byte, 10) // far too small + var stats Stats + if err := stats.UnmarshalBinary(payload); err == nil { + t.Fatal("expected error for truncated payload, got nil") + } +} + +func TestUnmarshalBinary_truncatedAfterStrings(t *testing.T) { + // Build a valid payload with no semver, then chop off the cache_metrics_count + // so that reading cache count fails. + s := Stats{ + Hostname: "h", + OsName: "o", + OsVersion: "v", + KernelVersion: "k", + IggyServerVersion: "s", + } + payload := buildStatsPayload(t, s) + // The last 4 bytes are cache_metrics_count (0). Remove them. + payload = payload[:len(payload)-4] + + var stats Stats + if err := stats.UnmarshalBinary(payload); err == nil { + t.Fatal("expected error for truncated cache count, got nil") + } +} + func TestUnmarshalBinary_emptyStrings(t *testing.T) { want := sampleStats() want.Hostname = "" From bca89fa4b280253128a36ebb2057f948c57cffb2 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 14 Apr 2026 02:23:29 -0400 Subject: [PATCH 3/3] refactor(go): apply review suggestions. --- foreign/go/contracts/stats.go | 28 ++++-------------- foreign/go/contracts/stats_test.go | 46 +++++++----------------------- 2 files changed, 16 insertions(+), 58 deletions(-) diff --git a/foreign/go/contracts/stats.go b/foreign/go/contracts/stats.go index 06eaf2ce06..df89a4a1a2 100644 --- a/foreign/go/contracts/stats.go +++ b/foreign/go/contracts/stats.go @@ -61,7 +61,7 @@ type Stats struct { OsVersion string `json:"os_version"` KernelVersion string `json:"kernel_version"` IggyServerVersion string `json:"iggy_server_version"` - IggyServerSemver *uint32 `json:"iggy_server_semver,omitempty"` + IggyServerSemver uint32 `json:"iggy_server_semver"` CacheMetrics []CacheMetrics `json:"cache_metrics"` ThreadsCount uint32 `json:"threads_count"` FreeDiskSpace uint64 `json:"free_disk_space"` @@ -115,19 +115,7 @@ func (s *Stats) UnmarshalBinary(payload []byte) error { s.OsVersion = r.U32LenStr() s.KernelVersion = r.U32LenStr() s.IggyServerVersion = r.U32LenStr() - - if r.Err() != nil { - return r.Err() - } - - // iggy_server_semver is optional. If at least 8 bytes remain, the next 4 - // bytes are the semver and the following 4 are cache_metrics_count. - // If only 4 bytes remain, there is no semver and those 4 bytes are cache_metrics_count. - if r.Remaining() >= 8 { - v := r.U32() - s.IggyServerSemver = &v - } - + s.IggyServerSemver = r.U32() cacheCount := int(r.U32()) if r.Err() != nil { return r.Err() @@ -144,15 +132,9 @@ func (s *Stats) UnmarshalBinary(payload []byte) error { } } - if r.Err() == nil && r.Remaining() >= 4 { - s.ThreadsCount = r.U32() - } - if r.Err() == nil && r.Remaining() >= 8 { - s.FreeDiskSpace = r.U64() - } - if r.Err() == nil && r.Remaining() >= 8 { - s.TotalDiskSpace = r.U64() - } + s.ThreadsCount = r.U32() + s.FreeDiskSpace = r.U64() + s.TotalDiskSpace = r.U64() return r.Err() } diff --git a/foreign/go/contracts/stats_test.go b/foreign/go/contracts/stats_test.go index 28085e7d2d..1e50790aa8 100644 --- a/foreign/go/contracts/stats_test.go +++ b/foreign/go/contracts/stats_test.go @@ -29,7 +29,6 @@ import ( // buildStatsPayload constructs a wire payload for Stats using the same // layout as the Rust binary_protocol StatsResponse encoder. -// The semver field controls whether tail fields (threads, disk) are encoded. func buildStatsPayload(t *testing.T, s Stats) []byte { t.Helper() w := codec.NewWriter() @@ -57,20 +56,16 @@ func buildStatsPayload(t *testing.T, s Stats) []byte { w.U32LenStr(s.KernelVersion) w.U32LenStr(s.IggyServerVersion) - if s.IggyServerSemver != nil { - w.U32(*s.IggyServerSemver) - } + w.U32(s.IggyServerSemver) w.U32(uint32(len(s.CacheMetrics))) for i := range s.CacheMetrics { w.Obj(&s.CacheMetrics[i]) } - if s.IggyServerSemver != nil { - w.U32(s.ThreadsCount) - w.U64(s.FreeDiskSpace) - w.U64(s.TotalDiskSpace) - } + w.U32(s.ThreadsCount) + w.U64(s.FreeDiskSpace) + w.U64(s.TotalDiskSpace) if err := w.Err(); err != nil { t.Fatalf("buildStatsPayload: %v", err) @@ -79,7 +74,6 @@ func buildStatsPayload(t *testing.T, s Stats) []byte { } func sampleStats() Stats { - iggyServerSemver := uint32(600) return Stats{ ProcessId: 1234, CpuUsage: 25.5, @@ -104,7 +98,7 @@ func sampleStats() Stats { OsVersion: "6.1", KernelVersion: "6.1.0", IggyServerVersion: "0.6.0", - IggyServerSemver: &iggyServerSemver, + IggyServerSemver: 600, ThreadsCount: 16, FreeDiskSpace: 107_374_182_400, TotalDiskSpace: 512_110_190_592, @@ -140,7 +134,9 @@ func TestUnmarshalBinary_maliciousCacheCount(t *testing.T) { IggyServerVersion: "v", } payload := buildStatsPayload(t, s) - binary.LittleEndian.PutUint32(payload[len(payload)-4:], math.MaxUint32) + // cache_metrics_count sits before the tail fields: threads(4) + free_disk(8) + total_disk(8) = 20 + cacheCountOffset := len(payload) - 20 - 4 + binary.LittleEndian.PutUint32(payload[cacheCountOffset:], math.MaxUint32) var stats Stats err := stats.UnmarshalBinary(payload) @@ -168,22 +164,6 @@ func TestUnmarshalBinary_withCacheMetrics(t *testing.T) { assertStatsEqual(t, got, want) } -func TestUnmarshalBinary_noSemver(t *testing.T) { - want := sampleStats() - want.IggyServerSemver = nil - want.ThreadsCount = 0 - want.FreeDiskSpace = 0 - want.TotalDiskSpace = 0 - payload := buildStatsPayload(t, want) - - var got Stats - if err := got.UnmarshalBinary(payload); err != nil { - t.Fatalf("UnmarshalBinary error: %v", err) - } - - assertStatsEqual(t, got, want) -} - func TestUnmarshalBinary_truncatedFixedFields(t *testing.T) { // Payload too short to contain all fixed fields. payload := make([]byte, 10) // far too small @@ -194,7 +174,7 @@ func TestUnmarshalBinary_truncatedFixedFields(t *testing.T) { } func TestUnmarshalBinary_truncatedAfterStrings(t *testing.T) { - // Build a valid payload with no semver, then chop off the cache_metrics_count + // Build a valid payload then chop off everything after the semver // so that reading cache count fails. s := Stats{ Hostname: "h", @@ -204,8 +184,8 @@ func TestUnmarshalBinary_truncatedAfterStrings(t *testing.T) { IggyServerVersion: "s", } payload := buildStatsPayload(t, s) - // The last 4 bytes are cache_metrics_count (0). Remove them. - payload = payload[:len(payload)-4] + // Remove tail fields + cache_metrics_count (4+4+8+8 = 24 bytes from the end). + payload = payload[:len(payload)-24] var stats Stats if err := stats.UnmarshalBinary(payload); err == nil { @@ -220,10 +200,6 @@ func TestUnmarshalBinary_emptyStrings(t *testing.T) { want.OsVersion = "" want.KernelVersion = "" want.IggyServerVersion = "" - want.IggyServerSemver = nil - want.ThreadsCount = 0 - want.FreeDiskSpace = 0 - want.TotalDiskSpace = 0 payload := buildStatsPayload(t, want) var got Stats