diff --git a/.gitignore b/.gitignore
index a7519ab..963f51b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ test.log
 /tmp/*
 /examples/tmp/*
 /bin/serve/docker/prometheus/data
+/bin/serve/docker/nats/certs/*.pem
diff --git a/.tool-versions b/.tool-versions
index 1e576d1..e4df726 100644
--- a/.tool-versions
+++ b/.tool-versions
@@ -1 +1,2 @@
 golang 1.21.7
+golangci-lint 1.64.5
diff --git a/bin/coverage.sh b/bin/coverage.sh
new file mode 100755
index 0000000..45d4f55
--- /dev/null
+++ b/bin/coverage.sh
@@ -0,0 +1,52 @@
+#!/usr/bin/env bash
+
+set -e
+
+# Default values
+should_open=false
+package_path="./..."
+
+# Function to display usage
+usage() {
+  echo "Usage: $0 [-p|--package <package_path>] [-o|--open] [-h|--help]"
+  echo "  -p, --package   Specify the package path to test (default: all packages)"
+  echo "  -o, --open      Open the coverage report in the default browser"
+  echo "  -h, --help      Display this help message"
+}
+
+# Parse arguments
+while [[ "$#" -gt 0 ]]; do
+  case $1 in
+    -p|--package) package_path="$2"; shift ;;
+    -o|--open) should_open=true ;;
+    -h|--help)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "Unknown parameter passed: $1"
+      usage
+      exit 1
+      ;;
+  esac
+  shift
+done
+
+# TODO: work through lint issues and enable golangci-lint
+# lint
+# golangci-lint run $package_path
+
+# unit test
+go test $package_path -coverpkg=$package_path -race -covermode=atomic -coverprofile=coverage.out
+go tool cover -html=coverage.out -o ~/Desktop/coverage.html
+
+# report
+echo ""
+echo "==="
+echo "Coverage report is generated at ~/Desktop/coverage.html"
+echo "==="
+
+# open
+if [ "$should_open" = true ]; then
+  open ~/Desktop/coverage.html
+fi
diff --git a/bin/serve/docker.sh b/bin/serve/docker.sh
index b28d3ae..d562aa8 100755
--- a/bin/serve/docker.sh
+++ b/bin/serve/docker.sh
@@ -1,5 +1,7 @@
 #!/usr/bin/env bash
 
+./bin/serve/gen-certs.sh
+
 pushd ./bin/serve/docker
 
 make start
diff --git a/bin/serve/docker/docker-compose.yml b/bin/serve/docker/docker-compose.yml
index c893432..ae95f4b 100644
--- a/bin/serve/docker/docker-compose.yml
+++ b/bin/serve/docker/docker-compose.yml
@@ -1,6 +1,21 @@
 version: '3'
 
 services:
+  nats:
+    image: docker.io/nats:2.9.20
+    ports:
+      - "4222:4222"
+      - "6222:6222"
+      - "8222:8222"
+    volumes:
+      - "./nats/certs/server.pem:/certs/server.pem"
+      - "./nats/certs/server-key.pem:/certs/server-key.pem"
+    command:
+      - "-tls"
+      - "-tlscert=/certs/server.pem"
+      - "-tlskey=/certs/server-key.pem"
+      - "-js"
+
   # grafana:
   #   build:
   #     context: ./grafana
diff --git a/bin/serve/gen-certs.sh b/bin/serve/gen-certs.sh
new file mode 100755
index 0000000..a54eff4
--- /dev/null
+++ b/bin/serve/gen-certs.sh
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+
+set -e
+
+if [ -z "$(which mkcert)" ]; then
+  echo "mkcert not installed. Installing....."
+  HOMEBREW_NO_AUTO_UPDATE=1 brew install mkcert
+fi
+
+check_and_generate_certificate() {
+  if [ ! -f "$(mkcert -CAROOT)/rootCA.pem" ]; then
+    cat << EOF
+#####################################################################
+# ERROR: The mkcert root CA certificate has not been generated.     #
+# Please run the following command to install it:                   #
+#                                                                   #
+#   mkcert --install                                                #
+#                                                                   #
+# This is required for nats to start properly.                      #
+#####################################################################
+EOF
+    return 1
+  fi
+
+
+  # Check if server.pem exists and if it is a directory
+  if [ -d ./server.pem ]; then
+    echo "server.pem is a directory. Deleting certs contents."
+    rm -rf ./*
+    echo "Directory cleaned up."
+  fi
+
+  # Check if the server.pem file exists
+  if [ ! 
-f ./server.pem ]; then + echo "server.pem does not exist. Generating a new certificate." + mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1 + return 0 + fi + + # Get the certificate expiration date + enddate=$(openssl x509 -enddate -noout -in ./server.pem | cut -d= -f2) + + # Convert the expiration date to a Unix timestamp + enddate_timestamp=$(date -j -f "%b %d %T %Y %Z" "$enddate" "+%Y%m%d%H%M%S") + + # Get today's date as a Unix timestamp + current_timestamp=$(date "+%Y%m%d%H%M%S") + + # Compare the dates + if [ "$enddate_timestamp" -le "$current_timestamp" ]; then + echo "Certificate has expired or is expiring today. Generating a new one." + mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1 + return 0 + fi + + echo "Certificate is still valid." +} + +mkdir -p ./bin/serve/docker/nats/certs + +pushd ./bin/serve/docker/nats/certs + + check_and_generate_certificate + +popd + diff --git a/bin/test.sh b/bin/test.sh index aa75cb0..8e6e055 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -6,11 +6,14 @@ go install github.com/jstemmer/go-junit-report/v2@latest list=`go list ./... | grep -v mocks | grep -v docs | grep -v errors | grep -v examples` -go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out 2>&1 $list > test.log +go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out $list 2>&1 | tee test.log +test_exit=${PIPESTATUS[0]} -cat test.log +cat test.log | go-junit-report > junit.xml -cat test.log | go-junit-report -set-exit-code > junit.xml +if [ $test_exit -ne 0 ]; then + exit $test_exit +fi coverage=$(go tool cover -func coverage.out | grep total | awk '{print $3}') diff --git a/circuitbreaker/circuitbreaker.go b/circuitbreaker/circuitbreaker.go index 3e4b579..6019dd6 100644 --- a/circuitbreaker/circuitbreaker.go +++ b/circuitbreaker/circuitbreaker.go @@ -100,8 +100,8 @@ func (cb *CircuitBreaker) Allow() bool { allowed = true } - logger.Debug("circuit breaker allow check", logger.Fields{ - "state": cb.state.String(), + cb.config.Logger.Debug("circuit breaker allow check", logger.Fields{ + "state": cb.state, "allowed": allowed, "attempts": cb.attempts, "max_calls": cb.config.HalfOpenMaxCalls, @@ -123,13 +123,13 @@ func (cb *CircuitBreaker) RecordStart() bool { switch cb.state { case StateOpen: - logger.Debug("attempt rejected - circuit open", logger.Fields{ + cb.config.Logger.Debug("attempt rejected - circuit open", logger.Fields{ "state": cb.state.String(), }) return false case StateHalfOpen: if cb.attempts >= cb.config.HalfOpenMaxCalls { - logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{ + cb.config.Logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{ "attempts": cb.attempts, "max_calls": cb.config.HalfOpenMaxCalls, }) diff --git a/messaging/natsjscm/natsjscm.go b/messaging/natsjscm/natsjscm.go index bbaf84f..914a4f8 100644 --- a/messaging/natsjscm/natsjscm.go +++ b/messaging/natsjscm/natsjscm.go @@ -73,8 +73,8 @@ func NewConnectionManager(config ConnectionConfig) (Connector, error) { // Add connection event handlers options = append(options, - nats.ReconnectHandler(func(_ *nats.Conn) { - cm.handleReconnect() + nats.DisconnectHandler(func(_ *nats.Conn) { + cm.retryConnection() }), ) @@ -82,34 +82,6 @@ func NewConnectionManager(config ConnectionConfig) (Connector, error) { return cm, nil } -func (cm *ConnectionManager) handleReconnect() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.log.Debugf("NATS connection reconnected") - - // 
Recreate JetStream context after reconnect - if cm.nc == nil { - return - } - - js, err := cm.createJetStreamContext(cm.nc) - if err != nil { - cm.log.Errorf("Failed to recreate JetStream context %s", err) - go cm.retryConnection() - return - } - - cm.js = js - cm.log.Debugf("JetStream context recreated") -} - -// createJetStreamContext creates a new JetStream context with the current configuration -func (cm *ConnectionManager) createJetStreamContext(nc *nats.Conn) (jetstream.JetStream, error) { - // Create JetStream context - return jetstream.New(nc) -} - // retryConnection attempts to reconnect to NATS periodically func (cm *ConnectionManager) retryConnection() { for { @@ -129,7 +101,7 @@ func (cm *ConnectionManager) retryConnection() { continue } - js, err := cm.createJetStreamContext(nc) + js, err := jetstream.New(nc) if err != nil { cm.log.Errorf("Failed to recreate JetStream context: %s", err) nc.Close() @@ -148,23 +120,28 @@ func (cm *ConnectionManager) retryConnection() { // Connect establishes a connection to NATS if not already connected func (cm *ConnectionManager) Connect() error { + cm.log.Debugf("Starting Connect") + cm.mu.Lock() defer cm.mu.Unlock() + cm.log.Debugf("Checking NATS connection") // If already connected, increment reference count if cm.nc != nil && cm.nc.IsConnected() { cm.refs++ return nil } + cm.log.Debugf("Connecting to NATS") // Connect to NATS nc, err := nats.Connect(cm.config.URL, cm.config.Options...) if err != nil { return fmt.Errorf("failed to connect to NATS: %w", err) } + cm.log.Debugf("Getting JetStream context") // Create JetStream context - js, err := cm.createJetStreamContext(nc) + js, err := jetstream.New(nc) if err != nil { nc.Close() return fmt.Errorf("failed to create JetStream context: %w", err) @@ -174,6 +151,8 @@ func (cm *ConnectionManager) Connect() error { cm.js = js cm.refs = 1 + cm.log.Debugf("Connected to NATS") + return nil } diff --git a/messaging/natsjscm/natsjscm_test.go b/messaging/natsjscm/natsjscm_test.go index a4aa137..fcd9d23 100644 --- a/messaging/natsjscm/natsjscm_test.go +++ b/messaging/natsjscm/natsjscm_test.go @@ -14,31 +14,11 @@ import ( "github.com/nats-io/nats.go" "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// Mock Logger for testing -type mockLogger struct { - debugMessages []string - errorMessages []string -} - -func newMockLogger() *mockLogger { - return &mockLogger{ - debugMessages: []string{}, - errorMessages: []string{}, - } -} - -func (m *mockLogger) Debugf(format string, args ...interface{}) { - m.debugMessages = append(m.debugMessages, fmt.Sprintf(format, args...)) -} - -func (m *mockLogger) Errorf(format string, args ...interface{}) { - m.errorMessages = append(m.errorMessages, fmt.Sprintf(format, args...)) -} - func MockServer(args ...int) func() { port := 0 if len(args) > 0 { @@ -105,7 +85,7 @@ func TestNewConnectionManager(t *testing.T) { config: natsjscm.ConnectionConfig{ URL: "nats://localhost:4222", ReconnectWait: 5 * time.Second, - Logger: newMockLogger(), + Logger: &logger.MockLogger{}, }, expectError: false, }, @@ -302,14 +282,10 @@ func TestEnsureStream(t *testing.T) { func TestConnectionEvents(t *testing.T) { shutdown := MockServer() - // Create a mock logger to capture log messages - logger := newMockLogger() - // Create a connection manager with event handlers cm, err := natsjscm.NewConnectionManager(natsjscm.ConnectionConfig{ URL: os.Getenv("NATS_HOST"), 
ReconnectWait: 100 * time.Millisecond, - Logger: logger, }) require.NoError(t, err) diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go new file mode 100644 index 0000000..09cef52 --- /dev/null +++ b/messaging/natsjscon/natsjscon.go @@ -0,0 +1,228 @@ +package natsjscon + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjsdlq" +) + +// Logger is the interface for logging +type Logger interface { + Debug(args ...any) + Debugf(format string, args ...any) + Error(args ...any) +} + +// ProcessStatus represents the result of message processing +type ProcessStatus int + +const ( + // Success indicates successful processing + Success ProcessStatus = iota + // Failure indicates processing failure (will be retried) + Failure + // TerminalFailure indicates a non-recoverable failure (sent to DLQ) + TerminalFailure +) + +var ResultNames = map[ProcessStatus]string{ + Success: "SUCCESS", + Failure: "FAILURE", + TerminalFailure: "TERMINAL-FAILURE", +} + +// Processor processes messages +type Processor func(ctx context.Context, msgs <-chan jetstream.Msg) map[jetstream.Msg]ProcessStatus + +// ConsumerConfig holds general configuration for a consumer +type ConsumerConfig struct { + // StreamName is the name of the stream to consume from + StreamName string + + // ConsumerName is the name for this consumer + ConsumerName string + + // Subject is the subject to subscribe to + Subject string + + // MaxRetries is the maximum number of retries before sending to DLQ + MaxRetries int + + // ProcessTimeout is the timeout for processing a message/batch + ProcessTimeout time.Duration + + // WorkerCount is the number of concurrent workers + WorkerCount int +} + +// ConsumptionStrategy defines the interface for different message consumption strategies +type ConsumptionStrategy interface { + // Setup prepares the consumption strategy + Setup(ctx context.Context) error + + // Consume retrieves messages for processing + Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) +} + +// Consumer manages consuming and processing messages +type Consumer struct { + // Configuration + config ConsumerConfig + + // Dependencies + logger Logger + strategy ConsumptionStrategy + processor Processor + cm natsjscm.Connector + dlqHandler natsjsdlq.Handler + + // State + running bool + wg sync.WaitGroup + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// NewConsumer creates a new consumer with the specified strategy +func NewConsumer(config ConsumerConfig) *Consumer { + return &Consumer{config: config} +} + +func (c *Consumer) debug(args ...any) { + if c.logger == nil { + return + } + + c.logger.Debug(args...) 
+} + +// SetLogger sets the logger +func (c *Consumer) SetLogger(logger Logger) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + c.logger = logger + c.debug("logger set") + return c +} + +// SetConnector sets the connection manager +func (c *Consumer) SetConnector(cm natsjscm.Connector) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.cm = cm + c.debug("connection manager set") + return c +} + +// SetStrategy sets the consumption strategy +func (c *Consumer) SetStrategy(strategy ConsumptionStrategy) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.strategy = strategy + c.debug("strategy set") + return c +} + +// SetDLQHandler sets the dead letter queue handler +func (c *Consumer) SetDLQHandler(handler natsjsdlq.Handler) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.dlqHandler = handler + c.debug("DLQ handler set") + return c +} + +// SetProcessor sets the message processor +func (c *Consumer) SetProcessor(processor Processor) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.processor = processor + c.debug("processor set") + return c +} + +// Stop stops the consumer +func (c *Consumer) Stop() { + if !c.running { + return + } + + c.logger.Debug("stopping consumer", logger.Fields{ + "stream": c.config.StreamName, + "consumer": c.config.ConsumerName, + }) + + c.cancel() + c.running = false + c.wg.Wait() +} + +// validate checks if the consumer is properly configured +func (c *Consumer) validate() error { + if c.processor == nil { + return errors.New("processor is required") + } + + if c.cm == nil { + return errors.New("connection manager is required") + } + + if c.strategy == nil { + return errors.New("consumption strategy is required") + } + + if c.config.StreamName == "" { + return errors.New("stream name is required") + } + + if c.config.ConsumerName == "" { + return errors.New("consumer name is required") + } + + if c.config.Subject == "" { + return errors.New("subject is required") + } + + return nil +} + +func (c *Consumer) setup() error { + c.debug("starting consumer setup") + + if c.logger == nil { + c.debug("setting logger") + c.SetLogger(logger.New()) + } + + // Set defaults + if c.config.MaxRetries <= 0 { + c.debug("setting default max retries to 3") + c.config.MaxRetries = 3 + } + + if c.config.ProcessTimeout <= 0 { + c.debug("setting default process timeout to 30s") + c.config.ProcessTimeout = 30 * time.Second + } + + if c.config.WorkerCount <= 0 { + c.debug("setting default worker count to 1") + c.config.WorkerCount = 1 + } + + c.debug("consumer setup complete, connecting to NATS") + + // Ensure connection to NATS + return c.cm.Connect() +} diff --git a/messaging/natsjscon/natsjscon_handle_result.go b/messaging/natsjscon/natsjscon_handle_result.go new file mode 100644 index 0000000..24defef --- /dev/null +++ b/messaging/natsjscon/natsjscon_handle_result.go @@ -0,0 +1,138 @@ +package natsjscon + +import ( + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/logger" +) + +// handleResult handles the result of message processing +func (c *Consumer) handleResult(msg jetstream.Msg, status ProcessStatus) { + if msg == nil { + c.logger.Debug("missing jetstream message reference", nil) + return + } + + metadata, err := msg.Metadata() + if err != nil { + c.logger.Debug("error getting message metadata", logger.Fields{ + "error": err.Error(), + }) + return + } + + switch status { + case Success: + handleSuccess(msg, c) + return + case Failure: + handleFailure(msg, metadata, c) + return + case TerminalFailure: + 
handleTerminalError(msg, c) + return + default: + c.logger.Debug("unknown process status", logger.Fields{ + "status": status, + }) + handleFailure(msg, metadata, c) + } +} + +func handleSuccess(msg jetstream.Msg, c *Consumer) { + c.logger.Debugf("message processed successfully, acknowledging") + + // Acknowledge successful processing + err := msg.Ack() + + if err == nil { + return + } + + c.logger.Error("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) +} + +func handleFailure(msg jetstream.Msg, metadata *jetstream.MsgMetadata, c *Consumer) { + // Check if max retries reached + limitReached := handleMaxRetries(msg, metadata, c) + if limitReached { + return + } + + // Negative ack for retry + err := msg.Nak() + if err == nil { + return + } + + // Log error + c.logger.Error("error negative-acknowledging message", logger.Fields{ + "error": err.Error(), + }) +} + +func handleMaxRetries(msg jetstream.Msg, metadata *jetstream.MsgMetadata, c *Consumer) bool { + if metadata == nil { + return false + } + + if metadata.NumDelivered < uint64(c.config.MaxRetries) { + return false + } + + err := msg.Ack() + if err != nil { + c.logger.Debug("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) + } + + if c.dlqHandler == nil { + return true + } + + publishToDLQ(msg, "max_retries", c) + + return true +} + +func handleTerminalError(msg jetstream.Msg, c *Consumer) { + // Ack to prevent redelivery + err := msg.Ack() + if err != nil { + c.logger.Debug("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) + } + + // Send to DLQ if enabled + if c.dlqHandler == nil { + return + } + + // Create a NATS message for the DLQ + publishToDLQ(msg, "terminal_error", c) +} + +func publishToDLQ(original jetstream.Msg, reason string, c *Consumer) { + if c.dlqHandler == nil { + return + } + + msg := &nats.Msg{ + Subject: original.Subject(), + Data: original.Data(), + Header: original.Headers(), + } + + err := c.dlqHandler.PublishMessage(msg, reason) + if err == nil { + return + } + + c.logger.Debug("error sending to DLQ", logger.Fields{ + "error": err.Error(), + }) +} diff --git a/messaging/natsjscon/natsjscon_start.go b/messaging/natsjscon/natsjscon_start.go new file mode 100644 index 0000000..fc35211 --- /dev/null +++ b/messaging/natsjscon/natsjscon_start.go @@ -0,0 +1,75 @@ +package natsjscon + +import ( + "context" + "errors" + "fmt" + + "github.com/simiancreative/simiango/logger" +) + +type CtxKey string + +// Start begins consuming messages +func (c *Consumer) Start(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + c.debug("is consumer already running?") + if c.running { + return errors.New("consumer is already running") + } + + c.debug("validating consumer configuration") + if err := c.validate(); err != nil { + return fmt.Errorf("invalid consumer configuration: %w", err) + } + + c.debug("setting up consumer") + if err := c.setup(); err != nil { + return fmt.Errorf("failed to setup consumer: %w", err) + } + + ctx = c.setupCtx(ctx) + + // Set up the strategy + c.debug("setting up consumption strategy") + if err := c.strategy.Setup(ctx); err != nil { + return fmt.Errorf("failed to setup consumption strategy: %w", err) + } + + c.ctx, c.cancel = context.WithCancel(ctx) + c.running = true + + // Start worker pool + c.debug("starting worker pool") + for i := 0; i < c.config.WorkerCount; i++ { + c.wg.Add(1) + go c.worker(i) + } + + c.logger.Debug("consumer started", logger.Fields{ + "stream": c.config.StreamName, + "consumer": 
c.config.ConsumerName, + "subject": c.config.Subject, + "worker_count": c.config.WorkerCount, + }) + + return nil +} + +func (c *Consumer) setupCtx(ctx context.Context) context.Context { + key := CtxKey("stream-name") + ctx = context.WithValue(ctx, key, c.config.StreamName) + + key = CtxKey("subject") + ctx = context.WithValue(ctx, key, c.config.Subject) + + key = CtxKey("logger") + ctx = context.WithValue(ctx, key, c.logger) + + key = CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, c.cm) + + return ctx +} diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go new file mode 100644 index 0000000..46771bf --- /dev/null +++ b/messaging/natsjscon/natsjscon_test.go @@ -0,0 +1,127 @@ +package natsjscon_test + +import ( + "context" + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" + "github.com/stretchr/testify/assert" +) + +func TestNewConsumer(t *testing.T) { + d := conmock.NewDependencies() + c := d.NewConsumer(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := c.Start(ctx) + assert.NoError(t, err) + + // wait + time.Sleep(500 * time.Millisecond) + + // does not panic + assert.NotPanics(t, func() { + c.Stop() + }, "Stop should not panic") +} + +func TestNewConsumerWithInvalidConfig(t *testing.T) { + d := conmock.NewDependencies() + + tests := []struct { + name string + fn func(c *natsjscon.Consumer) + shouldError bool + }{ + { + name: "SetProcessor", + fn: func(c *natsjscon.Consumer) { + c.SetProcessor(nil) + }, + shouldError: true, + }, + { + name: "SetConnector", + fn: func(c *natsjscon.Consumer) { + c.SetConnector(nil) + }, + shouldError: true, + }, + { + name: "ValidateStream", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + }) + }, + shouldError: true, + }, + { + name: "ValidateConsumerName", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "test-stream", + ConsumerName: "", + Subject: "test.subject.>", + }) + }, + shouldError: true, + }, + { + name: "ValidateSubject", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "test-stream", + ConsumerName: "test-consumer", + Subject: "", + }) + }, + shouldError: true, + }, + } + + for _, tt := range tests { + c := d.NewConsumer(t) + + t.Run(tt.name, func(t *testing.T) { + tt.fn(c) + + if tt.shouldError { + assert.Error(t, c.Start(context.Background())) + } + + if !tt.shouldError { + assert.NoError(t, c.Start(context.Background())) + } + }) + } +} + +func TestHandleResult(t *testing.T) { + d := conmock.NewDependencies() + c := d.NewConsumer(t) + + badAck := natsjscm.NewMockJetStreamMsg(jetstream.ErrMsgAlreadyAckd) + + batch := natsjscm.NewMockMessageBatch( + []jetstream.Msg{badAck}, + nil, + ) + + d.Strategy.Reset(batch) + + err := c.Start(context.Background()) + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + return d.Processor.AssertExpectations(t) && badAck.AssertExpectations(t) + }, 500*time.Millisecond, 100*time.Millisecond) +} diff --git a/messaging/natsjscon/natsjscon_worker.go b/messaging/natsjscon/natsjscon_worker.go new file mode 100644 index 0000000..cf6344f --- /dev/null +++ 
b/messaging/natsjscon/natsjscon_worker.go @@ -0,0 +1,92 @@ +package natsjscon + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/simiancreative/simiango/logger" +) + +// worker processes messages using the configured strategy +func (c *Consumer) worker(id int) { + defer c.wg.Done() + + c.logger.Debug("starting worker", logger.Fields{ + "worker_id": id, + "consumer": c.config.ConsumerName, + }) + + for { + select { + case <-c.ctx.Done(): + c.logger.Debug("worker stopping", logger.Fields{ + "worker_id": id, + "reason": "context cancelled", + }) + + return + default: + err := c.consumeAndProcess(id) + + if err == nil { + continue + } + + if errors.Is(err, context.Canceled) { + continue + } + + c.logger.Debug("error in worker", logger.Fields{ + "worker_id": id, + "error": err.Error(), + }) + + // Brief backoff on error + time.Sleep(100 * time.Millisecond) + } + } +} + +// consumeAndProcess uses the strategy to get messages and processes them +func (c *Consumer) consumeAndProcess(workerID int) error { + // Use strategy to consume messages + msgs, err := c.strategy.Consume(c.ctx, workerID) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // This is normal when no messages are available + return nil + } + + return fmt.Errorf("failed to consume messages: %w", err) + } + + if len(msgs) == 0 { + return nil + } + + c.logger.Debug("processing batch", logger.Fields{ + "worker_id": workerID, + "batch_size": len(msgs), + }) + + // Create processing context with timeout + procCtx, procCancel := context.WithTimeout(c.ctx, c.config.ProcessTimeout) + defer procCancel() + + // Process batch + c.mu.RLock() + processor := c.processor + c.mu.RUnlock() + + // Call the batch processor + results := processor(procCtx, msgs) + + // Handle results + for msg, status := range results { + c.handleResult(msg, status) + } + + return nil +} diff --git a/messaging/natsjsdlq/natsjsdlq.go b/messaging/natsjsdlq/natsjsdlq.go index 04aad28..bf831df 100644 --- a/messaging/natsjsdlq/natsjsdlq.go +++ b/messaging/natsjsdlq/natsjsdlq.go @@ -15,6 +15,11 @@ type Msg interface { Metadata() (*nats.MsgMetadata, error) } +type Handler interface { + PublishMessage(msg *nats.Msg, reason string) error + ShouldDLQ(msg Msg) bool +} + // Config holds DLQ configuration type Config struct { // StreamName for the DLQ @@ -42,7 +47,7 @@ type Dependencies struct { } // Handler manages dead letter queue operations -type Handler struct { +type DefaultHandler struct { config Config cm natsjscm.Connector p natsjspub.Publisher @@ -50,12 +55,12 @@ type Handler struct { } // NewHandler creates a new DLQ handler -func NewHandler(deps Dependencies, config Config) (*Handler, error) { +func NewHandler(deps Dependencies, config Config) (Handler, error) { if err := validateConfig(deps, config); err != nil { return nil, fmt.Errorf("invalid DLQ configuration: %w", err) } - handler := &Handler{ + handler := &DefaultHandler{ config: config, ctx: config.Context, cm: deps.ConnectionManager, @@ -102,7 +107,7 @@ func validateConfig(deps Dependencies, config Config) error { } // setup ensures the DLQ stream exists -func (h *Handler) setup() error { +func (h *DefaultHandler) setup() error { streamConfig := jetstream.StreamConfig{ Name: h.config.StreamName, Subjects: []string{h.config.Subject}, @@ -119,7 +124,7 @@ func (h *Handler) setup() error { } // PublishMessage sends a message to the DLQ -func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error { +func (h *DefaultHandler) 
PublishMessage(msg *nats.Msg, reason string) error { // Clone original message headers headers := nats.Header{} if msg.Header != nil { @@ -150,7 +155,7 @@ func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error { } // ShouldDLQ determines if a message should be sent to DLQ based on delivery count -func (h *Handler) ShouldDLQ(msg Msg) bool { +func (h *DefaultHandler) ShouldDLQ(msg Msg) bool { metadata, err := msg.Metadata() if err != nil { if h.config.ErrorHandler != nil { diff --git a/messaging/natsjsdlq/natsjsdlq_test.go b/messaging/natsjsdlq/natsjsdlq_test.go index 4f5b5e0..93588cb 100644 --- a/messaging/natsjsdlq/natsjsdlq_test.go +++ b/messaging/natsjsdlq/natsjsdlq_test.go @@ -8,87 +8,27 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" - "github.com/simiancreative/simiango/messaging/natsjscm" "github.com/simiancreative/simiango/messaging/natsjsdlq" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/messaging/natsjspub" "github.com/stretchr/testify/mock" "github.com/tj/assert" ) -// Mock implementations -type MockConnectionManager struct { - mock.Mock -} - -func (m *MockConnectionManager) Connect() error { - args := m.Called() - return args.Error(0) -} - -func (m *MockConnectionManager) GetConnection() *nats.Conn { - args := m.Called() - return args.Get(0).(*nats.Conn) -} - -func (m *MockConnectionManager) GetJetStream() natsjscm.JetStream { - args := m.Called() - return args.Get(0).(jetstream.JetStream) -} - -func (m *MockConnectionManager) EnsureStream( - ctx context.Context, - config jetstream.StreamConfig, -) (natsjscm.JetStream, error) { - args := m.Called(ctx, config) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(jetstream.JetStream), args.Error(1) -} - -func (m *MockConnectionManager) Disconnect() error { - args := m.Called() - return args.Error(0) -} - -func (m *MockConnectionManager) IsConnected() bool { - args := m.Called() - return args.Bool(0) -} - -type MockPublisher struct { - mock.Mock -} - -func (m *MockPublisher) Publish(ctx context.Context, msg *nats.Msg) (*jetstream.PubAck, error) { - args := m.Called(ctx, msg) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*jetstream.PubAck), args.Error(1) -} - -type MockMsg struct { - mock.Mock -} - -func (m *MockMsg) Metadata() (*nats.MsgMetadata, error) { - args := m.Called() - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*nats.MsgMetadata), args.Error(1) -} - // Test Suite type DLQHandlerTestSuite struct { - mockCM *MockConnectionManager - mockPub *MockPublisher + mockCM *natsjscm.MockConnectionManager + mockPub *natsjspub.MockPublisher + mockJS *natsjscm.MockJetStream ctx context.Context } func (suite *DLQHandlerTestSuite) SetupTest() { - suite.mockCM = new(MockConnectionManager) - suite.mockPub = new(MockPublisher) + suite.mockCM = new(natsjscm.MockConnectionManager) + suite.mockPub = new(natsjspub.MockPublisher) + suite.mockJS = new(natsjscm.MockJetStream) + + suite.mockCM.SetJetStream(suite.mockJS) suite.ctx = context.Background() } @@ -112,7 +52,7 @@ func TestNewHandlerValidation(t *testing.T) { StreamName: "test-dlq", Subject: "test.dlq", MaxDeliveries: 3, - Storage: jetstream.FileStorage, + Storage: jetstream.MemoryStorage, Context: suite.ctx, }, expectedErr: "", @@ -195,7 +135,11 @@ func TestNewHandlerValidation(t *testing.T) { Storage: tc.config.Storage, Retention: jetstream.WorkQueuePolicy, } - 
suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() } handler, err := natsjsdlq.NewHandler(tc.deps, tc.config) @@ -273,7 +217,10 @@ func TestShouldDLQ(t *testing.T) { Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, } - suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() handler, err := natsjsdlq.NewHandler(deps, validConfig) assert.NoError(t, err) @@ -282,7 +229,7 @@ func TestShouldDLQ(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Create a mock message with the configured metadata - mockMsg := new(MockMsg) + mockMsg := new(natsjspub.MockMsg) metadata := &nats.MsgMetadata{ NumDelivered: tc.numDelivered, } @@ -380,7 +327,10 @@ func TestPublishMessage(t *testing.T) { Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, } - suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() handler, err := natsjsdlq.NewHandler(deps, validConfig) assert.NoError(t, err) diff --git a/messaging/natsjspub/natsjspub_test.go b/messaging/natsjspub/natsjspub_test.go index ecea436..e98b4cf 100644 --- a/messaging/natsjspub/natsjspub_test.go +++ b/messaging/natsjspub/natsjspub_test.go @@ -8,12 +8,14 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjspub" + "github.com/simiancreative/simiango/mocks/circuitbreaker" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestNewPublisher(t *testing.T) { - mockCM := new(MockConnectionManager) + mockCM := new(natsjscm.MockConnectionManager) config := natsjspub.Config{ StreamName: "test-stream", Subject: "test-subject", @@ -23,7 +25,7 @@ func TestNewPublisher(t *testing.T) { t.Run("success", func(t *testing.T) { mockCM. On("EnsureStream", mock.Anything, mock.Anything). - Return(new(MockJetStream), nil) + Return(new(natsjscm.MockJetStream), nil) mockCM. On("IsConnected"). 
@@ -59,16 +61,16 @@ func TestNewPublisher(t *testing.T) { } type Mocks struct { - cm *MockConnectionManager - js *MockJetStream - cb *MockCircuitBreaker + cm *natsjscm.MockConnectionManager + js *natsjscm.MockJetStream + cb *circuitbreaker.MockCircuitBreaker } func createPublisher(t *testing.T) (natsjspub.PublishMulti, *Mocks) { mocks := &Mocks{ - cm: new(MockConnectionManager), - js: new(MockJetStream), - cb: new(MockCircuitBreaker), + cm: new(natsjscm.MockConnectionManager), + js: new(natsjscm.MockJetStream), + cb: new(circuitbreaker.MockCircuitBreaker), } mocks.cm.SetJetStream(mocks.js) diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go new file mode 100644 index 0000000..df56c5b --- /dev/null +++ b/messaging/natsjsstrategypull/pull.go @@ -0,0 +1,252 @@ +package natsjsstrategypull + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/circuitbreaker" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjscon" +) + +type Logger interface { + Debugf(format string, args ...interface{}) + Debug(args ...any) + Error(args ...any) +} + +// PullStrategyConfig holds configuration specific to pull-based consumption +type Config struct { + // ConsumerName is the name for this consumer + ConsumerName string + + // BatchSize for processing + BatchSize int + + // PollTimeout is how long to wait when polling for new messages + PollTimeout time.Duration + + // AckWait is how long the server waits for an ack before redelivery + AckWait time.Duration + + // MaxAckPending is the maximum number of pending acks + MaxAckPending int + + // MaxRetries is the maximum number of retries + MaxRetries int + + // RetentionPolicy is the retention policy for the stream + RetentionPolicy jetstream.RetentionPolicy + + // Breaker is the configuration for the circuit breaker + Breaker circuitbreaker.Breaker +} + +// PullStrategy implements pull-based message consumption +type PullStrategy struct { + config Config + cm natsjscm.Connector + cb circuitbreaker.Breaker + consumer jetstream.Consumer + + streamName string + subject string + logger Logger +} + +// NewPullStrategy creates a new pull-based consumption strategy +func New(config Config) (*PullStrategy, error) { + if config.ConsumerName == "" { + return nil, errors.New("consumer name is required") + } + + // Set defaults for unspecified configuration + if config.BatchSize <= 0 { + config.BatchSize = 100 + } + + if config.PollTimeout <= 0 { + config.PollTimeout = 1 * time.Second + } + + if config.AckWait <= 0 { + config.AckWait = 30 * time.Second + } + + if config.MaxAckPending <= 0 { + config.MaxAckPending = 1000 + } + + if config.MaxRetries <= 0 { + config.MaxRetries = 3 + } + + return &PullStrategy{ + config: config, + }, nil +} + +// Setup creates or updates the JetStream consumer +func (p *PullStrategy) Setup(ctx context.Context) error { + p.streamName = ctx.Value(natsjscon.CtxKey("stream-name")).(string) + p.subject = ctx.Value(natsjscon.CtxKey("subject")).(string) + p.logger = ctx.Value(natsjscon.CtxKey("logger")).(Logger) + p.cm = ctx.Value(natsjscon.CtxKey("connection-manager")).(natsjscm.Connector) + p.cb = p.config.Breaker + + err := p.ensureStream(ctx) + if err != nil { + return fmt.Errorf("failed to ensure stream: %w", err) + } + + return p.createConsumer(ctx) +} + +// ensureStream makes sure the configured stream exists +func (p 
*PullStrategy) ensureStream(ctx context.Context) error { + p.logger.Debugf("ensuring stream is connected") + + // Get JetStream connection + if !p.cm.IsConnected() { + p.logger.Debugf("connecting to NATS") + if err := p.cm.Connect(); err != nil { + return fmt.Errorf("failed to connect to NATS: %w", err) + } + } + + // Set default retention policy if not configured + retentionPolicy := p.config.RetentionPolicy + if retentionPolicy == 0 { + retentionPolicy = jetstream.WorkQueuePolicy + } + + // Create stream config + streamConfig := jetstream.StreamConfig{ + Name: p.streamName, + Subjects: []string{fmt.Sprintf("%v.>", p.streamName)}, + Retention: retentionPolicy, + } + + // Ensure stream exists + p.logger.Debugf("ensuring stream %s exists", p.streamName) + _, err := p.cm.EnsureStream(ctx, streamConfig) + return err +} + +// Consume pulls a batch of messages and converts them to our Message type +func (p *PullStrategy) Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) { + // Check if we need to reconnect/reinitialize + if err := p.ensureConsumerActive(ctx); err != nil { + return nil, fmt.Errorf("failed to ensure consumer: %w", err) + } + + if p.cb != nil && !p.cb.Allow() { + p.logger.Debug("circuit breaker open", logger.Fields{ + "worker_id": workerID, + "state": p.cb.GetState(), + }) + return nil, errors.New("circuit breaker open") + } + + p.logger.Debug("pulling messages", logger.Fields{ + "worker_id": workerID, + "batch_size": p.config.BatchSize, + "timeout": p.config.PollTimeout, + }) + + // Record attempt start if circuit breaker is configured + var cbRecorded bool + if p.cb != nil { + cbRecorded = p.cb.RecordStart() + if !cbRecorded { + return nil, fmt.Errorf("circuit breaker rejected request") + } + } + + // Fetch messages from stream + jsMsgs, err := p.consumer.Fetch( + p.config.BatchSize, + jetstream.FetchMaxWait(p.config.PollTimeout), + ) + + // Record result in circuit breaker + if p.cb != nil && cbRecorded { + p.cb.RecordResult(err == nil) + } + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // This is normal for pull-based when no messages are available + return nil, nil + } + return nil, fmt.Errorf("failed to fetch messages: %w", err) + } + + return jsMsgs.Messages(), nil +} + +// ensureConsumerActive checks the connection status and reinitializes the consumer if needed +func (p *PullStrategy) ensureConsumerActive(ctx context.Context) error { + if p.consumer != nil && p.cm.IsConnected() { + // Check if the JS context in the connection manager has changed + currentJS := p.cm.GetJetStream() + if currentJS != nil { + return nil // All good, connection is active + } + } + + p.logger.Debug("reconnecting consumer") + + // Ensure connection and stream + if err := p.ensureStream(ctx); err != nil { + return fmt.Errorf("failed to ensure stream: %w", err) + } + + // Create consumer + if err := p.createConsumer(ctx); err != nil { + return fmt.Errorf("failed to create consumer: %w", err) + } + + p.logger.Debug("consumer reconnected successfully") + return nil +} + +func (p *PullStrategy) createConsumer(ctx context.Context) error { + // Get current JetStream instance + js := p.cm.GetJetStream() + if js == nil { + return errors.New("JetStream connection not available") + } + + // Get stream + stream, err := js.Stream(ctx, p.streamName) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + + // Configure durable pull consumer + consumerConfig := jetstream.ConsumerConfig{ + Name: 
p.config.ConsumerName, + Durable: p.config.ConsumerName, + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: p.config.AckWait, + MaxAckPending: p.config.MaxAckPending, + FilterSubject: p.subject, + DeliverPolicy: jetstream.DeliverAllPolicy, + MaxDeliver: p.config.MaxRetries + 1, // Include first delivery + } + + // Create or update the consumer + consumer, err := stream.CreateOrUpdateConsumer(ctx, consumerConfig) + if err != nil { + return fmt.Errorf("failed to create/update consumer: %w", err) + } + + p.consumer = consumer + + return nil +} diff --git a/messaging/natsjsstrategypull/pull_test.go b/messaging/natsjsstrategypull/pull_test.go new file mode 100644 index 0000000..38fc4b9 --- /dev/null +++ b/messaging/natsjsstrategypull/pull_test.go @@ -0,0 +1,73 @@ +package natsjsstrategypull_test + +import ( + "context" + "testing" + + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/messaging/natsjsstrategypull" + conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + + assert.NoError(t, err) + assert.NotNil(t, strategy) +} + +func TestSetup(t *testing.T) { + d := conmock.NewDependencies() + + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + assert.NoError(t, err) + + ctx := context.TODO() + + key := natsjscon.CtxKey("stream-name") + ctx = context.WithValue(ctx, key, "test") + key = natsjscon.CtxKey("subject") + ctx = context.WithValue(ctx, key, "test.subject") + key = natsjscon.CtxKey("logger") + ctx = context.WithValue(ctx, key, d.Logger) + key = natsjscon.CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, d.Connector) + + err = strategy.Setup(ctx) + assert.NoError(t, err) +} + +func TestConsume(t *testing.T) { + d := conmock.NewDependencies() + + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + assert.NoError(t, err) + + ctx := context.TODO() + + key := natsjscon.CtxKey("stream-name") + ctx = context.WithValue(ctx, key, "test") + key = natsjscon.CtxKey("subject") + ctx = context.WithValue(ctx, key, "test.subject") + key = natsjscon.CtxKey("logger") + ctx = context.WithValue(ctx, key, d.Logger) + key = natsjscon.CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, d.Connector) + + err = strategy.Setup(ctx) + assert.NoError(t, err) + + msgs, err := strategy.Consume(ctx, 1) + assert.NoError(t, err) + assert.NotNil(t, msgs) +} diff --git a/messaging/natsjspub/natsjspub_mcb_test.go b/mocks/circuitbreaker/circuitbreaker.go similarity index 96% rename from messaging/natsjspub/natsjspub_mcb_test.go rename to mocks/circuitbreaker/circuitbreaker.go index dbd0744..5d2880a 100644 --- a/messaging/natsjspub/natsjspub_mcb_test.go +++ b/mocks/circuitbreaker/circuitbreaker.go @@ -1,4 +1,4 @@ -package natsjspub_test +package circuitbreaker import ( "github.com/sanity-io/litter" diff --git a/mocks/logger/logger.go b/mocks/logger/logger.go new file mode 100644 index 0000000..3b5a070 --- /dev/null +++ b/mocks/logger/logger.go @@ -0,0 +1,23 @@ +package logger + +import "github.com/stretchr/testify/mock" + +type MockLogger struct { + mock.Mock +} + +func (m *MockLogger) Debugf(fmt string, args ...any) { + m.Called(args) +} + +func (m *MockLogger) Errorf(fmt string, args 
...any) { + m.Called(args) +} + +func (m *MockLogger) Debug(args ...any) { + m.Called(args) +} + +func (m *MockLogger) Error(args ...any) { + m.Called(args) +} diff --git a/mocks/messaging/natsjscm/batch.go b/mocks/messaging/natsjscm/batch.go new file mode 100644 index 0000000..b8a2eaf --- /dev/null +++ b/mocks/messaging/natsjscm/batch.go @@ -0,0 +1,39 @@ +package natsjscm + +// type MockMessageBatch interface { +// Messages() <-chan jetstream.Msg +// Error() error +// } + +import ( + "github.com/nats-io/nats.go/jetstream" +) + +// MockMessageBatch is a mock implementation of the MockMessageBatch interface. +type MockMessageBatch struct { + Msgs []jetstream.Msg + Err error +} + +// NewMockMessageBatch creates a new instance of MockMessageBatch. +func NewMockMessageBatch(messages []jetstream.Msg, err error) *MockMessageBatch { + return &MockMessageBatch{ + Msgs: messages, + Err: err, + } +} + +// Messages returns a channel of jetstream.Msg. +func (m *MockMessageBatch) Messages() <-chan jetstream.Msg { + msgChan := make(chan jetstream.Msg, len(m.Msgs)) + for _, msg := range m.Msgs { + msgChan <- msg + } + close(msgChan) + return msgChan +} + +// Error returns an error. +func (m *MockMessageBatch) Error() error { + return m.Err +} diff --git a/mocks/messaging/natsjscm/consumer.go b/mocks/messaging/natsjscm/consumer.go new file mode 100644 index 0000000..3290402 --- /dev/null +++ b/mocks/messaging/natsjscm/consumer.go @@ -0,0 +1,143 @@ +package natsjscm + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +// MockConsumer is a stretcher-based mock for the jetstream.Consumer interface +type MockConsumer struct { + mock.Mock +} + +// Fetch mocks the Consumer.Fetch method +func (m *MockConsumer) Fetch( + batch int, + opts ...jetstream.FetchOpt, +) (jetstream.MessageBatch, error) { + args := []interface{}{batch} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// FetchBytes mocks the Consumer.FetchBytes method +func (m *MockConsumer) FetchBytes( + maxBytes int, + opts ...jetstream.FetchOpt, +) (jetstream.MessageBatch, error) { + args := []interface{}{maxBytes} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// FetchNoWait mocks the Consumer.FetchNoWait method +func (m *MockConsumer) FetchNoWait(batch int) (jetstream.MessageBatch, error) { + result := m.Called(batch) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// Consume mocks the Consumer.Consume method +func (m *MockConsumer) Consume( + handler jetstream.MessageHandler, + opts ...jetstream.PullConsumeOpt, +) (jetstream.ConsumeContext, error) { + args := []interface{}{handler} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) 
+ + var consumeContext jetstream.ConsumeContext + if result.Get(0) != nil { + consumeContext = result.Get(0).(jetstream.ConsumeContext) + } + + return consumeContext, result.Error(1) +} + +// Messages mocks the Consumer.Messages method +func (m *MockConsumer) Messages( + opts ...jetstream.PullMessagesOpt, +) (jetstream.MessagesContext, error) { + args := []interface{}{} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messagesContext jetstream.MessagesContext + if result.Get(0) != nil { + messagesContext = result.Get(0).(jetstream.MessagesContext) + } + + return messagesContext, result.Error(1) +} + +// Next mocks the Consumer.Next method +func (m *MockConsumer) Next(opts ...jetstream.FetchOpt) (jetstream.Msg, error) { + args := []interface{}{} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var msg jetstream.Msg + if result.Get(0) != nil { + msg = result.Get(0).(jetstream.Msg) + } + + return msg, result.Error(1) +} + +// Info mocks the Consumer.Info method +func (m *MockConsumer) Info(ctx context.Context) (*jetstream.ConsumerInfo, error) { + result := m.Called(ctx) + + var consumerInfo *jetstream.ConsumerInfo + if result.Get(0) != nil { + consumerInfo = result.Get(0).(*jetstream.ConsumerInfo) + } + + return consumerInfo, result.Error(1) +} + +// CachedInfo mocks the Consumer.CachedInfo method +func (m *MockConsumer) CachedInfo() *jetstream.ConsumerInfo { + result := m.Called() + + var consumerInfo *jetstream.ConsumerInfo + if result.Get(0) != nil { + consumerInfo = result.Get(0).(*jetstream.ConsumerInfo) + } + + return consumerInfo +} + +// Ensure the mock implements the interface +var _ jetstream.Consumer = (*MockConsumer)(nil) diff --git a/mocks/messaging/natsjscm/msg.go b/mocks/messaging/natsjscm/msg.go new file mode 100644 index 0000000..7517b04 --- /dev/null +++ b/mocks/messaging/natsjscm/msg.go @@ -0,0 +1,107 @@ +package natsjscm + +import ( + "context" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +type MockJetStreamMsg struct { + mock.Mock +} + +func NewMockJetStreamMsg(err error) *MockJetStreamMsg { + m := &MockJetStreamMsg{} + m.On("Metadata").Return(&jetstream.MsgMetadata{}, nil) + m.On("Ack").Return(err) + + return m +} + +// Metadata returns [MsgMetadata] for a JetStream message. +func (m *MockJetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { + called := m.Called() + return called.Get(0).(*jetstream.MsgMetadata), called.Error(1) +} + +// Data returns the message body. +func (m *MockJetStreamMsg) Data() []byte { + called := m.Called() + return called.Get(0).([]byte) +} + +// Headers returns a map of headers for a message. +func (m *MockJetStreamMsg) Headers() nats.Header { + called := m.Called() + return called.Get(0).(nats.Header) +} + +// Subject returns a subject on which a message is published. +func (m *MockJetStreamMsg) Subject() string { + called := m.Called() + return called.String(0) +} + +// Reply returns a reply subject for a JetStream message. +func (m *MockJetStreamMsg) Reply() string { + called := m.Called() + return called.String(0) +} + +// Ack acknowledges a message. This tells the server that the message was +// successfully processed and it can move on to the next message. +func (m *MockJetStreamMsg) Ack() error { + call := m.Called() + return call.Error(0) +} + +// DoubleAck acknowledges a message and waits for ack reply from the server. 
+// While it impacts performance, it is useful for scenarios where +// message loss is not acceptable. +func (m *MockJetStreamMsg) DoubleAck(ctx context.Context) error { + call := m.Called(ctx) + return call.Error(0) +} + +// Nak negatively acknowledges a message. This tells the server to +// redeliver the message. +func (m *MockJetStreamMsg) Nak() error { + call := m.Called() + return call.Error(0) +} + +// NakWithDelay negatively acknowledges a message. This tells the server +// to redeliver the message after the given delay. +func (m *MockJetStreamMsg) NakWithDelay(delay time.Duration) error { + called := m.Called(delay) + return called.Error(0) +} + +// InProgress tells the server that this message is being worked on. It +// resets the redelivery timer on the server. +func (m *MockJetStreamMsg) InProgress() error { + called := m.Called() + return called.Error(0) +} + +// Term tells the server to not redeliver this message, regardless of +// the value of MaxDeliver. +func (m *MockJetStreamMsg) Term() error { + called := m.Called() + return called.Error(0) +} + +// TermWithReason tells the server to not redeliver this message, regardless of +// the value of MaxDeliver. The provided reason will be included in JetStream +// advisory event sent by the server. +// +// Note: This will only work with JetStream servers >= 2.10.4. +// For older servers, TermWithReason will be ignored by the server and the message +// will not be terminated. +func (m *MockJetStreamMsg) TermWithReason(reason string) error { + called := m.Called(reason) + return called.Error(0) +} diff --git a/messaging/natsjspub/natsjspub_mcm_test.go b/mocks/messaging/natsjscm/natsjscm.go similarity index 98% rename from messaging/natsjspub/natsjspub_mcm_test.go rename to mocks/messaging/natsjscm/natsjscm.go index b658fc4..7a7a010 100644 --- a/messaging/natsjspub/natsjspub_mcm_test.go +++ b/mocks/messaging/natsjscm/natsjscm.go @@ -1,4 +1,4 @@ -package natsjspub_test +package natsjscm import ( "context" diff --git a/mocks/messaging/natsjscm/stream.go b/mocks/messaging/natsjscm/stream.go new file mode 100644 index 0000000..1a7ef8a --- /dev/null +++ b/mocks/messaging/natsjscm/stream.go @@ -0,0 +1,344 @@ +package natsjscm + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +// MockStream is a mock implementation of the Stream interface +type MockStream struct { + mock.Mock +} + +// CachedInfo provides a mock function with given fields: +func (_m *MockStream) CachedInfo() *jetstream.StreamInfo { + ret := _m.Called() + + var r0 *jetstream.StreamInfo + if rf, ok := ret.Get(0).(func() *jetstream.StreamInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.StreamInfo) + } + } + + return r0 +} + +// Consumer provides a mock function with given fields: ctx, name +func (_m *MockStream) Consumer(ctx context.Context, name string) (jetstream.Consumer, error) { + ret := _m.Called(ctx, name) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, string) jetstream.Consumer); ok { + r0 = rf(ctx, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ConsumerNames provides a mock function with given fields: ctx +func (_m *MockStream) ConsumerNames(ctx context.Context) jetstream.ConsumerNameLister { + ret := _m.Called(ctx) + + var r0 
jetstream.ConsumerNameLister + if rf, ok := ret.Get(0).(func(context.Context) jetstream.ConsumerNameLister); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.ConsumerNameLister) + } + } + + return r0 +} + +// CreateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) CreateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateOrUpdateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) CreateOrUpdateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteConsumer provides a mock function with given fields: ctx, name +func (_m *MockStream) DeleteConsumer(ctx context.Context, name string) error { + ret := _m.Called(ctx, name) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteMsg provides a mock function with given fields: ctx, seq +func (_m *MockStream) DeleteMsg(ctx context.Context, seq uint64) error { + ret := _m.Called(ctx, seq) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, seq) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetLastMsgForSubject provides a mock function with given fields: ctx, subject +func (_m *MockStream) GetLastMsgForSubject( + ctx context.Context, + subject string, +) (*jetstream.RawStreamMsg, error) { + ret := _m.Called(ctx, subject) + + var r0 *jetstream.RawStreamMsg + if rf, ok := ret.Get(0).(func(context.Context, string) *jetstream.RawStreamMsg); ok { + r0 = rf(ctx, subject) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.RawStreamMsg) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, subject) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMsg provides a mock function with given fields: ctx, seq, opts +func (_m *MockStream) GetMsg( + ctx context.Context, + seq uint64, + opts ...jetstream.GetMsgOpt, +) (*jetstream.RawStreamMsg, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, seq) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *jetstream.RawStreamMsg + if rf, ok := ret.Get(0).(func(context.Context, uint64, ...jetstream.GetMsgOpt) *jetstream.RawStreamMsg); ok { + r0 = rf(ctx, seq, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.RawStreamMsg) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uint64, ...jetstream.GetMsgOpt) error); ok { + r1 = rf(ctx, seq, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Info provides a mock function with given fields: ctx, opts +func (_m *MockStream) Info( + ctx context.Context, + opts ...jetstream.StreamInfoOpt, +) (*jetstream.StreamInfo, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *jetstream.StreamInfo + if rf, ok := ret.Get(0).(func(context.Context, ...jetstream.StreamInfoOpt) *jetstream.StreamInfo); ok { + r0 = rf(ctx, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.StreamInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, ...jetstream.StreamInfoOpt) error); ok { + r1 = rf(ctx, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ListConsumers provides a mock function with given fields: ctx +func (_m *MockStream) ListConsumers(ctx context.Context) jetstream.ConsumerInfoLister { + ret := _m.Called(ctx) + + var r0 jetstream.ConsumerInfoLister + if rf, ok := ret.Get(0).(func(context.Context) jetstream.ConsumerInfoLister); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.ConsumerInfoLister) + } + } + + return r0 +} + +// OrderedConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) OrderedConsumer( + ctx context.Context, + cfg jetstream.OrderedConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.OrderedConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.OrderedConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Purge provides a mock function with given fields: ctx, opts +func (_m *MockStream) Purge(ctx context.Context, opts ...jetstream.StreamPurgeOpt) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, ...jetstream.StreamPurgeOpt) error); ok { + r0 = rf(ctx, opts...) 
+ } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SecureDeleteMsg provides a mock function with given fields: ctx, seq +func (_m *MockStream) SecureDeleteMsg(ctx context.Context, seq uint64) error { + ret := _m.Called(ctx, seq) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, seq) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) UpdateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go new file mode 100644 index 0000000..0689555 --- /dev/null +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -0,0 +1,142 @@ +package natsjscon + +import ( + "context" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/logger" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/messaging/natsjsdlq" + "github.com/stretchr/testify/mock" + "github.com/tj/assert" +) + +func NewDependencies() *Dependencies { + msg := &natsjscm.MockJetStreamMsg{} + msg.On("Headers").Return(nats.Header{}, nil) + msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) + msg.On("Ack").Return(nil) + msgs := []jetstream.Msg{msg, msg, msg, msg} + + batch := natsjscm.NewMockMessageBatch(msgs, nil) + + processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) + for _, msg := range msgs { + processed[msg] = natsjscon.Success + } + + d := &Dependencies{ + Logger: &logger.MockLogger{}, + Jetstream: &natsjscm.MockJetStream{}, + Connector: &natsjscm.MockConnectionManager{}, + Stream: &natsjscm.MockStream{}, + Consumer: &natsjscm.MockConsumer{}, + Batch: batch, + DLQHandler: &natsjsdlq.MockDLQHandler{}, + Strategy: &MockStrategy{}, + Processor: &ProcessorMock{}, + } + + d.Logger.On("Debug", mock.Anything).Maybe() + d.Logger.On("Debugf", mock.Anything, mock.Anything).Maybe() + d.Logger.On("Error", mock.Anything).Maybe() + + d.Connector.On("Connect").Return(nil).Maybe() + d.Connector.On("IsConnected").Return(true).Maybe() + d.Connector.On("EnsureStream", mock.Anything, mock.Anything).Return(d.Jetstream, nil).Maybe() + d.Connector.On("GetJetStream").Return(d.Jetstream).Maybe() + d.Connector.SetJetStream(d.Jetstream) + + d.Jetstream.On("Stream", mock.Anything, mock.Anything). + Return(d.Stream, nil). 
+ Maybe() + + d.Stream.On("CreateOrUpdateConsumer", mock.Anything, mock.Anything).Return(d.Consumer, nil) + d.Consumer.On("Fetch", mock.Anything, mock.Anything).Return(d.Batch, nil) + + d.Processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() + + // Setup default strategy mocks to avoid test failures + d.Strategy.On("Setup", mock.Anything).Return(nil).Maybe() + d.Strategy.On("Consume", mock.Anything, mock.Anything).Return(d.Batch.Messages(), nil).Maybe() + + return d +} + +type Dependencies struct { + Logger *logger.MockLogger + Jetstream *natsjscm.MockJetStream + Connector *natsjscm.MockConnectionManager + Stream *natsjscm.MockStream + Consumer *natsjscm.MockConsumer + Batch *natsjscm.MockMessageBatch + DLQHandler *natsjsdlq.MockDLQHandler + Strategy *MockStrategy + Processor *ProcessorMock +} + +func (d *Dependencies) NewConsumer( + t *testing.T, + configs ...natsjscon.ConsumerConfig, +) *natsjscon.Consumer { + if len(configs) == 0 { + configs = append(configs, natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + }) + } + + config := configs[0] + + consumer := natsjscon. + NewConsumer(config). + SetLogger(d.Logger). + SetConnector(d.Connector). + SetDLQHandler(d.DLQHandler). + SetStrategy(d.Strategy). + SetProcessor(d.Processor.Process) + + assert.NotNil(t, consumer) + + return consumer +} + +type MockStrategy struct { + mock.Mock +} + +func (m *MockStrategy) Setup(ctx context.Context) error { + c := m.Called(ctx) + return c.Error(0) +} + +func (m *MockStrategy) Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) { + + c := m.Called(ctx, workerID) + result, err := c.Get(0).(<-chan jetstream.Msg), c.Error(1) + + return result, err +} + +func (m *MockStrategy) Reset(batch *natsjscm.MockMessageBatch) { + m.ExpectedCalls = nil + + m.On("Setup", mock.Anything).Return(nil).Maybe() + m.On("Consume", mock.Anything, mock.Anything).Return(batch.Messages(), nil).Maybe() +} + +type ProcessorMock struct { + mock.Mock +} + +func (p *ProcessorMock) Process( + ctx context.Context, + msgs <-chan jetstream.Msg, +) map[jetstream.Msg]natsjscon.ProcessStatus { + args := p.Called(ctx, msgs) + return args.Get(0).(map[jetstream.Msg]natsjscon.ProcessStatus) +} diff --git a/mocks/messaging/natsjsdlq/natsjsdlq.go b/mocks/messaging/natsjsdlq/natsjsdlq.go new file mode 100644 index 0000000..73ebbf5 --- /dev/null +++ b/mocks/messaging/natsjsdlq/natsjsdlq.go @@ -0,0 +1,21 @@ +package natsjsdlq + +import ( + "github.com/nats-io/nats.go" + "github.com/simiancreative/simiango/messaging/natsjsdlq" + "github.com/stretchr/testify/mock" +) + +type MockDLQHandler struct { + mock.Mock +} + +func (m *MockDLQHandler) PublishMessage(msg *nats.Msg, reason string) error { + args := m.Called(msg, reason) + return args.Error(0) +} + +func (m *MockDLQHandler) ShouldDLQ(msg natsjsdlq.Msg) bool { + args := m.Called(msg) + return args.Bool(0) +} diff --git a/mocks/messaging/natsjspub/natsjspub.go b/mocks/messaging/natsjspub/natsjspub.go new file mode 100644 index 0000000..a94ee1e --- /dev/null +++ b/mocks/messaging/natsjspub/natsjspub.go @@ -0,0 +1,33 @@ +package natsjspub + +import ( + "context" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +type MockPublisher struct { + mock.Mock +} + +func (m *MockPublisher) Publish(ctx context.Context, msg *nats.Msg) (*jetstream.PubAck, error) { + args := m.Called(ctx, msg) + if args.Get(0) == nil { + return nil, args.Error(1) + 
} + return args.Get(0).(*jetstream.PubAck), args.Error(1) +} + +type MockMsg struct { + mock.Mock +} + +func (m *MockMsg) Metadata() (*nats.MsgMetadata, error) { + args := m.Called() + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*nats.MsgMetadata), args.Error(1) +} diff --git a/simian-go/app/tryitout/connection/connection.go b/simian-go/app/tryitout/connection/connection.go new file mode 100644 index 0000000..abeaa56 --- /dev/null +++ b/simian-go/app/tryitout/connection/connection.go @@ -0,0 +1,25 @@ +package connection + +import ( + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" +) + +var Shared natsjscm.Connector + +func init() { + log := logger.New() + + connectionConfig := natsjscm.ConnectionConfig{ + Logger: log, + URL: "nats://localhost:4222", + } + + connector, err := natsjscm.NewConnectionManager(connectionConfig) + if err != nil { + log.Errorf("failed to create connection manager: %v", err) + panic(err) + } + + Shared = connector +} diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go new file mode 100644 index 0000000..5602ca9 --- /dev/null +++ b/simian-go/app/tryitout/consumer/index.go @@ -0,0 +1,112 @@ +package consumer + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/circuitbreaker" + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/messaging/natsjsstrategypull" + "github.com/simiancreative/simiango/sig" + + cli "github.com/simiancreative/simiango/simian-go/app/tryitout" + "github.com/simiancreative/simiango/simian-go/app/tryitout/connection" +) + +var cmd = &cobra.Command{ + Use: "consumer", + Short: "run a nats consumer", + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, +} + +func init() { + cli.Cmd.AddCommand(cmd) +} + +func run() error { + logger := logger.New() + + connector := connection.Shared + breaker := newBreaker() + strategy := newStrategy(breaker) + consumer := newConsumer(logger, connector, strategy) + + done, exit := sig.New().Catch() + + err := consumer.Start(done) + if err != nil { + return errors.Wrap(err, "failed to start consumer") + } + + <-exit.Done() + + return nil +} + +func newConsumer( + logger natsjscon.Logger, + connector natsjscm.Connector, + strategy natsjscon.ConsumptionStrategy, +) natsjscon.Consumer { + return *natsjscon.NewConsumer(natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test", + Subject: "test.something.>", + }). + SetLogger(logger). + SetConnector(connector). + SetStrategy(strategy). 
+ SetProcessor(processor) +} + +func newBreaker() circuitbreaker.Breaker { + breaker, err := circuitbreaker.NewDefault() + if err != nil { + logger.Errorf("failed to create circuit breaker: %v", err) + panic(err) + } + + return breaker +} + +func newStrategy(breaker circuitbreaker.Breaker) natsjscon.ConsumptionStrategy { + strategy, err := natsjsstrategypull.New(natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + Breaker: breaker, + }) + + if err != nil { + logger.Errorf("failed to create pull strategy: %v", err) + panic(err) + } + + return strategy +} + +func processor( + ctx context.Context, + msgs <-chan jetstream.Msg, +) map[jetstream.Msg]natsjscon.ProcessStatus { + log := logger.New() + + log.Infof("processing messages: %d", len(msgs)) + + processed := map[jetstream.Msg]natsjscon.ProcessStatus{} + + for msg := range msgs { + processed[msg] = natsjscon.Success + log.Info("processing message", logger.Fields{ + "data": string(msg.Data()), + "hdr": msg.Headers(), + }) + } + + return processed +} diff --git a/simian-go/app/tryitout/index.go b/simian-go/app/tryitout/index.go new file mode 100644 index 0000000..a119084 --- /dev/null +++ b/simian-go/app/tryitout/index.go @@ -0,0 +1,15 @@ +package metacli + +import ( + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/simian-go/app" +) + +var Cmd = &cobra.Command{ + Use: "tryitout", +} + +func init() { + app.Root.Cmd.AddCommand(Cmd) +} diff --git a/simian-go/app/tryitout/publisher/index.go b/simian-go/app/tryitout/publisher/index.go new file mode 100644 index 0000000..8e0c27d --- /dev/null +++ b/simian-go/app/tryitout/publisher/index.go @@ -0,0 +1,104 @@ +package publisher + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/nats-io/nats.go" + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjspub" + cli "github.com/simiancreative/simiango/simian-go/app/tryitout" +) + +var ( + count int + streamName string + subjectName string +) + +var cmd = &cobra.Command{ + Use: "publisher", + Short: "publish messages to a nats stream", + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, +} + +func init() { + cmd.Flags().IntVarP(&count, "count", "c", 100, "Number of messages to publish") + cmd.Flags().StringVarP(&streamName, "stream", "s", "test", "Stream name") + cmd.Flags().StringVarP(&subjectName, "subject", "j", "test.something.batch", "Subject name") + + cli.Cmd.AddCommand(cmd) +} + +func run() error { + logger := logger.New() + + // Configure connection + connectionConfig := natsjscm.ConnectionConfig{ + Logger: logger, + URL: "nats://localhost:4222", + } + + // Create connection manager + connector, err := natsjscm.NewConnectionManager(connectionConfig) + if err != nil { + return errors.Wrap(err, "failed to create connection manager") + } + + publisher, err := natsjspub.NewPublisher(natsjspub.Dependencies{ + Connector: connector, + Logger: logger, + }, natsjspub.Config{ + StreamName: streamName, + Subject: subjectName, + }) + if err != nil { + return errors.Wrap(err, "failed to create publisher") + } + + // Publish messages in rapid succession + startTime := time.Now() + + logger.Infof("Publishing %d messages to %s...", count, subjectName) + + for i := 1; i <= count; i++ { + data := fmt.Sprintf("Test message %d", i) + + // Create headers for tracing + hdr := nats.Header{} + hdr.Add("X-Message-ID", 
strconv.Itoa(i)) + hdr.Add("X-Timestamp", time.Now().Format(time.RFC3339Nano)) + hdr.Add("X-Batch-Total", strconv.Itoa(count)) + + // Publish with message options + msg := &nats.Msg{ + Subject: subjectName, + Header: hdr, + Data: []byte(data), + } + + ctx := context.Background() + _, err := publisher.Publish(ctx, msg) + + if err != nil { + logger.Errorf("Failed to publish message %d: %v", i, err) + continue + } + } + + duration := time.Since(startTime) + rate := float64(count) / duration.Seconds() + + logger.Infof("Published %d messages in %v (%.2f msgs/sec)", count, duration, rate) + + return nil +} + diff --git a/simian-go/main.go b/simian-go/main.go index e8286c0..b1104f0 100644 --- a/simian-go/main.go +++ b/simian-go/main.go @@ -13,6 +13,8 @@ import ( _ "github.com/simiancreative/simiango/simian-go/app/token/decode" _ "github.com/simiancreative/simiango/simian-go/app/token/generate" _ "github.com/simiancreative/simiango/simian-go/app/token/test" + _ "github.com/simiancreative/simiango/simian-go/app/tryitout/consumer" + _ "github.com/simiancreative/simiango/simian-go/app/tryitout/publisher" "github.com/simiancreative/simiango/simian-go/app" )
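
A minimal sketch (not part of this diff) of how the new mock helpers might be wired into a unit test. Only NewDependencies, Dependencies.NewConsumer, and MockPublisher.Publish come from the files above; the package name, test name, and the PubAck values are illustrative assumptions.

package tryitout_test

import (
	"context"
	"testing"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/stretchr/testify/mock"

	conmocks "github.com/simiancreative/simiango/mocks/messaging/natsjscon"
	pubmocks "github.com/simiancreative/simiango/mocks/messaging/natsjspub"
)

// TestMockWiring exercises only the wiring introduced in this change set:
// NewDependencies pre-stubs the connector/stream/consumer/strategy graph,
// and NewConsumer builds a real natsjscon.Consumer on top of those mocks.
func TestMockWiring(t *testing.T) {
	deps := conmocks.NewDependencies()

	// NewConsumer falls back to a default ConsumerConfig when none is
	// passed and asserts the resulting consumer is non-nil.
	consumer := deps.NewConsumer(t)
	_ = consumer // start/stop behaviour is covered by the package's own tests

	// Stub the publisher mock for code paths that publish through natsjspub.
	pub := &pubmocks.MockPublisher{}
	pub.On("Publish", mock.Anything, mock.Anything).
		Return(&jetstream.PubAck{Stream: "test", Sequence: 1}, nil)

	ack, err := pub.Publish(context.Background(), &nats.Msg{Subject: "test.something.batch"})
	if err != nil || ack.Stream != "test" {
		t.Fatalf("unexpected publish result: ack=%v err=%v", ack, err)
	}

	pub.AssertExpectations(t)
}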