Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 102 additions & 6 deletions internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@ import (

// defaults.
const (
DefaultDialTimeout = 30 * time.Second
DefaultReadTimeout = 30 * time.Second
DefaultWriteTimeout = 30 * time.Second
DefaultBackoff = 2 * time.Second
DefaultMaxRetries = 10
DefaultDialTimeout = 30 * time.Second
DefaultReadTimeout = 30 * time.Second
DefaultWriteTimeout = 30 * time.Second
DefaultBackoff = 2 * time.Second
DefaultMaxRetries = 10
DefaultKeepAlive = 30 * time.Second
DefaultMetadataRefreshFreq = 1 * time.Minute
DefaultSessionTimeout = 30 * time.Second
DefaultHeartbeatInterval = 10 * time.Second
DefaultRebalanceTimeout = 60 * time.Second
)

var _ sarama.ConsumerGroupHandler = (*Consumer)(nil) // compile time proof
Expand All @@ -42,6 +47,11 @@ type Consumer struct {
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
KeepAlive time.Duration
MetadataRefreshFreq time.Duration
SessionTimeout time.Duration
HeartbeatInterval time.Duration
RebalanceTimeout time.Duration
Backoff time.Duration
MaxRetries uint8
MessageBufferSize int
Expand Down Expand Up @@ -426,6 +436,81 @@ func WithSaramaConsumerGroupHandler(handler sarama.ConsumerGroupHandler) Option
}
}

// WithKeepAlive sets the TCP keepalive interval for broker connections.
// Zero is accepted (disables/leaves default); negative durations are
// rejected with cerrors.ErrInvalid.
func WithKeepAlive(d time.Duration) Option {
	return func(consumer *Consumer) error {
		if d >= 0 {
			consumer.KeepAlive = d

			return nil
		}

		return fmt.Errorf(
			"[kafkaconsumergroup.WithKeepAlive] error: [%w, '%s' received, must >= 0]",
			cerrors.ErrInvalid, d,
		)
	}
}

// WithMetadataRefreshFreq sets how often broker metadata (including IPs)
// is refreshed. The duration must be strictly positive; anything else is
// rejected with cerrors.ErrInvalid.
func WithMetadataRefreshFreq(d time.Duration) Option {
	return func(consumer *Consumer) error {
		if d > 0 {
			consumer.MetadataRefreshFreq = d

			return nil
		}

		return fmt.Errorf(
			"[kafkaconsumergroup.WithMetadataRefreshFreq] error: [%w, '%s' received, must > 0]",
			cerrors.ErrInvalid, d,
		)
	}
}

// WithSessionTimeout sets the consumer group session timeout.
// The duration must be strictly positive; anything else is rejected
// with cerrors.ErrInvalid.
func WithSessionTimeout(d time.Duration) Option {
	return func(consumer *Consumer) error {
		if d > 0 {
			consumer.SessionTimeout = d

			return nil
		}

		return fmt.Errorf(
			"[kafkaconsumergroup.WithSessionTimeout] error: [%w, '%s' received, must > 0]",
			cerrors.ErrInvalid, d,
		)
	}
}

// WithHeartbeatInterval sets the consumer group heartbeat interval.
// The duration must be strictly positive; anything else is rejected
// with cerrors.ErrInvalid.
func WithHeartbeatInterval(d time.Duration) Option {
	return func(consumer *Consumer) error {
		if d > 0 {
			consumer.HeartbeatInterval = d

			return nil
		}

		return fmt.Errorf(
			"[kafkaconsumergroup.WithHeartbeatInterval] error: [%w, '%s' received, must > 0]",
			cerrors.ErrInvalid, d,
		)
	}
}

// WithRebalanceTimeout sets the consumer group rebalance timeout.
// The duration must be strictly positive; anything else is rejected
// with cerrors.ErrInvalid.
func WithRebalanceTimeout(d time.Duration) Option {
	return func(consumer *Consumer) error {
		if d > 0 {
			consumer.RebalanceTimeout = d

			return nil
		}

		return fmt.Errorf(
			"[kafkaconsumergroup.WithRebalanceTimeout] error: [%w, '%s' received, must > 0]",
			cerrors.ErrInvalid, d,
		)
	}
}

