From 4bae14d73ab3fa857c72309af28daa71f43458e9 Mon Sep 17 00:00:00 2001 From: ljluestc Date: Sun, 25 Jan 2026 00:34:23 -0800 Subject: [PATCH] feat(kq): Add support for SASL and TLS in Pusher --- kq/pusher.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ kq/pusher_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/kq/pusher.go b/kq/pusher.go index b254d4c..27d4f09 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -2,10 +2,13 @@ package kq import ( "context" + "crypto/tls" "strconv" "time" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "github.com/segmentio/kafka-go/sasl/plain" "github.com/zeromicro/go-queue/kq/internal" "github.com/zeromicro/go-zero/core/executors" "github.com/zeromicro/go-zero/core/logx" @@ -30,6 +33,8 @@ type ( // kafka.Writer options allowAutoTopicCreation bool balancer kafka.Balancer + saslMechanism sasl.Mechanism + tlsConfig *tls.Config // executors.ChunkExecutor options chunkSize int @@ -37,6 +42,10 @@ type ( // syncPush is used to enable sync push syncPush bool + + // sasl options + username string + password string } ) @@ -60,6 +69,22 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { producer.Balancer = options.balancer } + if options.tlsConfig != nil || options.saslMechanism != nil || (len(options.username) > 0 && len(options.password) > 0) { + transport := &kafka.Transport{ + TLS: options.tlsConfig, + SASL: options.saslMechanism, + } + + if len(options.username) > 0 && len(options.password) > 0 { + transport.SASL = plain.Mechanism{ + Username: options.username, + Password: options.password, + } + } + + producer.Transport = transport + } + pusher := &Pusher{ producer: producer, topic: topic, @@ -178,3 +203,25 @@ func WithSyncPush() PushOption { options.syncPush = true } } + +// WithSaslPlain customizes the Pusher with the given username and password. +func WithSaslPlain(username, password string) PushOption { + return func(options *pushOptions) { + options.username = username + options.password = password + } +} + +// WithSASL customizes the Pusher with the given sasl mechanism. +func WithSASL(saslMechanism sasl.Mechanism) PushOption { + return func(options *pushOptions) { + options.saslMechanism = saslMechanism + } +} + +// WithTLS customizes the Pusher with the given tls config. +func WithTLS(tlsConfig *tls.Config) PushOption { + return func(options *pushOptions) { + options.tlsConfig = tlsConfig + } +} diff --git a/kq/pusher_test.go b/kq/pusher_test.go index 72b380d..aea59f2 100644 --- a/kq/pusher_test.go +++ b/kq/pusher_test.go @@ -2,11 +2,13 @@ package kq import ( "context" + "crypto/tls" "errors" "testing" "time" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) @@ -63,6 +65,47 @@ func TestNewPusher(t *testing.T) { assert.NotNil(t, pusher) assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation) }) + + t.Run("WithTLS", func(t *testing.T) { + pusher := NewPusher(addrs, topic, WithTLS(&tls.Config{})) + assert.NotNil(t, pusher) + assert.NotNil(t, pusher.producer.(*kafka.Writer).Transport) + }) + + t.Run("WithSASL", func(t *testing.T) { + pusher := NewPusher(addrs, topic, WithSASL(plain.Mechanism{})) + assert.NotNil(t, pusher) + assert.NotNil(t, pusher.producer.(*kafka.Writer).Transport) + }) + + t.Run("WithSaslPlain", func(t *testing.T) { + pusher := NewPusher(addrs, topic, WithSaslPlain("user", "pass")) + assert.NotNil(t, pusher) + transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport) + assert.NotNil(t, transport) + assert.NotNil(t, transport.SASL) + }) + + t.Run("WithTLSAndSASL", func(t *testing.T) { + pusher := NewPusher(addrs, topic, + WithTLS(&tls.Config{}), + WithSaslPlain("user", "pass"), + ) + assert.NotNil(t, pusher) + transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport) + assert.NotNil(t, transport) + assert.NotNil(t, transport.TLS) + assert.NotNil(t, transport.SASL) + }) + + t.Run("WithTLSAndSaslPlain", func(t *testing.T) { + pusher := NewPusher(addrs, topic, WithTLS(&tls.Config{}), WithSaslPlain("user", "pass")) + assert.NotNil(t, pusher) + transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport) + assert.NotNil(t, transport) + assert.NotNil(t, transport.TLS) + assert.NotNil(t, transport.SASL) + }) } func TestPusher_Close(t *testing.T) {