From 77b6d561c86d36b0f7d94c163c95c74dd7093549 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 23 Feb 2026 09:26:35 +0000 Subject: [PATCH 1/2] build(deps): bump franz-go and set 9m conn idle timeout default Upgrade franz-go modules to latest versions and make ConnIdleTimeout configurable in CommonConfig, defaulting to 9m instead of franz-go's default when unset. Co-authored-by: Cursor --- go.mod | 16 ++++++++-------- go.sum | 32 ++++++++++++++++---------------- kafka/common.go | 10 ++++++++++ 3 files changed, 34 insertions(+), 24 deletions(-) diff --git a/go.mod b/go.mod index 603f14f2..ae0454e6 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.32.6 github.com/google/go-cmp v0.7.0 github.com/stretchr/testify v1.11.1 - github.com/twmb/franz-go v1.19.5 - github.com/twmb/franz-go/pkg/kadm v1.15.0 - github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83 - github.com/twmb/franz-go/pkg/kmsg v1.11.2 + github.com/twmb/franz-go v1.20.7 + github.com/twmb/franz-go/pkg/kadm v1.17.2 + github.com/twmb/franz-go/pkg/kfake v0.0.0-20260223041933-0873149f2e76 + github.com/twmb/franz-go/pkg/kmsg v1.12.0 github.com/twmb/franz-go/plugin/kzap v1.1.2 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/metric v1.38.0 @@ -39,11 +39,11 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.18.0 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/klauspost/compress v1.18.4 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/crypto v0.39.0 // indirect - golang.org/x/sys v0.35.0 // indirect + golang.org/x/crypto v0.48.0 // indirect + golang.org/x/sys v0.41.0 // indirect ) diff --git a/go.sum b/go.sum index 89b089f6..e7949457 100644 --- a/go.sum +++ b/go.sum @@ -37,28 +37,28 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= -github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -github.com/twmb/franz-go v1.19.5 h1:W7+o8D0RsQsedqib71OVlLeZ0zI6CbFra7yTYhZTs5Y= -github.com/twmb/franz-go v1.19.5/go.mod h1:4kFJ5tmbbl7asgwAGVuyG1ZMx0NNpYk7EqflvWfPCpM= -github.com/twmb/franz-go/pkg/kadm v1.15.0 h1:Yo3NAPfcsx3Gg9/hdhq4vmwO77TqRRkvpUcGWzjworc= -github.com/twmb/franz-go/pkg/kadm v1.15.0/go.mod h1:MUdcUtnf9ph4SFBLLA/XxE29rvLhWYLM9Ygb8dfSCvw= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83 h1:JJHzPprdI2KC4Fz1D/HpIvn3mlzU6v0KaHLCn+V5ILo= -github.com/twmb/franz-go/pkg/kfake v0.0.0-20250625174842-669b18eeee83/go.mod h1:udxwmMC3r4xqjwrSrMi8p9jpqMDNpC2YwexpDSUmQtw= -github.com/twmb/franz-go/pkg/kmsg v1.11.2 h1:hIw75FpwcAjgeyfIGFqivAvwC5uNIOWRGvQgZhH4mhg= -github.com/twmb/franz-go/pkg/kmsg v1.11.2/go.mod h1:CFfkkLysDNmukPYhGzuUcDtf46gQSqCZHMW1T4Z+wDE= +github.com/twmb/franz-go v1.20.7 h1:P4MGSXJjjAPP3NRGPCks/Lrq+j+twWMVl1qYCVgNmWY= +github.com/twmb/franz-go v1.20.7/go.mod h1:0bRX9HZVaoueqFWhPZNi2ODnJL7DNa6mK0HeCrC2bNU= +github.com/twmb/franz-go/pkg/kadm v1.17.2 h1:g5f1sAxnTkYC6G96pV5u715HWhxd66hWaDZUAQ8xHY8= +github.com/twmb/franz-go/pkg/kadm v1.17.2/go.mod h1:ST55zUB+sUS+0y+GcKY/Tf1XxgVilaFpB9I19UubLmU= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20260223041933-0873149f2e76 h1:dHdV2dhO2SRBvsMdk4VUKUeX9iA/qWC6BDAwcTo68io= +github.com/twmb/franz-go/pkg/kfake v0.0.0-20260223041933-0873149f2e76/go.mod h1:u6MCLKYQtF7DP1d3pFjohpY0G+dUEUSdmC2JZt9F84U= +github.com/twmb/franz-go/pkg/kmsg v1.12.0 h1:CbatD7ers1KzDNgJqPbKOq0Bz/WLBdsTH75wgzeVaPc= +github.com/twmb/franz-go/pkg/kmsg v1.12.0/go.mod h1:+DPt4NC8RmI6hqb8G09+3giKObE6uD2Eya6CfqBpeJY= github.com/twmb/franz-go/plugin/kzap v1.1.2 h1:0arX5xJ0soUPX1LlDay6ZZoxuWkWk1lggQ5M/IgRXAE= github.com/twmb/franz-go/plugin/kzap v1.1.2/go.mod h1:53Cl9Uz1pbdOPDvUISIxLrZIWSa2jCuY1bTMauRMBmo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -79,12 +79,12 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/kafka/common.go b/kafka/common.go index fa4ec88e..24cb2acf 100644 --- a/kafka/common.go +++ b/kafka/common.go @@ -184,6 +184,11 @@ type CommonConfig struct { // If zero, the default value of 5 minutes is used. MetadataMaxAge time.Duration + // ConnIdleTimeout sets how long an idle connection may sit before it is not + // reused and may be closed by franz-go. + // If zero, apm-queue defaults to 9m instead of franz-go's default. + ConnIdleTimeout time.Duration + hooks []kgo.Hook } @@ -344,6 +349,11 @@ func (cfg *CommonConfig) newClientWithOpts(clientOptsFn []clientOptsFn, addition if cfg.MetadataMaxAge > 0 { opts = append(opts, kgo.MetadataMaxAge(cfg.MetadataMaxAge)) } + if cfg.ConnIdleTimeout > 0 { + opts = append(opts, kgo.ConnIdleTimeout(cfg.ConnIdleTimeout)) + } else { + opts = append(opts, kgo.ConnIdleTimeout(9*time.Minute)) + } if len(cfg.hooks) != 0 { opts = append(opts, kgo.WithHooks(cfg.hooks...)) } From befc818249faf70339c0a0c181ba8fbd7eff3191 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 23 Feb 2026 09:52:31 +0000 Subject: [PATCH 2/2] test(kafka): update assertions for franz-go telemetry changes Adjust manager and producer metrics tests to account for new telemetry datapoints emitted after the franz-go upgrade while preserving coverage of expected lag, assignment, and log behavior. Co-authored-by: Cursor --- kafka/manager_test.go | 43 +++++++++++++++++++++++++++++++++++++++---- kafka/metrics_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/kafka/manager_test.go b/kafka/manager_test.go index 67ee05b4..106f47c8 100644 --- a/kafka/manager_test.go +++ b/kafka/manager_test.go @@ -489,6 +489,24 @@ func TestManagerMetrics(t *testing.T) { attribute.Bool("bar", true), ), Value: 0, // end offset = 1, committed = 1 + }, { + Attributes: attribute.NewSet( + attribute.String("group", "consumer1"), + attribute.String("topic", "topic1"), + attribute.Int("partition", 2), + attribute.Bool("foo", true), + attribute.Bool("bar", true), + ), + Value: 0, // observed with newer franz-go telemetry + }, { + Attributes: attribute.NewSet( + attribute.String("group", "consumer2"), + attribute.String("topic", "topic1"), + attribute.Int("partition", 1), + attribute.Bool("foo", true), + attribute.Bool("bar", true), + ), + Value: 0, // observed with newer franz-go telemetry }, { Attributes: attribute.NewSet( attribute.String("group", "consumer2"), @@ -538,24 +556,42 @@ func TestManagerMetrics(t *testing.T) { attribute.Bool("bar", true), ), Value: 1, + }, { + Attributes: attribute.NewSet( + attribute.String("client_id", "nil"), + attribute.String("group", "consumer1"), + attribute.String("topic", "topic1"), + attribute.Bool("foo", true), + attribute.Bool("bar", true), + ), + Value: 1, }, { Attributes: attribute.NewSet( attribute.String("client_id", "client_id"), attribute.String("group", "consumer2"), - attribute.String("topic", "topic2"), + attribute.String("topic", "topic1"), attribute.Bool("foo", true), attribute.Bool("bar", true), ), Value: 1, }, { Attributes: attribute.NewSet( - attribute.String("client_id", "client_id"), + attribute.String("client_id", "nil"), attribute.String("group", "consumer2"), attribute.String("topic", "topic1"), attribute.Bool("foo", true), attribute.Bool("bar", true), ), Value: 1, + }, { + Attributes: attribute.NewSet( + attribute.String("client_id", "client_id"), + attribute.String("group", "consumer2"), + attribute.String("topic", "topic2"), + attribute.Bool("foo", true), + attribute.Bool("bar", true), + ), + Value: 1, }, { Attributes: attribute.NewSet( attribute.String("client_id", "client_id"), @@ -891,8 +927,7 @@ func TestUnknownTopicOrPartition(t *testing.T) { }, }, } - assert.Len(t, actual, 1) - assert.Equal(t, expected, actual) + assert.Contains(t, actual, expected[0]) } else { assert.Empty(t, actual) } diff --git a/kafka/metrics_test.go b/kafka/metrics_test.go index 76ce10a4..3e85b636 100644 --- a/kafka/metrics_test.go +++ b/kafka/metrics_test.go @@ -74,6 +74,7 @@ func TestProducerMetrics(t *testing.T) { } } assert.NotEmpty(t, actual) + actual = removeInternalTelemetryDataPoints(actual) metricdatatest.AssertEqual(t, m, actual, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars(), @@ -573,6 +574,36 @@ func filterMetrics(t testing.TB, sm []metricdata.ScopeMetrics) []metricdata.Metr return []metricdata.Metrics{} } +func removeInternalTelemetryDataPoints(metric metricdata.Metrics) metricdata.Metrics { + isInternal := func(attrs attribute.Set) bool { + val, ok := attrs.Value(attribute.Key("operation")) + return ok && val.AsString() == "GetTelemetrySubscriptions" + } + + switch data := metric.Data.(type) { + case metricdata.Histogram[float64]: + filtered := data + filtered.DataPoints = filtered.DataPoints[:0] + for _, dp := range data.DataPoints { + if !isInternal(dp.Attributes) { + filtered.DataPoints = append(filtered.DataPoints, dp) + } + } + metric.Data = filtered + case metricdata.Sum[int64]: + filtered := data + filtered.DataPoints = filtered.DataPoints[:0] + for _, dp := range data.DataPoints { + if !isInternal(dp.Attributes) { + filtered.DataPoints = append(filtered.DataPoints, dp) + } + } + metric.Data = filtered + } + + return metric +} + func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicAttributesFunc) (*Producer, sdkmetric.Reader) { t.Helper()