// New instantiates new kafka github consumer group instance.
func New(options ...Option) (*Consumer, error) {
consumer := new(Consumer)
Expand All @@ -437,6 +522,11 @@ func New(options ...Option) (*Consumer, error) {
consumer.DialTimeout = DefaultDialTimeout
consumer.ReadTimeout = DefaultReadTimeout
consumer.WriteTimeout = DefaultWriteTimeout
consumer.KeepAlive = DefaultKeepAlive
consumer.MetadataRefreshFreq = DefaultMetadataRefreshFreq
consumer.SessionTimeout = DefaultSessionTimeout
consumer.HeartbeatInterval = DefaultHeartbeatInterval
consumer.RebalanceTimeout = DefaultRebalanceTimeout
consumer.Backoff = DefaultBackoff
consumer.MaxRetries = DefaultMaxRetries
consumer.NumberOfWorkers = runtime.NumCPU()
Expand All @@ -463,7 +553,13 @@ func New(options ...Option) (*Consumer, error) {
config.Net.DialTimeout = consumer.DialTimeout
config.Net.ReadTimeout = consumer.ReadTimeout
config.Net.WriteTimeout = consumer.WriteTimeout
config.Version = sarama.V3_9_0_0
config.Net.KeepAlive = consumer.KeepAlive
config.Metadata.RefreshFrequency = consumer.MetadataRefreshFreq
config.Metadata.Full = true
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.Metadata.Full = true forces fetching metadata for all topics/partitions on each refresh. Combined with a 1m refresh frequency, this can create significant broker load in clusters with many topics. Consider making this behavior configurable and/or defaulting to non-full metadata unless a concrete need for full refresh is established.

Suggested change
config.Metadata.Full = true
config.Metadata.Full = false

Copilot uses AI. Check for mistakes.
config.Consumer.Group.Session.Timeout = consumer.SessionTimeout
config.Consumer.Group.Heartbeat.Interval = consumer.HeartbeatInterval
config.Consumer.Group.Rebalance.Timeout = consumer.RebalanceTimeout
config.Version = consumer.KafkaVersion
config.Consumer.Return.Errors = true

Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider validating the consumer-group timing configuration (at least HeartbeatInterval < SessionTimeout, and any other Sarama/Kafka constraints) before attempting to create the consumer group. As-is, an invalid combination will fail inside Sarama and trigger the retry/backoff loop even though it can never succeed, delaying feedback and producing misleading "cannot connect" logs.

Suggested change
if err := config.Validate(); err != nil {
return nil, fmt.Errorf(
"[kafkaconsumergroup.New][config.Validate] error: [%w]",
err,
)
}

Copilot uses AI. Check for mistakes.
var saramaConsumerGroup sarama.ConsumerGroup
Expand Down
249 changes: 249 additions & 0 deletions internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,252 @@ func TestNew_Consume_Success(t *testing.T) {
_ = process.Signal(os.Interrupt)
wg.Wait()
}

// TestNew_InvalidKeepAlive verifies that a negative keepalive duration
// makes New fail with cerrors.ErrInvalid and return no consumer.
func TestNew_InvalidKeepAlive(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithKeepAlive(-1 * time.Second), // invalid: < 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

func TestNew_ValidKeepAlive(t *testing.T) {
logger := mockslogger.New()

consumerGroup := &mockConsumerGroup{}
consumerGroupFactory := &mockConsumerGroupFactory{}
consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything).
Return(consumerGroup, nil)

consumer, err := kafkaconsumergroup.New(
kafkaconsumergroup.WithLogger(logger),
kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
kafkaconsumergroup.WithKafkaGroupName("github-group"),
kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
kafkaconsumergroup.WithKeepAlive(15*time.Second),
kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
)
Comment on lines +475 to +479
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only asserts that New succeeds; it doesn’t verify that WithKeepAlive actually wires through to the Sarama config. Consider capturing the *sarama.Config argument passed into CreateConsumerGroup and asserting config.Net.KeepAlive matches the requested value (and that defaults remain unchanged).

Copilot uses AI. Check for mistakes.

assert.NoError(t, err)
assert.NotNil(t, consumer)
}

// TestNew_InvalidMetadataRefreshFreq verifies that a negative metadata
// refresh frequency makes New fail with cerrors.ErrInvalid.
func TestNew_InvalidMetadataRefreshFreq(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithMetadataRefreshFreq(-1 * time.Second), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_ZeroMetadataRefreshFreq verifies that a zero metadata refresh
// frequency is rejected with cerrors.ErrInvalid (the option requires > 0).
func TestNew_ZeroMetadataRefreshFreq(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithMetadataRefreshFreq(0), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_InvalidSessionTimeout verifies that a negative session timeout
// makes New fail with cerrors.ErrInvalid.
func TestNew_InvalidSessionTimeout(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithSessionTimeout(-1 * time.Second), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_ZeroSessionTimeout verifies that a zero session timeout is
// rejected with cerrors.ErrInvalid (the option requires > 0).
func TestNew_ZeroSessionTimeout(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithSessionTimeout(0), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_InvalidHeartbeatInterval verifies that a negative heartbeat
// interval makes New fail with cerrors.ErrInvalid.
func TestNew_InvalidHeartbeatInterval(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithHeartbeatInterval(-1 * time.Second), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_ZeroHeartbeatInterval verifies that a zero heartbeat interval
// is rejected with cerrors.ErrInvalid (the option requires > 0).
func TestNew_ZeroHeartbeatInterval(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithHeartbeatInterval(0), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

// TestNew_InvalidRebalanceTimeout verifies that a negative rebalance
// timeout makes New fail with cerrors.ErrInvalid.
func TestNew_InvalidRebalanceTimeout(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithRebalanceTimeout(-1 * time.Second), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}

func TestNew_ValidMetadataRefreshFreq(t *testing.T) {
logger := mockslogger.New()

consumerGroup := &mockConsumerGroup{}
consumerGroupFactory := &mockConsumerGroupFactory{}
consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything).
Return(consumerGroup, nil)

consumer, err := kafkaconsumergroup.New(
kafkaconsumergroup.WithLogger(logger),
kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
kafkaconsumergroup.WithKafkaGroupName("github-group"),
kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
kafkaconsumergroup.WithMetadataRefreshFreq(2*time.Minute),
kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
)
Comment on lines +608 to +612
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only asserts that New succeeds; it doesn’t verify that WithMetadataRefreshFreq actually wires through to the Sarama config. Consider asserting on the *sarama.Config argument passed into CreateConsumerGroup (e.g., config.Metadata.RefreshFrequency, and potentially whether config.Metadata.Full is intended to be forced on).

Copilot uses AI. Check for mistakes.

assert.NoError(t, err)
assert.NotNil(t, consumer)
}

func TestNew_ValidSessionTimeout(t *testing.T) {
logger := mockslogger.New()

consumerGroup := &mockConsumerGroup{}
consumerGroupFactory := &mockConsumerGroupFactory{}
consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything).
Return(consumerGroup, nil)

consumer, err := kafkaconsumergroup.New(
kafkaconsumergroup.WithLogger(logger),
kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
kafkaconsumergroup.WithKafkaGroupName("github-group"),
kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
kafkaconsumergroup.WithSessionTimeout(20*time.Second),
kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
)
Comment on lines +629 to +633
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only asserts that New succeeds; it doesn’t verify that WithSessionTimeout actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Session.Timeout matches the requested value.

Copilot uses AI. Check for mistakes.

assert.NoError(t, err)
assert.NotNil(t, consumer)
}

func TestNew_ValidHeartbeatInterval(t *testing.T) {
logger := mockslogger.New()

consumerGroup := &mockConsumerGroup{}
consumerGroupFactory := &mockConsumerGroupFactory{}
consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything).
Return(consumerGroup, nil)

consumer, err := kafkaconsumergroup.New(
kafkaconsumergroup.WithLogger(logger),
kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
kafkaconsumergroup.WithKafkaGroupName("github-group"),
kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
kafkaconsumergroup.WithHeartbeatInterval(5*time.Second),
kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
)
Comment on lines +650 to +654
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only asserts that New succeeds; it doesn’t verify that WithHeartbeatInterval actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Heartbeat.Interval matches the requested value.

Copilot uses AI. Check for mistakes.

assert.NoError(t, err)
assert.NotNil(t, consumer)
}

func TestNew_ValidRebalanceTimeout(t *testing.T) {
logger := mockslogger.New()

consumerGroup := &mockConsumerGroup{}
consumerGroupFactory := &mockConsumerGroupFactory{}
consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything).
Return(consumerGroup, nil)

consumer, err := kafkaconsumergroup.New(
kafkaconsumergroup.WithLogger(logger),
kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
kafkaconsumergroup.WithKafkaGroupName("github-group"),
kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
kafkaconsumergroup.WithRebalanceTimeout(45*time.Second),
kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup),
)
Comment on lines +671 to +675
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only asserts that New succeeds; it doesn’t verify that WithRebalanceTimeout actually wires through to the Sarama config. Consider capturing the *sarama.Config passed to CreateConsumerGroup and asserting config.Consumer.Group.Rebalance.Timeout matches the requested value.

Copilot uses AI. Check for mistakes.

assert.NoError(t, err)
assert.NotNil(t, consumer)
}

// TestNew_ZeroRebalanceTimeout verifies that a zero rebalance timeout is
// rejected with cerrors.ErrInvalid (the option requires > 0).
func TestNew_ZeroRebalanceTimeout(t *testing.T) {
	logger := mockslogger.New()

	opts := []kafkaconsumergroup.Option{
		kafkaconsumergroup.WithLogger(logger),
		kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()),
		kafkaconsumergroup.WithKafkaGroupName("github-group"),
		kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()),
		kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers),
		kafkaconsumergroup.WithRebalanceTimeout(0), // invalid: <= 0
	}

	consumer, err := kafkaconsumergroup.New(opts...)

	assert.ErrorIs(t, err, cerrors.ErrInvalid)
	assert.Nil(t, consumer)
}