diff --git a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go index 54afa5d..f6fa4a1 100644 --- a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go +++ b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup.go @@ -18,11 +18,16 @@ import ( // defaults. const ( - DefaultDialTimeout = 30 * time.Second - DefaultReadTimeout = 30 * time.Second - DefaultWriteTimeout = 30 * time.Second - DefaultBackoff = 2 * time.Second - DefaultMaxRetries = 10 + DefaultDialTimeout = 30 * time.Second + DefaultReadTimeout = 30 * time.Second + DefaultWriteTimeout = 30 * time.Second + DefaultBackoff = 2 * time.Second + DefaultMaxRetries = 10 + DefaultKeepAlive = 30 * time.Second + DefaultMetadataRefreshFreq = 1 * time.Minute + DefaultSessionTimeout = 30 * time.Second + DefaultHeartbeatInterval = 10 * time.Second + DefaultRebalanceTimeout = 60 * time.Second ) var _ sarama.ConsumerGroupHandler = (*Consumer)(nil) // compile time proof @@ -42,6 +47,11 @@ type Consumer struct { DialTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration + KeepAlive time.Duration + MetadataRefreshFreq time.Duration + SessionTimeout time.Duration + HeartbeatInterval time.Duration + RebalanceTimeout time.Duration Backoff time.Duration MaxRetries uint8 MessageBufferSize int @@ -426,6 +436,81 @@ func WithSaramaConsumerGroupHandler(handler sarama.ConsumerGroupHandler) Option } } +// WithKeepAlive sets TCP keepalive interval for broker connections. +func WithKeepAlive(d time.Duration) Option { + return func(c *Consumer) error { + if d < 0 { + return fmt.Errorf( + "[kafkaconsumergroup.WithKeepAlive] error: [%w, '%s' received, must >= 0]", + cerrors.ErrInvalid, d, + ) + } + c.KeepAlive = d + + return nil + } +} + +// WithMetadataRefreshFreq sets how often broker metadata (including IPs) is refreshed. +func WithMetadataRefreshFreq(d time.Duration) Option { + return func(c *Consumer) error { + if d <= 0 { + return fmt.Errorf( + "[kafkaconsumergroup.WithMetadataRefreshFreq] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) + } + c.MetadataRefreshFreq = d + + return nil + } +} + +// WithSessionTimeout sets consumer group session timeout. +func WithSessionTimeout(d time.Duration) Option { + return func(c *Consumer) error { + if d <= 0 { + return fmt.Errorf( + "[kafkaconsumergroup.WithSessionTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) + } + c.SessionTimeout = d + + return nil + } +} + +// WithHeartbeatInterval sets consumer group heartbeat interval. +func WithHeartbeatInterval(d time.Duration) Option { + return func(c *Consumer) error { + if d <= 0 { + return fmt.Errorf( + "[kafkaconsumergroup.WithHeartbeatInterval] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) + } + c.HeartbeatInterval = d + + return nil + } +} + +// WithRebalanceTimeout sets consumer group rebalance timeout. +func WithRebalanceTimeout(d time.Duration) Option { + return func(c *Consumer) error { + if d <= 0 { + return fmt.Errorf( + "[kafkaconsumergroup.WithRebalanceTimeout] error: [%w, '%s' received, must > 0]", + cerrors.ErrInvalid, d, + ) + } + c.RebalanceTimeout = d + + return nil + } +} + // New instantiates new kafka github consumer group instance. func New(options ...Option) (*Consumer, error) { consumer := new(Consumer) @@ -437,6 +522,11 @@ func New(options ...Option) (*Consumer, error) { consumer.DialTimeout = DefaultDialTimeout consumer.ReadTimeout = DefaultReadTimeout consumer.WriteTimeout = DefaultWriteTimeout + consumer.KeepAlive = DefaultKeepAlive + consumer.MetadataRefreshFreq = DefaultMetadataRefreshFreq + consumer.SessionTimeout = DefaultSessionTimeout + consumer.HeartbeatInterval = DefaultHeartbeatInterval + consumer.RebalanceTimeout = DefaultRebalanceTimeout consumer.Backoff = DefaultBackoff consumer.MaxRetries = DefaultMaxRetries consumer.NumberOfWorkers = runtime.NumCPU() @@ -463,7 +553,13 @@ func New(options ...Option) (*Consumer, error) { config.Net.DialTimeout = consumer.DialTimeout config.Net.ReadTimeout = consumer.ReadTimeout config.Net.WriteTimeout = consumer.WriteTimeout - config.Version = sarama.V3_9_0_0 + config.Net.KeepAlive = consumer.KeepAlive + config.Metadata.RefreshFrequency = consumer.MetadataRefreshFreq + config.Metadata.Full = true + config.Consumer.Group.Session.Timeout = consumer.SessionTimeout + config.Consumer.Group.Heartbeat.Interval = consumer.HeartbeatInterval + config.Consumer.Group.Rebalance.Timeout = consumer.RebalanceTimeout + config.Version = consumer.KafkaVersion config.Consumer.Return.Errors = true var saramaConsumerGroup sarama.ConsumerGroup diff --git a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go index d7a8662..9da4fc7 100644 --- a/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go +++ b/internal/kafkacp/kafkaconsumergroup/kafkaconsumergroup_test.go @@ -444,3 +444,252 @@ func TestNew_Consume_Success(t *testing.T) { _ = process.Signal(os.Interrupt) wg.Wait() } + +func TestNew_InvalidKeepAlive(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithKeepAlive(-1*time.Second), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_ValidKeepAlive(t *testing.T) { + logger := mockslogger.New() + + consumerGroup := &mockConsumerGroup{} + consumerGroupFactory := &mockConsumerGroupFactory{} + consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything). + Return(consumerGroup, nil) + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKeepAlive(15*time.Second), + kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), + ) + + assert.NoError(t, err) + assert.NotNil(t, consumer) +} + +func TestNew_InvalidMetadataRefreshFreq(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithMetadataRefreshFreq(-1*time.Second), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_ZeroMetadataRefreshFreq(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithMetadataRefreshFreq(0), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_InvalidSessionTimeout(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithSessionTimeout(-1*time.Second), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_ZeroSessionTimeout(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithSessionTimeout(0), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_InvalidHeartbeatInterval(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithHeartbeatInterval(-1*time.Second), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_ZeroHeartbeatInterval(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithHeartbeatInterval(0), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_InvalidRebalanceTimeout(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithRebalanceTimeout(-1*time.Second), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +} + +func TestNew_ValidMetadataRefreshFreq(t *testing.T) { + logger := mockslogger.New() + + consumerGroup := &mockConsumerGroup{} + consumerGroupFactory := &mockConsumerGroupFactory{} + consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything). + Return(consumerGroup, nil) + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithMetadataRefreshFreq(2*time.Minute), + kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), + ) + + assert.NoError(t, err) + assert.NotNil(t, consumer) +} + +func TestNew_ValidSessionTimeout(t *testing.T) { + logger := mockslogger.New() + + consumerGroup := &mockConsumerGroup{} + consumerGroupFactory := &mockConsumerGroupFactory{} + consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything). + Return(consumerGroup, nil) + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithSessionTimeout(20*time.Second), + kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), + ) + + assert.NoError(t, err) + assert.NotNil(t, consumer) +} + +func TestNew_ValidHeartbeatInterval(t *testing.T) { + logger := mockslogger.New() + + consumerGroup := &mockConsumerGroup{} + consumerGroupFactory := &mockConsumerGroupFactory{} + consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything). + Return(consumerGroup, nil) + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithHeartbeatInterval(5*time.Second), + kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), + ) + + assert.NoError(t, err) + assert.NotNil(t, consumer) +} + +func TestNew_ValidRebalanceTimeout(t *testing.T) { + logger := mockslogger.New() + + consumerGroup := &mockConsumerGroup{} + consumerGroupFactory := &mockConsumerGroupFactory{} + consumerGroupFactory.On("CreateConsumerGroup", mock.Anything, mock.Anything, mock.Anything). + Return(consumerGroup, nil) + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithRebalanceTimeout(45*time.Second), + kafkaconsumergroup.WithSaramaConsumerGroupFactoryFunc(consumerGroupFactory.CreateConsumerGroup), + ) + + assert.NoError(t, err) + assert.NotNil(t, consumer) +} + +func TestNew_ZeroRebalanceTimeout(t *testing.T) { + logger := mockslogger.New() + + consumer, err := kafkaconsumergroup.New( + kafkaconsumergroup.WithLogger(logger), + kafkaconsumergroup.WithProcessMessageFunc(mockProcessMessageFunc()), + kafkaconsumergroup.WithKafkaGroupName("github-group"), + kafkaconsumergroup.WithTopic(kafkacp.KafkaTopicIdentifierGitHub.String()), + kafkaconsumergroup.WithKafkaBrokers(kafkacp.DefaultKafkaBrokers), + kafkaconsumergroup.WithRebalanceTimeout(0), + ) + + assert.ErrorIs(t, err, cerrors.ErrInvalid) + assert.Nil(t, consumer) +}