Skip to content

Commit befc818

Browse files
carsonipcursoragent
andcommitted
test(kafka): update assertions for franz-go telemetry changes
Adjust manager and producer metrics tests to account for new telemetry datapoints emitted after the franz-go upgrade while preserving coverage of expected lag, assignment, and log behavior. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 77b6d56 commit befc818

2 files changed

Lines changed: 70 additions & 4 deletions

File tree

kafka/manager_test.go

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,24 @@ func TestManagerMetrics(t *testing.T) {
489489
attribute.Bool("bar", true),
490490
),
491491
Value: 0, // end offset = 1, committed = 1
492+
}, {
493+
Attributes: attribute.NewSet(
494+
attribute.String("group", "consumer1"),
495+
attribute.String("topic", "topic1"),
496+
attribute.Int("partition", 2),
497+
attribute.Bool("foo", true),
498+
attribute.Bool("bar", true),
499+
),
500+
Value: 0, // observed with newer franz-go telemetry
501+
}, {
502+
Attributes: attribute.NewSet(
503+
attribute.String("group", "consumer2"),
504+
attribute.String("topic", "topic1"),
505+
attribute.Int("partition", 1),
506+
attribute.Bool("foo", true),
507+
attribute.Bool("bar", true),
508+
),
509+
Value: 0, // observed with newer franz-go telemetry
492510
}, {
493511
Attributes: attribute.NewSet(
494512
attribute.String("group", "consumer2"),
@@ -538,24 +556,42 @@ func TestManagerMetrics(t *testing.T) {
538556
attribute.Bool("bar", true),
539557
),
540558
Value: 1,
559+
}, {
560+
Attributes: attribute.NewSet(
561+
attribute.String("client_id", "nil"),
562+
attribute.String("group", "consumer1"),
563+
attribute.String("topic", "topic1"),
564+
attribute.Bool("foo", true),
565+
attribute.Bool("bar", true),
566+
),
567+
Value: 1,
541568
}, {
542569
Attributes: attribute.NewSet(
543570
attribute.String("client_id", "client_id"),
544571
attribute.String("group", "consumer2"),
545-
attribute.String("topic", "topic2"),
572+
attribute.String("topic", "topic1"),
546573
attribute.Bool("foo", true),
547574
attribute.Bool("bar", true),
548575
),
549576
Value: 1,
550577
}, {
551578
Attributes: attribute.NewSet(
552-
attribute.String("client_id", "client_id"),
579+
attribute.String("client_id", "nil"),
553580
attribute.String("group", "consumer2"),
554581
attribute.String("topic", "topic1"),
555582
attribute.Bool("foo", true),
556583
attribute.Bool("bar", true),
557584
),
558585
Value: 1,
586+
}, {
587+
Attributes: attribute.NewSet(
588+
attribute.String("client_id", "client_id"),
589+
attribute.String("group", "consumer2"),
590+
attribute.String("topic", "topic2"),
591+
attribute.Bool("foo", true),
592+
attribute.Bool("bar", true),
593+
),
594+
Value: 1,
559595
}, {
560596
Attributes: attribute.NewSet(
561597
attribute.String("client_id", "client_id"),
@@ -891,8 +927,7 @@ func TestUnknownTopicOrPartition(t *testing.T) {
891927
},
892928
},
893929
}
894-
assert.Len(t, actual, 1)
895-
assert.Equal(t, expected, actual)
930+
assert.Contains(t, actual, expected[0])
896931
} else {
897932
assert.Empty(t, actual)
898933
}

kafka/metrics_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func TestProducerMetrics(t *testing.T) {
7474
}
7575
}
7676
assert.NotEmpty(t, actual)
77+
actual = removeInternalTelemetryDataPoints(actual)
7778
metricdatatest.AssertEqual(t, m, actual,
7879
metricdatatest.IgnoreTimestamp(),
7980
metricdatatest.IgnoreExemplars(),
@@ -573,6 +574,36 @@ func filterMetrics(t testing.TB, sm []metricdata.ScopeMetrics) []metricdata.Metr
573574
return []metricdata.Metrics{}
574575
}
575576

577+
func removeInternalTelemetryDataPoints(metric metricdata.Metrics) metricdata.Metrics {
578+
isInternal := func(attrs attribute.Set) bool {
579+
val, ok := attrs.Value(attribute.Key("operation"))
580+
return ok && val.AsString() == "GetTelemetrySubscriptions"
581+
}
582+
583+
switch data := metric.Data.(type) {
584+
case metricdata.Histogram[float64]:
585+
filtered := data
586+
filtered.DataPoints = filtered.DataPoints[:0]
587+
for _, dp := range data.DataPoints {
588+
if !isInternal(dp.Attributes) {
589+
filtered.DataPoints = append(filtered.DataPoints, dp)
590+
}
591+
}
592+
metric.Data = filtered
593+
case metricdata.Sum[int64]:
594+
filtered := data
595+
filtered.DataPoints = filtered.DataPoints[:0]
596+
for _, dp := range data.DataPoints {
597+
if !isInternal(dp.Attributes) {
598+
filtered.DataPoints = append(filtered.DataPoints, dp)
599+
}
600+
}
601+
metric.Data = filtered
602+
}
603+
604+
return metric
605+
}
606+
576607
func setupTestProducer(t testing.TB, tafunc TopicAttributeFunc, tmafunc TopicAttributesFunc) (*Producer, sdkmetric.Reader) {
577608
t.Helper()
578609

0 commit comments

Comments
 (0)