Conversation
Codecov Report✅ All modified and coverable lines are covered by tests.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Updates the Kafka consumer group wrapper to expose additional Sarama/Kafka tuning knobs (TCP keepalive, metadata refresh, and consumer group timeouts) and wires the configured Kafka version into the Sarama config.
Changes:
- Add configurable keepalive, metadata refresh frequency, and consumer-group timing parameters to
Consumervia newWith*options and defaults. - Apply the new parameters to the Sarama
ConfigduringNew(). - Add unit tests covering option input validation and “happy path” construction.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go | Adds new consumer config fields/options and applies them to Sarama config (including KafkaVersion wiring). |
| internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go | Adds tests for new options’ validation and successful construction. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| config.Consumer.Group.Rebalance.Timeout = consumer.RebalanceTimeout | ||
| config.Version = consumer.KafkaVersion | ||
| config.Consumer.Return.Errors = true | ||
|
|
There was a problem hiding this comment.
Consider validating the consumer-group timing configuration (at least HeartbeatInterval < SessionTimeout, and any other Sarama/Kafka constraints) before attempting to create the consumer group. As-is, an invalid combination will fail inside Sarama and trigger the retry/backoff loop even though it can never succeed, delaying feedback and producing misleading "cannot connect" logs.
| if err := config.Validate(); err != nil { | |
| return nil, fmt.Errorf( | |
| "[kafkaconsumergroup.New][config.Validate] error: [%w]", | |
| err, | |
| ) | |
| } |
| config.Version = sarama.V3_9_0_0 | ||
| config.Net.KeepAlive = consumer.KeepAlive | ||
| config.Metadata.RefreshFrequency = consumer.MetadataRefreshFreq | ||
| config.Metadata.Full = true |
There was a problem hiding this comment.
config.Metadata.Full = true forces fetching metadata for all topics/partitions on each refresh. Combined with a 1m refresh frequency, this can create significant broker load in clusters with many topics. Consider making this behavior configurable and/or defaulting to non-full metadata unless a concrete need for full refresh is established.
| config.Metadata.Full = true | |
| config.Metadata.Full = false |
| kafkaconsumergroup.WithKafkaGroupName("github-group"), | ||
| kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), | ||
| kafkaconsumergroup.WithKeepAlive(15*time.Second), | ||
| kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), | ||
| ) |
There was a problem hiding this comment.
This test only asserts that New succeeds; it doesn’t verify that WithKeepAlive actually wires through to the Sarama config. Consider capturing the *sarama.Config argument passed into CreateConsumerGroup and asserting config.Net.KeepAlive matches the requested value (and that defaults remain unchanged).
| kafkaconsumergroup.WithKafkaGroupName("github-group"), | ||
| kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), | ||
| kafkaconsumergroup.WithMetadataRefreshFreq(2*time.Minute), | ||
| kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), | ||
| ) |
There was a problem hiding this comment.
This test only asserts that New succeeds; it doesn’t verify that WithMetadataRefreshFreq actually wires through to the Sarama config. Consider asserting on the *sarama.Config argument passed into CreateConsumerGroup (e.g., config.Metadata.RefreshFrequency, and potentially whether config.Metadata.Full is intended to be forced on).
| kafkaconsumergroup.WithKafkaGroupName("github-group"), | ||
| kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), | ||
| kafkaconsumergroup.WithSessionTimeout(20*time.Second), | ||
| kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), | ||
| ) |
There was a problem hiding this comment.
This test only asserts that New succeeds; it doesn’t verify that WithSessionTimeout actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Session.Timeout matches the requested value.
| kafkaconsumergroup.WithKafkaGroupName("github-group"), | ||
| kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), | ||
| kafkaconsumergroup.WithHeartbeatInterval(5*time.Second), | ||
| kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), | ||
| ) |
There was a problem hiding this comment.
This test only asserts that New succeeds; it doesn’t verify that WithHeartbeatInterval actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Heartbeat.Interval matches the requested value.
| kafkaconsumergroup.WithKafkaGroupName("github-group"), | ||
| kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), | ||
| kafkaconsumergroup.WithRebalanceTimeout(45*time.Second), | ||
| kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), | ||
| ) |
There was a problem hiding this comment.
This test only asserts that New succeeds; it doesn’t verify that WithRebalanceTimeout actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Rebalance.Timeout matches the requested value.
No description provided.