Skip to content

Commit ceb5909

Browse files
authored
Add new option for multiple attributes in topic attribute functions (metrics and logs) (#696)
This would be a breaking change. Relates to elastic/hosted-otel-controller#173 (comment).
1 parent 61f3ab3 commit ceb5909

12 files changed

Lines changed: 353 additions & 95 deletions

kafka/common.go

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,12 @@ import (
4545
type SASLMechanism = sasl.Mechanism
4646

4747
// TopicLogFieldFunc is a function that returns a zap.Field for a given topic.
//
// Deprecated: use TopicLogFieldsFunc instead.
type TopicLogFieldFunc func(topic string) zap.Field

// TopicLogFieldsFunc is a function that returns a list of zap.Field for a
// given topic.
type TopicLogFieldsFunc func(topic string) []zap.Field
53+
5054
// CommonConfig defines common configuration for Kafka consumers, producers,
5155
// and managers.
5256
type CommonConfig struct {
@@ -150,16 +154,28 @@ type CommonConfig struct {
150154
// Defaults to the global one.
151155
MeterProvider metric.MeterProvider
152156

153-
// TopicAttributeFunc can be used to create custom dimensions from a Kafka
157+
// TopicAttributeFunc can be used to create one custom dimension from a Kafka
154158
// topic for these metrics:
155159
// - producer.messages.count
156160
// - consumer.messages.fetched
161+
// Deprecated: Use TopicAttributesFunc instead.
157162
TopicAttributeFunc TopicAttributeFunc
158163

159-
// TopicAttributeFunc can be used to create custom dimensions from a Kafka
160-
// topic for log messages
164+
// TopicAttributesFunc can be used to create multiple custom dimensions from a Kafka
165+
// topic for these metrics:
166+
// - producer.messages.count
167+
// - consumer.messages.fetched
168+
TopicAttributesFunc TopicAttributesFunc
169+
170+
// TopicAttributeFunc can be used to create one custom dimension from a Kafka
171+
// topic for log messages.
172+
// Deprecated: use TopicLogFieldsFunc instead.
161173
TopicLogFieldFunc TopicLogFieldFunc
162174

175+
// TopicLogFieldsFunc can be used to create multiple custom dimensions from a Kafka
176+
// topic for log messages.
177+
TopicLogFieldsFunc TopicLogFieldsFunc
178+
163179
// MetadataMaxAge is the maximum age of metadata before it is refreshed.
164180
// The lower the value the more frequently new topics will be discovered.
165181
// If zero, the default value of 5 minutes is used.
@@ -198,23 +214,14 @@ func (cfg *CommonConfig) finalize() error {
198214
errs = append(errs, err)
199215
}
200216

201-
// Wrap the cfg.TopicLogFieldFunc to ensure it never returns a field with
202-
// an unknown type (like `zap.Field{}`).
203-
if cfg.TopicLogFieldFunc != nil {
204-
cfg.TopicLogFieldFunc = topicFieldFunc(cfg.TopicLogFieldFunc)
205-
}
206-
if cfg.TopicAttributeFunc == nil {
207-
cfg.TopicAttributeFunc = func(topic string) attribute.KeyValue {
208-
return attribute.KeyValue{}
209-
}
210-
}
217+
cfg.TopicLogFieldsFunc = mergeTopicLogFieldsFunctions(cfg.TopicLogFieldFunc, cfg.TopicLogFieldsFunc)
218+
cfg.TopicAttributesFunc = mergeTopicAttributeFunctions(cfg.TopicAttributeFunc, cfg.TopicAttributesFunc)
211219

212220
return errors.Join(errs...)
213221
}
214222

215223
// Merge the configs generated from env vars and/or files.
216224
func (cfg *CommonConfig) flatten(envCfg *envConfig, fileCfg *fileConfig) error {
217-
218225
// Config file was set with env vars.
219226
if cfg.ConfigFile == "" {
220227
cfg.ConfigFile = envCfg.configFile
@@ -281,7 +288,24 @@ func (cfg *CommonConfig) meterProvider() metric.MeterProvider {
281288
return otel.GetMeterProvider()
282289
}
283290

284-
func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
291+
type clientOptsFn func(opts *clientOpts)
292+
293+
type clientOpts struct {
294+
topicAttributesFunc TopicAttributesFunc
295+
}
296+
297+
func withTopicMultipleAttributeFunc(topicAttributesFunc TopicAttributesFunc) func(clOpts *clientOpts) {
298+
return func(clOpts *clientOpts) {
299+
clOpts.topicAttributesFunc = topicAttributesFunc
300+
}
301+
}
302+
303+
func (cfg *CommonConfig) newClientWithOpts(clientOptsFn []clientOptsFn, additionalOpts ...kgo.Opt) (*kgo.Client, error) {
304+
clOpts := &clientOpts{}
305+
for _, opt := range clientOptsFn {
306+
opt(clOpts)
307+
}
308+
285309
opts := []kgo.Opt{
286310
kgo.WithLogger(kzap.New(cfg.Logger.Named("kafka"))),
287311
kgo.SeedBrokers(cfg.Brokers...),
@@ -304,8 +328,8 @@ func (cfg *CommonConfig) newClient(topicAttributeFunc TopicAttributeFunc, additi
304328
}
305329
opts = append(opts, additionalOpts...)
306330
if !cfg.DisableTelemetry {
307-
metricHooks, err := newKgoHooks(cfg.meterProvider(),
308-
cfg.Namespace, cfg.namespacePrefix(), topicAttributeFunc,
331+
metricHooks, err := newKgoHooks(
332+
cfg.meterProvider(), cfg.Namespace, cfg.namespacePrefix(), clOpts.topicAttributesFunc,
309333
)
310334
if err != nil {
311335
return nil, fmt.Errorf("kafka: failed creating kgo metrics hooks: %w", err)
@@ -359,6 +383,51 @@ func topicFieldFunc(f TopicLogFieldFunc) TopicLogFieldFunc {
359383
}
360384
}
361385

386+
// mergeTopicLogFieldsFunctions merges the fields from TopicLogFieldFunc
387+
// and TopicLogFieldsFunc, and returns a new TopicLogFieldsFunc with all
388+
// log fields.
389+
func mergeTopicLogFieldsFunctions(single TopicLogFieldFunc, multiple TopicLogFieldsFunc) TopicLogFieldsFunc {
390+
if single == nil {
391+
return multiple
392+
}
393+
if multiple == nil {
394+
return func(topic string) []zap.Field {
395+
fn := topicFieldFunc(single)
396+
return []zap.Field{fn(topic)}
397+
}
398+
}
399+
return func(topic string) []zap.Field {
400+
fields := multiple(topic)
401+
for i := range fields {
402+
if fields[i].Type <= zapcore.UnknownType {
403+
fields[i] = zap.Skip()
404+
}
405+
}
406+
return append(fields, single(topic))
407+
}
408+
}
409+
410+
// mergeTopicAttributeFunctions merges the attributes from TopicAttributeFunc
411+
// and TopicAttributesFunc, and returns a new TopicAttributesFunc with all
412+
// attributes.
413+
func mergeTopicAttributeFunctions(single TopicAttributeFunc, multiple TopicAttributesFunc) TopicAttributesFunc {
414+
if single == nil {
415+
return multiple
416+
}
417+
if multiple == nil {
418+
return func(topic string) []attribute.KeyValue {
419+
v := single(topic)
420+
if v == (attribute.KeyValue{}) {
421+
return nil
422+
}
423+
return []attribute.KeyValue{v}
424+
}
425+
}
426+
return func(topic string) []attribute.KeyValue {
427+
return append(multiple(topic), single(topic))
428+
}
429+
}
430+
362431
// newCertReloadingDialer returns a dialer that reloads the CA cert when the
363432
// file mod time changes.
364433
func newCertReloadingDialer(caPath, certPath, keyPath string,

kafka/common_test.go

Lines changed: 102 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/twmb/franz-go/pkg/kfake"
4646
"github.com/twmb/franz-go/pkg/kgo"
4747
"github.com/twmb/franz-go/pkg/sasl"
48+
"go.opentelemetry.io/otel/attribute"
4849
"go.uber.org/zap"
4950
)
5051

@@ -59,8 +60,6 @@ func TestCommonConfig(t *testing.T) {
5960
t.Helper()
6061
err := in.finalize()
6162
require.NoError(t, err)
62-
in.TopicAttributeFunc = nil
63-
in.TopicLogFieldFunc = nil
6463
in.hooks = nil
6564
assert.Equal(t, expected, in)
6665
}
@@ -340,7 +339,7 @@ func TestCommonConfigFileHook(t *testing.T) {
340339
require.NoError(t, cfg.finalize())
341340
assert.Equal(t, []string{"testing.invalid"}, cfg.Brokers)
342341

343-
client, err := cfg.newClient(nil)
342+
client, err := cfg.newClientWithOpts(nil)
344343
require.NoError(t, err)
345344
defer client.Close()
346345

@@ -774,6 +773,106 @@ func TestCertificateHotReloadErrors(t *testing.T) {
774773
})
775774
}
776775

776+
func TestMergeTopicFunctions(t *testing.T) {
777+
t.Run("log fields", func(t *testing.T) {
778+
tests := []struct {
779+
name string
780+
single TopicLogFieldFunc
781+
multiple TopicLogFieldsFunc
782+
want []zap.Field
783+
}{
784+
{
785+
name: "both nil functions",
786+
},
787+
{
788+
name: "nil single log field",
789+
multiple: func(_ string) []zap.Field {
790+
return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}
791+
},
792+
want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")},
793+
},
794+
{
795+
name: "nil multiple log field",
796+
single: func(_ string) zap.Field {
797+
return zap.String("test-1", "test-1")
798+
},
799+
want: []zap.Field{zap.String("test-1", "test-1")},
800+
},
801+
{
802+
name: "both functions exist",
803+
multiple: func(_ string) []zap.Field {
804+
return []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2")}
805+
},
806+
single: func(_ string) zap.Field {
807+
return zap.String("test-3", "test-3")
808+
},
809+
want: []zap.Field{zap.String("test-1", "test-1"), zap.String("test-2", "test-2"), zap.String("test-3", "test-3")},
810+
},
811+
}
812+
813+
for _, tt := range tests {
814+
t.Run(tt.name, func(t *testing.T) {
815+
fn := mergeTopicLogFieldsFunctions(tt.single, tt.multiple)
816+
if tt.want == nil {
817+
require.Nil(t, fn)
818+
return
819+
}
820+
fields := fn("test")
821+
require.Equal(t, tt.want, fields)
822+
})
823+
}
824+
})
825+
t.Run("attributes", func(t *testing.T) {
826+
tests := []struct {
827+
name string
828+
single TopicAttributeFunc
829+
multiple TopicAttributesFunc
830+
want []attribute.KeyValue
831+
}{
832+
{
833+
name: "both nil functions",
834+
},
835+
{
836+
name: "nil single attribute",
837+
multiple: func(_ string) []attribute.KeyValue {
838+
return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}
839+
},
840+
want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")},
841+
},
842+
{
843+
name: "nil multiple log field",
844+
single: func(_ string) attribute.KeyValue {
845+
return attribute.String("test-1", "test-1")
846+
},
847+
want: []attribute.KeyValue{attribute.String("test-1", "test-1")},
848+
},
849+
{
850+
name: "both functions exist",
851+
multiple: func(_ string) []attribute.KeyValue {
852+
return []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2")}
853+
},
854+
single: func(_ string) attribute.KeyValue {
855+
return attribute.String("test-3", "test-3")
856+
},
857+
want: []attribute.KeyValue{attribute.String("test-1", "test-1"), attribute.String("test-2", "test-2"), attribute.String("test-3", "test-3")},
858+
},
859+
}
860+
861+
for _, tt := range tests {
862+
t.Run(tt.name, func(t *testing.T) {
863+
fn := mergeTopicAttributeFunctions(tt.single, tt.multiple)
864+
if tt.want == nil {
865+
require.Nil(t, fn)
866+
return
867+
}
868+
fields := fn("test")
869+
require.Equal(t, tt.want, fields)
870+
})
871+
}
872+
873+
})
874+
}
875+
777876
// Helper functions for certificate generation
778877
func generateCA(t testing.TB) (*rsa.PrivateKey, *x509.Certificate, []byte) {
779878
caKey, err := rsa.GenerateKey(rand.Reader, 2048)

kafka/consumer.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
212212
namespacePrefix := cfg.namespacePrefix()
213213
consumer := &consumer{
214214
topicPrefix: namespacePrefix,
215-
logFieldFn: cfg.TopicLogFieldFunc,
215+
logFieldsFn: cfg.TopicLogFieldsFunc,
216216
assignments: make(map[topicPartition]*pc),
217217
processor: cfg.Processor,
218218
logger: cfg.Logger.Named("partition"),
@@ -230,7 +230,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
230230
kgo.ConsumeTopics(topics...),
231231
// If a rebalance happens while the client is polling, the consumed
232232
// records may belong to a partition which has been reassigned to a
233-
// different consumer int he group. To avoid this scenario, Polls will
233+
// different consumer in the group. To avoid this scenario, Polls will
234234
// block rebalances of partitions which would be lost, and the consumer
235235
// MUST manually call `AllowRebalance`.
236236
kgo.BlockRebalanceOnPoll(),
@@ -270,7 +270,10 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
270270
opts = append(opts, kgo.BrokerMaxReadBytes(cfg.BrokerMaxReadBytes))
271271
}
272272

273-
client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
273+
client, err := cfg.newClientWithOpts(
274+
[]clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)},
275+
opts...,
276+
)
274277
if err != nil {
275278
return nil, fmt.Errorf("kafka: failed creating kafka consumer: %w", err)
276279
}
@@ -396,8 +399,10 @@ func (c *Consumer) fetch(ctx context.Context) error {
396399
// franz-go can inject fake fetches in case of errors.
397400
// the fake fetch can have an empty topic so we need to
398401
// account for that
399-
if c.cfg.TopicLogFieldFunc != nil && topicName != "" {
400-
logger = logger.With(c.cfg.TopicLogFieldFunc(topicName))
402+
if topicName != "" {
403+
if c.cfg.TopicLogFieldsFunc != nil {
404+
logger = logger.With(c.cfg.TopicLogFieldsFunc(topicName)...)
405+
}
401406
}
402407

403408
logger.Error(
@@ -429,7 +434,8 @@ type consumer struct {
429434
processor apmqueue.Processor
430435
logger *zap.Logger
431436
delivery apmqueue.DeliveryType
432-
logFieldFn TopicLogFieldFunc
437+
logFieldsFn TopicLogFieldsFunc
438+
433439
// ctx contains the graceful cancellation context that is passed to the
434440
// partition consumers.
435441
ctx context.Context
@@ -452,8 +458,8 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
452458
zap.String("topic", t),
453459
zap.Int32("partition", partition),
454460
)
455-
if c.logFieldFn != nil {
456-
logger = logger.With(c.logFieldFn(t))
461+
if c.logFieldsFn != nil {
462+
logger = logger.With(c.logFieldsFn(t)...)
457463
}
458464

459465
pc := newPartitionConsumer(c.ctx, client, c.processor,
@@ -535,8 +541,8 @@ func (c *consumer) processFetch(fetches kgo.Fetches) {
535541
if c.delivery == apmqueue.AtMostOnceDeliveryType {
536542
topicName := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
537543
logger := c.logger
538-
if c.logFieldFn != nil {
539-
logger = logger.With(c.logFieldFn(topicName))
544+
if c.logFieldsFn != nil {
545+
logger = logger.With(c.logFieldsFn(topicName)...)
540546
}
541547
logger.Warn(
542548
"data loss: failed to send records to process after commit",

kafka/consumer_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -907,8 +907,6 @@ func assertNotNilOptions(t testing.TB, cfg *ConsumerConfig) {
907907
cfg.Processor = nil
908908
assert.NotNil(t, cfg.Logger)
909909
cfg.Logger = nil
910-
assert.NotNil(t, cfg.TopicAttributeFunc)
911-
cfg.TopicAttributeFunc = nil
912910
}
913911

914912
func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {

kafka/log_compacted_consumer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ func NewLogCompactedConsumer(cfg LogCompactedConfig,
108108
opts = append(opts, kgo.FetchMinBytes(cfg.MinFetchSize))
109109
}
110110

111-
client, err := cfg.newClient(cfg.TopicAttributeFunc, opts...)
111+
client, err := cfg.newClientWithOpts(
112+
[]clientOptsFn{withTopicMultipleAttributeFunc(cfg.TopicAttributesFunc)},
113+
opts...,
114+
)
112115
if err != nil {
113116
return nil, err
114117
}

0 commit comments

Comments
 (0)