Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package kq

import (
"context"
"crypto/tls"
"strconv"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/zeromicro/go-queue/kq/internal"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
Expand All @@ -30,13 +33,19 @@ type (
// kafka.Writer options
allowAutoTopicCreation bool
balancer kafka.Balancer
saslMechanism sasl.Mechanism
tlsConfig *tls.Config

// executors.ChunkExecutor options
chunkSize int
flushInterval time.Duration

// syncPush is used to enable sync push
syncPush bool

// sasl options
username string
password string
}
)

Expand All @@ -60,6 +69,22 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
producer.Balancer = options.balancer
}

if options.tlsConfig != nil || options.saslMechanism != nil || (len(options.username) > 0 && len(options.password) > 0) {
transport := &kafka.Transport{
TLS: options.tlsConfig,
SASL: options.saslMechanism,
}

if len(options.username) > 0 && len(options.password) > 0 {
transport.SASL = plain.Mechanism{
Username: options.username,
Password: options.password,
}
}

producer.Transport = transport
}

pusher := &Pusher{
producer: producer,
topic: topic,
Expand Down Expand Up @@ -178,3 +203,25 @@ func WithSyncPush() PushOption {
options.syncPush = true
}
}

// WithSaslPlain customizes the Pusher with the given username and password.
func WithSaslPlain(username, password string) PushOption {
return func(options *pushOptions) {
options.username = username
options.password = password
}
}

// WithSASL customizes the Pusher with the given sasl mechanism.
func WithSASL(saslMechanism sasl.Mechanism) PushOption {
return func(options *pushOptions) {
options.saslMechanism = saslMechanism
}
}

// WithTLS customizes the Pusher with the given tls config.
func WithTLS(tlsConfig *tls.Config) PushOption {
return func(options *pushOptions) {
options.tlsConfig = tlsConfig
}
}
43 changes: 43 additions & 0 deletions kq/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package kq

import (
"context"
"crypto/tls"
"errors"
"testing"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
Expand Down Expand Up @@ -63,6 +65,47 @@ func TestNewPusher(t *testing.T) {
assert.NotNil(t, pusher)
assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation)
})

t.Run("WithTLS", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithTLS(&tls.Config{}))
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.producer.(*kafka.Writer).Transport)
})

t.Run("WithSASL", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithSASL(plain.Mechanism{}))
assert.NotNil(t, pusher)
assert.NotNil(t, pusher.producer.(*kafka.Writer).Transport)
})

t.Run("WithSaslPlain", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithSaslPlain("user", "pass"))
assert.NotNil(t, pusher)
transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport)
assert.NotNil(t, transport)
assert.NotNil(t, transport.SASL)
})

t.Run("WithTLSAndSASL", func(t *testing.T) {
pusher := NewPusher(addrs, topic,
WithTLS(&tls.Config{}),
WithSaslPlain("user", "pass"),
)
assert.NotNil(t, pusher)
transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport)
assert.NotNil(t, transport)
assert.NotNil(t, transport.TLS)
assert.NotNil(t, transport.SASL)
})

t.Run("WithTLSAndSaslPlain", func(t *testing.T) {
pusher := NewPusher(addrs, topic, WithTLS(&tls.Config{}), WithSaslPlain("user", "pass"))
assert.NotNil(t, pusher)
transport := pusher.producer.(*kafka.Writer).Transport.(*kafka.Transport)
assert.NotNil(t, transport)
assert.NotNil(t, transport.TLS)
assert.NotNil(t, transport.SASL)
})
}

func TestPusher_Close(t *testing.T) {
Expand Down