From 2c46cb4b40a8bac31449a84633ffeaf646881969 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Thu, 6 Mar 2025 09:29:43 -0500 Subject: [PATCH 01/15] feat(tests): add table-driven tests for Consumer struct - Added table-driven tests for NewConsumer, SetLogger, SetConnector, SetStrategy, SetDLQHandler, SetProcessor, Setup, IsRunning, Stop, and Validate functions. - Used testify's assert package for matchers to avoid if statements. - Included mock implementations for Logger, Connector, Strategy, and DLQHandler. --- messaging/natsjscon/natsjscon.go | 247 ++++++++++++++++++ .../natsjscon/natsjscon_handle_result.go | 130 +++++++++ messaging/natsjscon/natsjscon_start.go | 46 ++++ messaging/natsjscon/natsjscon_test.go | 146 +++++++++++ messaging/natsjscon/natsjscon_worker.go | 92 +++++++ messaging/natsjsconstrategies/pull.go | 1 + messaging/natsjsconstrategies/push.go | 1 + messaging/natsjsdlq/natsjsdlq.go | 17 +- 8 files changed, 674 insertions(+), 6 deletions(-) create mode 100644 messaging/natsjscon/natsjscon.go create mode 100644 messaging/natsjscon/natsjscon_handle_result.go create mode 100644 messaging/natsjscon/natsjscon_start.go create mode 100644 messaging/natsjscon/natsjscon_test.go create mode 100644 messaging/natsjscon/natsjscon_worker.go create mode 100644 messaging/natsjsconstrategies/pull.go create mode 100644 messaging/natsjsconstrategies/push.go diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go new file mode 100644 index 0000000..217cc9c --- /dev/null +++ b/messaging/natsjscon/natsjscon.go @@ -0,0 +1,247 @@ +package natsjscon + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjsdlq" +) + +// Logger is the interface for logging +type Logger interface { + Debug(args ...any) + Error(args ...any) +} + +// ProcessStatus represents the result of message processing +type ProcessStatus int + +const ( + // Success indicates successful processing + Success ProcessStatus = iota + // Failure indicates processing failure (will be retried) + Failure + // TerminalFailure indicates a non-recoverable failure (sent to DLQ) + TerminalFailure +) + +var ResultNames = map[ProcessStatus]string{ + Success: "SUCCESS", + Failure: "FAILURE", + TerminalFailure: "TERMINAL-FAILURE", +} + +// Processor processes messages +type Processor func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]ProcessStatus + +// ConsumerConfig holds general configuration for a consumer +type ConsumerConfig struct { + // StreamName is the name of the stream to consume from + StreamName string + + // ConsumerName is the name for this consumer + ConsumerName string + + // Subject is the subject to subscribe to + Subject string + + // MaxRetries is the maximum number of retries before sending to DLQ + MaxRetries int + + // ProcessTimeout is the timeout for processing a message/batch + ProcessTimeout time.Duration + + // EnableDLQ determines if messages should be sent to a DLQ after max retries + EnableDLQ bool + + // WorkerCount is the number of concurrent workers + WorkerCount int +} + +// ConsumptionStrategy defines the interface for different message consumption strategies +type ConsumptionStrategy interface { + // Setup prepares the consumption strategy + Setup(ctx context.Context) error + + // Consume retrieves messages for processing + Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) +} + +// Consumer manages consuming and processing messages +type Consumer struct { + // Configuration + config ConsumerConfig + + // Dependencies + logger Logger + strategy ConsumptionStrategy + processor Processor + cm natsjscm.Connector + dlqHandler natsjsdlq.Handler + + // State + running bool + wg sync.WaitGroup + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +// NewConsumer creates a new consumer with the specified strategy +func NewConsumer(config ConsumerConfig) *Consumer { + return &Consumer{config: config} +} + +func (c *Consumer) debug(args ...any) { + if c.logger == nil { + return + } + + c.logger.Debug(args...) +} + +// SetLogger sets the logger +func (c *Consumer) SetLogger(logger Logger) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + c.logger = logger + c.debug("logger set") + return c +} + +// SetConnector sets the connection manager +func (c *Consumer) SetConnector(cm natsjscm.Connector) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.cm = cm + c.debug("connection manager set") + return c +} + +// SetStrategy sets the consumption strategy +func (c *Consumer) SetStrategy(strategy ConsumptionStrategy) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.strategy = strategy + c.debug("strategy set") + return c +} + +// SetDLQHandler sets the dead letter queue handler +func (c *Consumer) SetDLQHandler(handler natsjsdlq.Handler) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.dlqHandler = handler + c.debug("DLQ handler set") + return c +} + +// SetProcessor sets the message processor +func (c *Consumer) SetProcessor(processor Processor) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + c.processor = processor + c.debug("processor set") + return c +} + +// Setup prepares the consumer for consumption +func (c *Consumer) Setup(processor Processor) *Consumer { + c.mu.Lock() + defer c.mu.Unlock() + + if c.logger == nil { + c.SetLogger(logger.New()) + } + + // Set defaults + if c.config.MaxRetries <= 0 { + c.debug("setting default max retries to 3") + c.config.MaxRetries = 3 + } + + if c.config.ProcessTimeout <= 0 { + c.debug("setting default process timeout to 30s") + c.config.ProcessTimeout = 30 * time.Second + } + + if c.config.WorkerCount <= 0 { + c.debug("setting default worker count to 1") + c.config.WorkerCount = 1 + } + + // Ensure connection to NATS + if err := c.cm.Connect(); err != nil { + c.debug("connection failed ", err) + } + + return c +} + +// IsRunning returns whether the consumer is currently running +func (c *Consumer) IsRunning() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.running +} + +// Stop stops the consumer +func (c *Consumer) Stop() error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.running { + return nil + } + + c.logger.Debug("stopping consumer", logger.Fields{ + "stream": c.config.StreamName, + "consumer": c.config.ConsumerName, + }) + + c.cancel() + c.running = false + c.wg.Wait() + return nil +} + +// validate checks if the consumer is properly configured +func (c *Consumer) validate() error { + c.mu.RLock() + defer c.mu.RUnlock() + + if c.processor == nil { + return errors.New("processor is required") + } + + if c.cm == nil { + return errors.New("connection manager is required") + } + + if c.strategy == nil { + return errors.New("consumption strategy is required") + } + + if c.config.StreamName == "" { + return errors.New("stream name is required") + } + + if c.config.ConsumerName == "" { + return errors.New("consumer name is required") + } + + if c.config.Subject == "" { + return errors.New("subject is required") + } + + return nil +} diff --git a/messaging/natsjscon/natsjscon_handle_result.go b/messaging/natsjscon/natsjscon_handle_result.go new file mode 100644 index 0000000..92b31e9 --- /dev/null +++ b/messaging/natsjscon/natsjscon_handle_result.go @@ -0,0 +1,130 @@ +package natsjscon + +import ( + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/logger" +) + +// handleResult handles the result of message processing +func (c *Consumer) handleResult(msg jetstream.Msg, status ProcessStatus) { + if msg == nil { + c.logger.Debug("missing jetstream message reference", nil) + return + } + + metadata, err := msg.Metadata() + if err != nil { + c.logger.Debug("error getting message metadata", logger.Fields{ + "error": err.Error(), + }) + return + } + + switch status { + case Success: + handleSuccess(msg, c) + return + case Failure: + handleFailure(msg, metadata, c) + return + case TerminalFailure: + handleTerminalError(msg, c) + } +} + +func handleSuccess(msg jetstream.Msg, c *Consumer) { + // Acknowledge successful processing + err := msg.Ack() + + if err == nil { + return + } + + c.logger.Error("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) +} + +func handleFailure(msg jetstream.Msg, metadata *jetstream.MsgMetadata, c *Consumer) { + // Check if max retries reached + limitReached := handleMaxRetries(msg, metadata, c) + if limitReached { + return + } + + // Negative ack for retry + err := msg.Nak() + if err == nil { + return + } + + // Log error + c.logger.Error("error negative-acknowledging message", logger.Fields{ + "error": err.Error(), + }) +} + +func handleMaxRetries(msg jetstream.Msg, metadata *jetstream.MsgMetadata, c *Consumer) bool { + if metadata == nil { + return false + } + + if metadata.NumDelivered < uint64(c.config.MaxRetries) { + return false + } + + err := msg.Ack() + if err != nil { + c.logger.Debug("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) + } + + if !c.config.EnableDLQ || c.dlqHandler == nil { + return true + } + + publishToDLQ(msg, "max_retries", c) + + return true +} + +func handleTerminalError(msg jetstream.Msg, c *Consumer) { + // Ack to prevent redelivery + err := msg.Ack() + if err != nil { + c.logger.Debug("error acknowledging message", logger.Fields{ + "error": err.Error(), + }) + } + + // Send to DLQ if enabled + if !c.config.EnableDLQ || c.dlqHandler == nil { + return + } + + // Create a NATS message for the DLQ + publishToDLQ(msg, "terminal_error", c) +} + +func publishToDLQ(original jetstream.Msg, reason string, c *Consumer) { + if c.dlqHandler == nil { + return + } + + msg := &nats.Msg{ + Subject: original.Subject(), + Data: original.Data(), + Header: original.Headers(), + } + + err := c.dlqHandler.PublishMessage(msg, reason) + if err == nil { + return + } + + c.logger.Debug("error sending to DLQ", logger.Fields{ + "error": err.Error(), + }) +} diff --git a/messaging/natsjscon/natsjscon_start.go b/messaging/natsjscon/natsjscon_start.go new file mode 100644 index 0000000..ca925b2 --- /dev/null +++ b/messaging/natsjscon/natsjscon_start.go @@ -0,0 +1,46 @@ +package natsjscon + +import ( + "context" + "errors" + "fmt" + + "github.com/simiancreative/simiango/logger" +) + +// Start begins consuming messages +func (c *Consumer) Start(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if err := c.validate(); err != nil { + return fmt.Errorf("invalid consumer configuration: %w", err) + } + + if c.running { + return errors.New("consumer is already running") + } + + // Set up the strategy + if err := c.strategy.Setup(ctx); err != nil { + return fmt.Errorf("failed to setup consumption strategy: %w", err) + } + + c.ctx, c.cancel = context.WithCancel(ctx) + c.running = true + + // Start worker pool + for i := 0; i < c.config.WorkerCount; i++ { + c.wg.Add(1) + go c.worker(i) + } + + c.logger.Debug("consumer started", logger.Fields{ + "stream": c.config.StreamName, + "consumer": c.config.ConsumerName, + "subject": c.config.Subject, + "worker_count": c.config.WorkerCount, + }) + + return nil +} diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go new file mode 100644 index 0000000..64f1ea7 --- /dev/null +++ b/messaging/natsjscon/natsjscon_test.go @@ -0,0 +1,146 @@ +package natsjscon_test + +import ( + "context" + "errors" + "testing" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/stretchr/testify/assert" +) + +// Mock implementations +type MockLogger struct{} + +func (m *MockLogger) Debug(args ...any) {} +func (m *MockLogger) Error(args ...any) {} + +type MockConnector struct { + shouldFail bool +} + +func (m *MockConnector) Connect() error { + if m.shouldFail { + return errors.New("connection failed") + } + return nil +} + +type MockStrategy struct{} + +func (m *MockStrategy) Setup(ctx context.Context) error { + return nil +} + +func (m *MockStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { + return nil, nil +} + +type MockDLQHandler struct{} + +func (m *MockDLQHandler) Handle(msg jetstream.Msg) error { + return nil +} + +func TestNewConsumer(t *testing.T) { + config := natsjscon.ConsumerConfig{} + consumer := natsjscon.NewConsumer(config) + assert.NotNil(t, consumer) +} + +func TestSetLogger(t *testing.T) { + consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) + logger := &MockLogger{} + c := consumer.SetLogger(logger) + assert.NotNil(t, c) +} + +func TestSetConnector(t *testing.T) { + consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) + connector := &MockConnector{} + c := consumer.SetConnector(connector) + assert.Equal(t, connector, consumer.cm) +} + +// +// func TestSetStrategy(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// strategy := &MockStrategy{} +// consumer.SetStrategy(strategy) +// assert.Equal(t, strategy, consumer.strategy) +// } +// +// func TestSetDLQHandler(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// handler := &MockDLQHandler{} +// consumer.SetDLQHandler(handler) +// assert.Equal(t, handler, consumer.dlqHandler) +// } +// +// func TestSetProcessor(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { +// return nil +// } +// consumer.SetProcessor(processor) +// assert.Equal(t, processor, consumer.processor) +// } +// +// func TestSetup(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// consumer.SetLogger(&MockLogger{}) +// consumer.SetConnector(&MockConnector{}) +// consumer.SetStrategy(&MockStrategy{}) +// consumer.SetDLQHandler(&MockDLQHandler{}) +// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { +// return nil +// } +// consumer.Setup(processor) +// assert.NotNil(t, consumer.logger) +// assert.NotNil(t, consumer.cm) +// assert.NotNil(t, consumer.strategy) +// assert.NotNil(t, consumer.dlqHandler) +// assert.Equal(t, processor, consumer.processor) +// } +// +// func TestIsRunning(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// assert.False(t, consumer.IsRunning()) +// consumer.running = true +// assert.True(t, consumer.IsRunning()) +// } +// +// func TestStop(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// consumer.SetLogger(&MockLogger{}) +// consumer.running = true +// ctx, cancel := context.WithCancel(context.Background()) +// consumer.ctx = ctx +// consumer.cancel = cancel +// err := consumer.Stop() +// assert.NoError(t, err) +// assert.False(t, consumer.running) +// } +// +// func TestValidate(t *testing.T) { +// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) +// consumer.SetLogger(&MockLogger{}) +// consumer.SetConnector(&MockConnector{}) +// consumer.SetStrategy(&MockStrategy{}) +// consumer.SetDLQHandler(&MockDLQHandler{}) +// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { +// return nil +// } +// consumer.SetProcessor(processor) +// +// err := consumer.Validate() +// assert.Error(t, err) +// +// consumer.config.StreamName = "test-stream" +// consumer.config.ConsumerName = "test-consumer" +// consumer.config.Subject = "test-subject" +// +// err = consumer.Validate() +// assert.NoError(t, err) +// } diff --git a/messaging/natsjscon/natsjscon_worker.go b/messaging/natsjscon/natsjscon_worker.go new file mode 100644 index 0000000..cf6344f --- /dev/null +++ b/messaging/natsjscon/natsjscon_worker.go @@ -0,0 +1,92 @@ +package natsjscon + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/simiancreative/simiango/logger" +) + +// worker processes messages using the configured strategy +func (c *Consumer) worker(id int) { + defer c.wg.Done() + + c.logger.Debug("starting worker", logger.Fields{ + "worker_id": id, + "consumer": c.config.ConsumerName, + }) + + for { + select { + case <-c.ctx.Done(): + c.logger.Debug("worker stopping", logger.Fields{ + "worker_id": id, + "reason": "context cancelled", + }) + + return + default: + err := c.consumeAndProcess(id) + + if err == nil { + continue + } + + if errors.Is(err, context.Canceled) { + continue + } + + c.logger.Debug("error in worker", logger.Fields{ + "worker_id": id, + "error": err.Error(), + }) + + // Brief backoff on error + time.Sleep(100 * time.Millisecond) + } + } +} + +// consumeAndProcess uses the strategy to get messages and processes them +func (c *Consumer) consumeAndProcess(workerID int) error { + // Use strategy to consume messages + msgs, err := c.strategy.Consume(c.ctx, workerID) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // This is normal when no messages are available + return nil + } + + return fmt.Errorf("failed to consume messages: %w", err) + } + + if len(msgs) == 0 { + return nil + } + + c.logger.Debug("processing batch", logger.Fields{ + "worker_id": workerID, + "batch_size": len(msgs), + }) + + // Create processing context with timeout + procCtx, procCancel := context.WithTimeout(c.ctx, c.config.ProcessTimeout) + defer procCancel() + + // Process batch + c.mu.RLock() + processor := c.processor + c.mu.RUnlock() + + // Call the batch processor + results := processor(procCtx, msgs) + + // Handle results + for msg, status := range results { + c.handleResult(msg, status) + } + + return nil +} diff --git a/messaging/natsjsconstrategies/pull.go b/messaging/natsjsconstrategies/pull.go new file mode 100644 index 0000000..4a60a7c --- /dev/null +++ b/messaging/natsjsconstrategies/pull.go @@ -0,0 +1 @@ +package natsjsconstrategies diff --git a/messaging/natsjsconstrategies/push.go b/messaging/natsjsconstrategies/push.go new file mode 100644 index 0000000..4a60a7c --- /dev/null +++ b/messaging/natsjsconstrategies/push.go @@ -0,0 +1 @@ +package natsjsconstrategies diff --git a/messaging/natsjsdlq/natsjsdlq.go b/messaging/natsjsdlq/natsjsdlq.go index 04aad28..bf831df 100644 --- a/messaging/natsjsdlq/natsjsdlq.go +++ b/messaging/natsjsdlq/natsjsdlq.go @@ -15,6 +15,11 @@ type Msg interface { Metadata() (*nats.MsgMetadata, error) } +type Handler interface { + PublishMessage(msg *nats.Msg, reason string) error + ShouldDLQ(msg Msg) bool +} + // Config holds DLQ configuration type Config struct { // StreamName for the DLQ @@ -42,7 +47,7 @@ type Dependencies struct { } // Handler manages dead letter queue operations -type Handler struct { +type DefaultHandler struct { config Config cm natsjscm.Connector p natsjspub.Publisher @@ -50,12 +55,12 @@ type Handler struct { } // NewHandler creates a new DLQ handler -func NewHandler(deps Dependencies, config Config) (*Handler, error) { +func NewHandler(deps Dependencies, config Config) (Handler, error) { if err := validateConfig(deps, config); err != nil { return nil, fmt.Errorf("invalid DLQ configuration: %w", err) } - handler := &Handler{ + handler := &DefaultHandler{ config: config, ctx: config.Context, cm: deps.ConnectionManager, @@ -102,7 +107,7 @@ func validateConfig(deps Dependencies, config Config) error { } // setup ensures the DLQ stream exists -func (h *Handler) setup() error { +func (h *DefaultHandler) setup() error { streamConfig := jetstream.StreamConfig{ Name: h.config.StreamName, Subjects: []string{h.config.Subject}, @@ -119,7 +124,7 @@ func (h *Handler) setup() error { } // PublishMessage sends a message to the DLQ -func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error { +func (h *DefaultHandler) PublishMessage(msg *nats.Msg, reason string) error { // Clone original message headers headers := nats.Header{} if msg.Header != nil { @@ -150,7 +155,7 @@ func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error { } // ShouldDLQ determines if a message should be sent to DLQ based on delivery count -func (h *Handler) ShouldDLQ(msg Msg) bool { +func (h *DefaultHandler) ShouldDLQ(msg Msg) bool { metadata, err := msg.Metadata() if err != nil { if h.config.ErrorHandler != nil { From 7b126db43dd15fb46844b9bb56b98cd3fdd43820 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Fri, 7 Mar 2025 11:45:43 -0500 Subject: [PATCH 02/15] feat(consumer): add NATS consumer command - Added a new Cobra command to run a NATS consumer. - Implemented the consumer setup and start logic. - Integrated with the existing logger and connection manager. - Added mock implementations for unit testing. - Removed unused EnableDLQ configuration. --- messaging/natsjscm/natsjscm.go | 7 + messaging/natsjscm/natsjscm_test.go | 28 +--- messaging/natsjscon/natsjscon.go | 81 ++++----- .../natsjscon/natsjscon_handle_result.go | 4 +- messaging/natsjscon/natsjscon_start.go | 9 + messaging/natsjscon/natsjscon_test.go | 157 ++++-------------- messaging/natsjsdlq/natsjsdlq_test.go | 100 +++-------- messaging/natsjspub/natsjspub_test.go | 18 +- .../circuitbreaker/circuitbreaker.go | 2 +- mocks/logger/logger.go | 23 +++ .../messaging/natsjscm/natsjscm.go | 2 +- mocks/messaging/natsjscon/natsjscon.go | 35 ++++ mocks/messaging/natsjsdlq/natsjsdlq.go | 21 +++ mocks/messaging/natsjspub/natsjspub.go | 33 ++++ simian-go/app/tryitout/consumer/index.go | 87 ++++++++++ simian-go/app/tryitout/index.go | 15 ++ simian-go/main.go | 1 + 17 files changed, 336 insertions(+), 287 deletions(-) rename messaging/natsjspub/natsjspub_mcb_test.go => mocks/circuitbreaker/circuitbreaker.go (96%) create mode 100644 mocks/logger/logger.go rename messaging/natsjspub/natsjspub_mcm_test.go => mocks/messaging/natsjscm/natsjscm.go (98%) create mode 100644 mocks/messaging/natsjscon/natsjscon.go create mode 100644 mocks/messaging/natsjsdlq/natsjsdlq.go create mode 100644 mocks/messaging/natsjspub/natsjspub.go create mode 100644 simian-go/app/tryitout/consumer/index.go create mode 100644 simian-go/app/tryitout/index.go diff --git a/messaging/natsjscm/natsjscm.go b/messaging/natsjscm/natsjscm.go index bbaf84f..516b4b2 100644 --- a/messaging/natsjscm/natsjscm.go +++ b/messaging/natsjscm/natsjscm.go @@ -148,21 +148,26 @@ func (cm *ConnectionManager) retryConnection() { // Connect establishes a connection to NATS if not already connected func (cm *ConnectionManager) Connect() error { + cm.log.Debugf("Starting Connect") + cm.mu.Lock() defer cm.mu.Unlock() + cm.log.Debugf("Checking NATS connection") // If already connected, increment reference count if cm.nc != nil && cm.nc.IsConnected() { cm.refs++ return nil } + cm.log.Debugf("Connecting to NATS") // Connect to NATS nc, err := nats.Connect(cm.config.URL, cm.config.Options...) if err != nil { return fmt.Errorf("failed to connect to NATS: %w", err) } + cm.log.Debugf("Getting JetStream context") // Create JetStream context js, err := cm.createJetStreamContext(nc) if err != nil { @@ -174,6 +179,8 @@ func (cm *ConnectionManager) Connect() error { cm.js = js cm.refs = 1 + cm.log.Debugf("Connected to NATS") + return nil } diff --git a/messaging/natsjscm/natsjscm_test.go b/messaging/natsjscm/natsjscm_test.go index a4aa137..fcd9d23 100644 --- a/messaging/natsjscm/natsjscm_test.go +++ b/messaging/natsjscm/natsjscm_test.go @@ -14,31 +14,11 @@ import ( "github.com/nats-io/nats.go" "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// Mock Logger for testing -type mockLogger struct { - debugMessages []string - errorMessages []string -} - -func newMockLogger() *mockLogger { - return &mockLogger{ - debugMessages: []string{}, - errorMessages: []string{}, - } -} - -func (m *mockLogger) Debugf(format string, args ...interface{}) { - m.debugMessages = append(m.debugMessages, fmt.Sprintf(format, args...)) -} - -func (m *mockLogger) Errorf(format string, args ...interface{}) { - m.errorMessages = append(m.errorMessages, fmt.Sprintf(format, args...)) -} - func MockServer(args ...int) func() { port := 0 if len(args) > 0 { @@ -105,7 +85,7 @@ func TestNewConnectionManager(t *testing.T) { config: natsjscm.ConnectionConfig{ URL: "nats://localhost:4222", ReconnectWait: 5 * time.Second, - Logger: newMockLogger(), + Logger: &logger.MockLogger{}, }, expectError: false, }, @@ -302,14 +282,10 @@ func TestEnsureStream(t *testing.T) { func TestConnectionEvents(t *testing.T) { shutdown := MockServer() - // Create a mock logger to capture log messages - logger := newMockLogger() - // Create a connection manager with event handlers cm, err := natsjscm.NewConnectionManager(natsjscm.ConnectionConfig{ URL: os.Getenv("NATS_HOST"), ReconnectWait: 100 * time.Millisecond, - Logger: logger, }) require.NoError(t, err) diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go index 217cc9c..3097648 100644 --- a/messaging/natsjscon/natsjscon.go +++ b/messaging/natsjscon/natsjscon.go @@ -56,9 +56,6 @@ type ConsumerConfig struct { // ProcessTimeout is the timeout for processing a message/batch ProcessTimeout time.Duration - // EnableDLQ determines if messages should be sent to a DLQ after max retries - EnableDLQ bool - // WorkerCount is the number of concurrent workers WorkerCount int } @@ -154,53 +151,13 @@ func (c *Consumer) SetProcessor(processor Processor) *Consumer { return c } -// Setup prepares the consumer for consumption -func (c *Consumer) Setup(processor Processor) *Consumer { - c.mu.Lock() - defer c.mu.Unlock() - - if c.logger == nil { - c.SetLogger(logger.New()) - } - - // Set defaults - if c.config.MaxRetries <= 0 { - c.debug("setting default max retries to 3") - c.config.MaxRetries = 3 - } - - if c.config.ProcessTimeout <= 0 { - c.debug("setting default process timeout to 30s") - c.config.ProcessTimeout = 30 * time.Second - } - - if c.config.WorkerCount <= 0 { - c.debug("setting default worker count to 1") - c.config.WorkerCount = 1 - } - - // Ensure connection to NATS - if err := c.cm.Connect(); err != nil { - c.debug("connection failed ", err) - } - - return c -} - -// IsRunning returns whether the consumer is currently running -func (c *Consumer) IsRunning() bool { - c.mu.RLock() - defer c.mu.RUnlock() - return c.running -} - // Stop stops the consumer -func (c *Consumer) Stop() error { +func (c *Consumer) Stop() { c.mu.Lock() defer c.mu.Unlock() if !c.running { - return nil + return } c.logger.Debug("stopping consumer", logger.Fields{ @@ -211,14 +168,10 @@ func (c *Consumer) Stop() error { c.cancel() c.running = false c.wg.Wait() - return nil } // validate checks if the consumer is properly configured func (c *Consumer) validate() error { - c.mu.RLock() - defer c.mu.RUnlock() - if c.processor == nil { return errors.New("processor is required") } @@ -245,3 +198,33 @@ func (c *Consumer) validate() error { return nil } + +func (c *Consumer) setup() error { + c.debug("starting consumer setup") + + if c.logger == nil { + c.debug("setting logger") + c.SetLogger(logger.New()) + } + + // Set defaults + if c.config.MaxRetries <= 0 { + c.debug("setting default max retries to 3") + c.config.MaxRetries = 3 + } + + if c.config.ProcessTimeout <= 0 { + c.debug("setting default process timeout to 30s") + c.config.ProcessTimeout = 30 * time.Second + } + + if c.config.WorkerCount <= 0 { + c.debug("setting default worker count to 1") + c.config.WorkerCount = 1 + } + + c.debug("consumer setup complete, connecting to NATS") + + // Ensure connection to NATS + return c.cm.Connect() +} diff --git a/messaging/natsjscon/natsjscon_handle_result.go b/messaging/natsjscon/natsjscon_handle_result.go index 92b31e9..b07d1b9 100644 --- a/messaging/natsjscon/natsjscon_handle_result.go +++ b/messaging/natsjscon/natsjscon_handle_result.go @@ -81,7 +81,7 @@ func handleMaxRetries(msg jetstream.Msg, metadata *jetstream.MsgMetadata, c *Con }) } - if !c.config.EnableDLQ || c.dlqHandler == nil { + if c.dlqHandler == nil { return true } @@ -100,7 +100,7 @@ func handleTerminalError(msg jetstream.Msg, c *Consumer) { } // Send to DLQ if enabled - if !c.config.EnableDLQ || c.dlqHandler == nil { + if c.dlqHandler == nil { return } diff --git a/messaging/natsjscon/natsjscon_start.go b/messaging/natsjscon/natsjscon_start.go index ca925b2..132b165 100644 --- a/messaging/natsjscon/natsjscon_start.go +++ b/messaging/natsjscon/natsjscon_start.go @@ -13,15 +13,23 @@ func (c *Consumer) Start(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() + c.debug("setting up consumer") + if err := c.setup(); err != nil { + return fmt.Errorf("failed to setup consumer: %w", err) + } + + c.debug("validating consumer configuration") if err := c.validate(); err != nil { return fmt.Errorf("invalid consumer configuration: %w", err) } + c.debug("is consumer already running?") if c.running { return errors.New("consumer is already running") } // Set up the strategy + c.debug("setting up consumption strategy") if err := c.strategy.Setup(ctx); err != nil { return fmt.Errorf("failed to setup consumption strategy: %w", err) } @@ -30,6 +38,7 @@ func (c *Consumer) Start(ctx context.Context) error { c.running = true // Start worker pool + c.debug("starting worker pool") for i := 0; i < c.config.WorkerCount; i++ { c.wg.Add(1) go c.worker(i) diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index 64f1ea7..fef7253 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -2,145 +2,52 @@ package natsjscon_test import ( "context" - "errors" "testing" + "time" - "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/logger" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/messaging/natsjsdlq" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) -// Mock implementations -type MockLogger struct{} - -func (m *MockLogger) Debug(args ...any) {} -func (m *MockLogger) Error(args ...any) {} - -type MockConnector struct { - shouldFail bool -} - -func (m *MockConnector) Connect() error { - if m.shouldFail { - return errors.New("connection failed") - } - return nil -} +func newConsumer(t *testing.T) *natsjscon.Consumer { + logger := &logger.MockLogger{} + jetstream := &natsjscm.MockJetStream{} + connector := &natsjscm.MockConnectionManager{} + config := natsjscon.ConsumerConfig{} + dlqHandler := &natsjsdlq.MockDLQHandler{} + strategy := &conmock.MockStrategy{} + processor := &conmock.ProcessorMock{} -type MockStrategy struct{} + connector.SetJetStream(jetstream) -func (m *MockStrategy) Setup(ctx context.Context) error { - return nil -} + logger.On("Debug", mock.Anything) -func (m *MockStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { - return nil, nil -} + consumer := natsjscon. + NewConsumer(config). + SetLogger(logger). + SetConnector(connector). + SetDLQHandler(dlqHandler). + SetStrategy(strategy). + SetProcessor(processor.Process) -type MockDLQHandler struct{} + assert.NotNil(t, consumer) -func (m *MockDLQHandler) Handle(msg jetstream.Msg) error { - return nil + return consumer } func TestNewConsumer(t *testing.T) { - config := natsjscon.ConsumerConfig{} - consumer := natsjscon.NewConsumer(config) - assert.NotNil(t, consumer) -} + c := newConsumer(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() -func TestSetLogger(t *testing.T) { - consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) - logger := &MockLogger{} - c := consumer.SetLogger(logger) - assert.NotNil(t, c) -} + err := c.Start(ctx) + assert.NoError(t, err) -func TestSetConnector(t *testing.T) { - consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) - connector := &MockConnector{} - c := consumer.SetConnector(connector) - assert.Equal(t, connector, consumer.cm) + // wait + time.Sleep(500 * time.Millisecond) } - -// -// func TestSetStrategy(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// strategy := &MockStrategy{} -// consumer.SetStrategy(strategy) -// assert.Equal(t, strategy, consumer.strategy) -// } -// -// func TestSetDLQHandler(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// handler := &MockDLQHandler{} -// consumer.SetDLQHandler(handler) -// assert.Equal(t, handler, consumer.dlqHandler) -// } -// -// func TestSetProcessor(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { -// return nil -// } -// consumer.SetProcessor(processor) -// assert.Equal(t, processor, consumer.processor) -// } -// -// func TestSetup(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// consumer.SetLogger(&MockLogger{}) -// consumer.SetConnector(&MockConnector{}) -// consumer.SetStrategy(&MockStrategy{}) -// consumer.SetDLQHandler(&MockDLQHandler{}) -// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { -// return nil -// } -// consumer.Setup(processor) -// assert.NotNil(t, consumer.logger) -// assert.NotNil(t, consumer.cm) -// assert.NotNil(t, consumer.strategy) -// assert.NotNil(t, consumer.dlqHandler) -// assert.Equal(t, processor, consumer.processor) -// } -// -// func TestIsRunning(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// assert.False(t, consumer.IsRunning()) -// consumer.running = true -// assert.True(t, consumer.IsRunning()) -// } -// -// func TestStop(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// consumer.SetLogger(&MockLogger{}) -// consumer.running = true -// ctx, cancel := context.WithCancel(context.Background()) -// consumer.ctx = ctx -// consumer.cancel = cancel -// err := consumer.Stop() -// assert.NoError(t, err) -// assert.False(t, consumer.running) -// } -// -// func TestValidate(t *testing.T) { -// consumer := natsjscon.NewConsumer(natsjscon.ConsumerConfig{}) -// consumer.SetLogger(&MockLogger{}) -// consumer.SetConnector(&MockConnector{}) -// consumer.SetStrategy(&MockStrategy{}) -// consumer.SetDLQHandler(&MockDLQHandler{}) -// processor := func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]natsjscon.ProcessStatus { -// return nil -// } -// consumer.SetProcessor(processor) -// -// err := consumer.Validate() -// assert.Error(t, err) -// -// consumer.config.StreamName = "test-stream" -// consumer.config.ConsumerName = "test-consumer" -// consumer.config.Subject = "test-subject" -// -// err = consumer.Validate() -// assert.NoError(t, err) -// } diff --git a/messaging/natsjsdlq/natsjsdlq_test.go b/messaging/natsjsdlq/natsjsdlq_test.go index 4f5b5e0..93588cb 100644 --- a/messaging/natsjsdlq/natsjsdlq_test.go +++ b/messaging/natsjsdlq/natsjsdlq_test.go @@ -8,87 +8,27 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" - "github.com/simiancreative/simiango/messaging/natsjscm" "github.com/simiancreative/simiango/messaging/natsjsdlq" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/messaging/natsjspub" "github.com/stretchr/testify/mock" "github.com/tj/assert" ) -// Mock implementations -type MockConnectionManager struct { - mock.Mock -} - -func (m *MockConnectionManager) Connect() error { - args := m.Called() - return args.Error(0) -} - -func (m *MockConnectionManager) GetConnection() *nats.Conn { - args := m.Called() - return args.Get(0).(*nats.Conn) -} - -func (m *MockConnectionManager) GetJetStream() natsjscm.JetStream { - args := m.Called() - return args.Get(0).(jetstream.JetStream) -} - -func (m *MockConnectionManager) EnsureStream( - ctx context.Context, - config jetstream.StreamConfig, -) (natsjscm.JetStream, error) { - args := m.Called(ctx, config) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(jetstream.JetStream), args.Error(1) -} - -func (m *MockConnectionManager) Disconnect() error { - args := m.Called() - return args.Error(0) -} - -func (m *MockConnectionManager) IsConnected() bool { - args := m.Called() - return args.Bool(0) -} - -type MockPublisher struct { - mock.Mock -} - -func (m *MockPublisher) Publish(ctx context.Context, msg *nats.Msg) (*jetstream.PubAck, error) { - args := m.Called(ctx, msg) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*jetstream.PubAck), args.Error(1) -} - -type MockMsg struct { - mock.Mock -} - -func (m *MockMsg) Metadata() (*nats.MsgMetadata, error) { - args := m.Called() - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(*nats.MsgMetadata), args.Error(1) -} - // Test Suite type DLQHandlerTestSuite struct { - mockCM *MockConnectionManager - mockPub *MockPublisher + mockCM *natsjscm.MockConnectionManager + mockPub *natsjspub.MockPublisher + mockJS *natsjscm.MockJetStream ctx context.Context } func (suite *DLQHandlerTestSuite) SetupTest() { - suite.mockCM = new(MockConnectionManager) - suite.mockPub = new(MockPublisher) + suite.mockCM = new(natsjscm.MockConnectionManager) + suite.mockPub = new(natsjspub.MockPublisher) + suite.mockJS = new(natsjscm.MockJetStream) + + suite.mockCM.SetJetStream(suite.mockJS) suite.ctx = context.Background() } @@ -112,7 +52,7 @@ func TestNewHandlerValidation(t *testing.T) { StreamName: "test-dlq", Subject: "test.dlq", MaxDeliveries: 3, - Storage: jetstream.FileStorage, + Storage: jetstream.MemoryStorage, Context: suite.ctx, }, expectedErr: "", @@ -195,7 +135,11 @@ func TestNewHandlerValidation(t *testing.T) { Storage: tc.config.Storage, Retention: jetstream.WorkQueuePolicy, } - suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() } handler, err := natsjsdlq.NewHandler(tc.deps, tc.config) @@ -273,7 +217,10 @@ func TestShouldDLQ(t *testing.T) { Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, } - suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() handler, err := natsjsdlq.NewHandler(deps, validConfig) assert.NoError(t, err) @@ -282,7 +229,7 @@ func TestShouldDLQ(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Create a mock message with the configured metadata - mockMsg := new(MockMsg) + mockMsg := new(natsjspub.MockMsg) metadata := &nats.MsgMetadata{ NumDelivered: tc.numDelivered, } @@ -380,7 +327,10 @@ func TestPublishMessage(t *testing.T) { Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, } - suite.mockCM.On("EnsureStream", mock.Anything, streamConfig).Return(nil, nil).Once() + suite.mockCM. + On("EnsureStream", mock.Anything, streamConfig). + Return(suite.mockJS, nil). + Once() handler, err := natsjsdlq.NewHandler(deps, validConfig) assert.NoError(t, err) diff --git a/messaging/natsjspub/natsjspub_test.go b/messaging/natsjspub/natsjspub_test.go index ecea436..e98b4cf 100644 --- a/messaging/natsjspub/natsjspub_test.go +++ b/messaging/natsjspub/natsjspub_test.go @@ -8,12 +8,14 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjspub" + "github.com/simiancreative/simiango/mocks/circuitbreaker" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) func TestNewPublisher(t *testing.T) { - mockCM := new(MockConnectionManager) + mockCM := new(natsjscm.MockConnectionManager) config := natsjspub.Config{ StreamName: "test-stream", Subject: "test-subject", @@ -23,7 +25,7 @@ func TestNewPublisher(t *testing.T) { t.Run("success", func(t *testing.T) { mockCM. On("EnsureStream", mock.Anything, mock.Anything). - Return(new(MockJetStream), nil) + Return(new(natsjscm.MockJetStream), nil) mockCM. On("IsConnected"). @@ -59,16 +61,16 @@ func TestNewPublisher(t *testing.T) { } type Mocks struct { - cm *MockConnectionManager - js *MockJetStream - cb *MockCircuitBreaker + cm *natsjscm.MockConnectionManager + js *natsjscm.MockJetStream + cb *circuitbreaker.MockCircuitBreaker } func createPublisher(t *testing.T) (natsjspub.PublishMulti, *Mocks) { mocks := &Mocks{ - cm: new(MockConnectionManager), - js: new(MockJetStream), - cb: new(MockCircuitBreaker), + cm: new(natsjscm.MockConnectionManager), + js: new(natsjscm.MockJetStream), + cb: new(circuitbreaker.MockCircuitBreaker), } mocks.cm.SetJetStream(mocks.js) diff --git a/messaging/natsjspub/natsjspub_mcb_test.go b/mocks/circuitbreaker/circuitbreaker.go similarity index 96% rename from messaging/natsjspub/natsjspub_mcb_test.go rename to mocks/circuitbreaker/circuitbreaker.go index dbd0744..5d2880a 100644 --- a/messaging/natsjspub/natsjspub_mcb_test.go +++ b/mocks/circuitbreaker/circuitbreaker.go @@ -1,4 +1,4 @@ -package natsjspub_test +package circuitbreaker import ( "github.com/sanity-io/litter" diff --git a/mocks/logger/logger.go b/mocks/logger/logger.go new file mode 100644 index 0000000..3b5a070 --- /dev/null +++ b/mocks/logger/logger.go @@ -0,0 +1,23 @@ +package logger + +import "github.com/stretchr/testify/mock" + +type MockLogger struct { + mock.Mock +} + +func (m *MockLogger) Debugf(fmt string, args ...any) { + m.Called(args) +} + +func (m *MockLogger) Errorf(fmt string, args ...any) { + m.Called(args) +} + +func (m *MockLogger) Debug(args ...any) { + m.Called(args) +} + +func (m *MockLogger) Error(args ...any) { + m.Called(args) +} diff --git a/messaging/natsjspub/natsjspub_mcm_test.go b/mocks/messaging/natsjscm/natsjscm.go similarity index 98% rename from messaging/natsjspub/natsjspub_mcm_test.go rename to mocks/messaging/natsjscm/natsjscm.go index b658fc4..7a7a010 100644 --- a/messaging/natsjspub/natsjspub_mcm_test.go +++ b/mocks/messaging/natsjscm/natsjscm.go @@ -1,4 +1,4 @@ -package natsjspub_test +package natsjscm import ( "context" diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go new file mode 100644 index 0000000..d956973 --- /dev/null +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -0,0 +1,35 @@ +package natsjscon + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/stretchr/testify/mock" +) + +type MockStrategy struct { + mock.Mock +} + +func (m *MockStrategy) Setup(ctx context.Context) error { + c := m.Called(ctx) + return c.Error(0) +} + +func (m *MockStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { + c := m.Called(ctx, workerID) + return c.Get(0).([]jetstream.Msg), c.Error(1) +} + +type ProcessorMock struct { + mock.Mock +} + +func (p *ProcessorMock) Process( + ctx context.Context, + msgs []jetstream.Msg, +) map[jetstream.Msg]natsjscon.ProcessStatus { + args := p.Called(ctx, msgs) + return args.Get(0).(map[jetstream.Msg]natsjscon.ProcessStatus) +} diff --git a/mocks/messaging/natsjsdlq/natsjsdlq.go b/mocks/messaging/natsjsdlq/natsjsdlq.go new file mode 100644 index 0000000..73ebbf5 --- /dev/null +++ b/mocks/messaging/natsjsdlq/natsjsdlq.go @@ -0,0 +1,21 @@ +package natsjsdlq + +import ( + "github.com/nats-io/nats.go" + "github.com/simiancreative/simiango/messaging/natsjsdlq" + "github.com/stretchr/testify/mock" +) + +type MockDLQHandler struct { + mock.Mock +} + +func (m *MockDLQHandler) PublishMessage(msg *nats.Msg, reason string) error { + args := m.Called(msg, reason) + return args.Error(0) +} + +func (m *MockDLQHandler) ShouldDLQ(msg natsjsdlq.Msg) bool { + args := m.Called(msg) + return args.Bool(0) +} diff --git a/mocks/messaging/natsjspub/natsjspub.go b/mocks/messaging/natsjspub/natsjspub.go new file mode 100644 index 0000000..a94ee1e --- /dev/null +++ b/mocks/messaging/natsjspub/natsjspub.go @@ -0,0 +1,33 @@ +package natsjspub + +import ( + "context" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +type MockPublisher struct { + mock.Mock +} + +func (m *MockPublisher) Publish(ctx context.Context, msg *nats.Msg) (*jetstream.PubAck, error) { + args := m.Called(ctx, msg) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*jetstream.PubAck), args.Error(1) +} + +type MockMsg struct { + mock.Mock +} + +func (m *MockMsg) Metadata() (*nats.MsgMetadata, error) { + args := m.Called() + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*nats.MsgMetadata), args.Error(1) +} diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go new file mode 100644 index 0000000..ea2039c --- /dev/null +++ b/simian-go/app/tryitout/consumer/index.go @@ -0,0 +1,87 @@ +package consumer + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/sig" + cli "github.com/simiancreative/simiango/simian-go/app/tryitout" +) + +var cmd = &cobra.Command{ + Use: "consumer", + Short: "run a nats consumer", + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, +} + +func init() { + cli.Cmd.AddCommand(cmd) +} + +func run() error { + logger := logger.New() + + config := natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test", + Subject: "test", + } + + connectionConfig := natsjscm.ConnectionConfig{ + Logger: logger, + URL: "nats://localhost:4222", + } + connector, err := natsjscm.NewConnectionManager(connectionConfig) + if err != nil { + return errors.Wrap(err, "failed to create connection manager") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = natsjscon. + NewConsumer(config). + SetLogger(logger). + SetConnector(connector). + SetStrategy(&Strategy{}). + SetProcessor(processor). + Start(ctx) + + if err != nil { + return errors.Wrap(err, "failed to start consumer") + } + + _, exit := sig. + New(). + Catch() + + <-exit.Done() + + return nil +} + +func processor( + ctx context.Context, + msgs []jetstream.Msg, +) map[jetstream.Msg]natsjscon.ProcessStatus { + return map[jetstream.Msg]natsjscon.ProcessStatus{} +} + +type Strategy struct { +} + +func (s *Strategy) Setup(ctx context.Context) error { + return nil +} + +func (s *Strategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { + return nil, nil +} diff --git a/simian-go/app/tryitout/index.go b/simian-go/app/tryitout/index.go new file mode 100644 index 0000000..a119084 --- /dev/null +++ b/simian-go/app/tryitout/index.go @@ -0,0 +1,15 @@ +package metacli + +import ( + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/simian-go/app" +) + +var Cmd = &cobra.Command{ + Use: "tryitout", +} + +func init() { + app.Root.Cmd.AddCommand(Cmd) +} diff --git a/simian-go/main.go b/simian-go/main.go index e8286c0..d7d50fe 100644 --- a/simian-go/main.go +++ b/simian-go/main.go @@ -13,6 +13,7 @@ import ( _ "github.com/simiancreative/simiango/simian-go/app/token/decode" _ "github.com/simiancreative/simiango/simian-go/app/token/generate" _ "github.com/simiancreative/simiango/simian-go/app/token/test" + _ "github.com/simiancreative/simiango/simian-go/app/tryitout/consumer" "github.com/simiancreative/simiango/simian-go/app" ) From f115454bdd8a5113d4405d13560f5b13e24b7ada Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Sun, 9 Mar 2025 22:12:46 -0400 Subject: [PATCH 03/15] feat(natsjscon): implement pull-based consumer strategy with TLS support - Added pull-based consumer strategy for NATS JetStream - Added NATS service configuration with TLS support - Added certificate generation script - Updated consumer example to use the new strategy - Added .gitignore rule for certificate files --- .gitignore | 1 + bin/serve/docker.sh | 2 + bin/serve/docker/docker-compose.yml | 15 ++ bin/serve/gen-certs.sh | 66 ++++++++ messaging/natsjscon/natsjscon_start.go | 20 +++ messaging/natsjsconstrategies/pull.go | 1 - messaging/natsjsconstrategies/push.go | 1 - messaging/natsjsstrategypull/pull.go | 196 +++++++++++++++++++++++ simian-go/app/tryitout/consumer/index.go | 28 ++-- 9 files changed, 316 insertions(+), 14 deletions(-) create mode 100755 bin/serve/gen-certs.sh delete mode 100644 messaging/natsjsconstrategies/pull.go delete mode 100644 messaging/natsjsconstrategies/push.go create mode 100644 messaging/natsjsstrategypull/pull.go diff --git a/.gitignore b/.gitignore index a7519ab..963f51b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ test.log /tmp/* /examples/tmp/* /bin/serve/docker/prometheus/data +/bin/serve/docker/nats/certs/*.pem diff --git a/bin/serve/docker.sh b/bin/serve/docker.sh index b28d3ae..d562aa8 100755 --- a/bin/serve/docker.sh +++ b/bin/serve/docker.sh @@ -1,5 +1,7 @@ #!/usr/bin/env bash +./bin/serve/gen-certs.sh + pushd ./bin/serve/docker make start diff --git a/bin/serve/docker/docker-compose.yml b/bin/serve/docker/docker-compose.yml index c893432..ae95f4b 100644 --- a/bin/serve/docker/docker-compose.yml +++ b/bin/serve/docker/docker-compose.yml @@ -1,6 +1,21 @@ version: '3' services: + nats: + image: docker.io/nats:2.9.20 + ports: + - "4222:4222" + - "6222:6222" + - "8222:8222" + volumes: + - "./nats/certs/server.pem:/certs/server.pem" + - "./nats/certs/server-key.pem:/certs/server-key.pem" + command: + - "-tls" + - "-tlscert=/certs/server.pem" + - "-tlskey=/certs/server-key.pem" + - "-js" + # grafana: # build: # context: ./grafana diff --git a/bin/serve/gen-certs.sh b/bin/serve/gen-certs.sh new file mode 100755 index 0000000..a54eff4 --- /dev/null +++ b/bin/serve/gen-certs.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +set -e + +if [ -z "$(which mkcert)" ]; then + echo "mkcert not installed. Installing....." + HOMEBREW_NO_AUTO_UPDATE=1 brew install mkcert +fi + +check_and_generate_certificate() { + if [ ! -f "$(mkcert -CAROOT)/rootCA.pem" ]; then + cat << EOF +##################################################################### +# ERROR: The mkcert root CA certificate has not been generated. # +# Please run the following command to install it: # +# # +# mkcert --install # +# # +# This is required for nats to start properly. # +##################################################################### +EOF + return 1 + fi + + + # Check if server.pem exists and if it is a directory + if [ -d ./server.pem ]; then + echo "server.pem is a directory. Deleting certs contents." + rm -rf ./* + echo "Directory cleaned up." + fi + + # Check if the server.pem file exists + if [ ! -f ./server.pem ]; then + echo "server.pem does not exist. Generating a new certificate." + mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1 + return 0 + fi + + # Get the certificate expiration date + enddate=$(openssl x509 -enddate -noout -in ./server.pem | cut -d= -f2) + + # Convert the expiration date to a Unix timestamp + enddate_timestamp=$(date -j -f "%b %d %T %Y %Z" "$enddate" "+%Y%m%d%H%M%S") + + # Get today's date as a Unix timestamp + current_timestamp=$(date "+%Y%m%d%H%M%S") + + # Compare the dates + if [ "$enddate_timestamp" -le "$current_timestamp" ]; then + echo "Certificate has expired or is expiring today. Generating a new one." + mkcert -cert-file server.pem -key-file server-key.pem localhost 127.0.0.1 ::1 + return 0 + fi + + echo "Certificate is still valid." +} + +mkdir -p ./bin/serve/docker/nats/certs + +pushd ./bin/serve/docker/nats/certs + + check_and_generate_certificate + +popd + diff --git a/messaging/natsjscon/natsjscon_start.go b/messaging/natsjscon/natsjscon_start.go index 132b165..ae789bc 100644 --- a/messaging/natsjscon/natsjscon_start.go +++ b/messaging/natsjscon/natsjscon_start.go @@ -8,11 +8,15 @@ import ( "github.com/simiancreative/simiango/logger" ) +type CtxKey string + // Start begins consuming messages func (c *Consumer) Start(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() + ctx = c.setupCtx(ctx) + c.debug("setting up consumer") if err := c.setup(); err != nil { return fmt.Errorf("failed to setup consumer: %w", err) @@ -53,3 +57,19 @@ func (c *Consumer) Start(ctx context.Context) error { return nil } + +func (c *Consumer) setupCtx(ctx context.Context) context.Context { + key := CtxKey("stream-name") + ctx = context.WithValue(ctx, key, c.config.StreamName) + + key = CtxKey("subject") + ctx = context.WithValue(ctx, key, c.config.Subject) + + key = CtxKey("logger") + ctx = context.WithValue(ctx, key, c.logger) + + key = CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, c.cm) + + return ctx +} diff --git a/messaging/natsjsconstrategies/pull.go b/messaging/natsjsconstrategies/pull.go deleted file mode 100644 index 4a60a7c..0000000 --- a/messaging/natsjsconstrategies/pull.go +++ /dev/null @@ -1 +0,0 @@ -package natsjsconstrategies diff --git a/messaging/natsjsconstrategies/push.go b/messaging/natsjsconstrategies/push.go deleted file mode 100644 index 4a60a7c..0000000 --- a/messaging/natsjsconstrategies/push.go +++ /dev/null @@ -1 +0,0 @@ -package natsjsconstrategies diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go new file mode 100644 index 0000000..2cfc951 --- /dev/null +++ b/messaging/natsjsstrategypull/pull.go @@ -0,0 +1,196 @@ +package natsjsstrategypull + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjscon" +) + +type Logger interface { + Debugf(format string, args ...interface{}) + Debug(args ...any) + Error(args ...any) +} + +// PullStrategyConfig holds configuration specific to pull-based consumption +type Config struct { + // ConsumerName is the name for this consumer + ConsumerName string + + // BatchSize for processing + BatchSize int + + // PollTimeout is how long to wait when polling for new messages + PollTimeout time.Duration + + // AckWait is how long the server waits for an ack before redelivery + AckWait time.Duration + + // MaxAckPending is the maximum number of pending acks + MaxAckPending int + + // MaxRetries is the maximum number of retries + MaxRetries int + + // RetentionPolicy is the retention policy for the stream + RetentionPolicy jetstream.RetentionPolicy +} + +// PullStrategy implements pull-based message consumption +type PullStrategy struct { + config Config + cm natsjscm.Connector + consumer jetstream.Consumer + + streamName string + subject string + logger Logger +} + +// NewPullStrategy creates a new pull-based consumption strategy +func New(config Config) (*PullStrategy, error) { + if config.ConsumerName == "" { + return nil, errors.New("consumer name is required") + } + + // Set defaults for unspecified configuration + if config.BatchSize <= 0 { + config.BatchSize = 100 + } + + if config.PollTimeout <= 0 { + config.PollTimeout = 1 * time.Second + } + + if config.AckWait <= 0 { + config.AckWait = 30 * time.Second + } + + if config.MaxAckPending <= 0 { + config.MaxAckPending = 1000 + } + + if config.MaxRetries <= 0 { + config.MaxRetries = 3 + } + + return &PullStrategy{ + config: config, + }, nil +} + +// Setup creates or updates the JetStream consumer +func (p *PullStrategy) Setup(ctx context.Context) error { + p.streamName = ctx.Value(natsjscon.CtxKey("stream-name")).(string) + p.subject = ctx.Value(natsjscon.CtxKey("subject")).(string) + p.logger = ctx.Value(natsjscon.CtxKey("logger")).(Logger) + p.cm = ctx.Value(natsjscon.CtxKey("connection-manager")).(natsjscm.Connector) + + err := p.ensureStream(ctx) + if err != nil { + return fmt.Errorf("failed to ensure stream: %w", err) + } + + js := p.cm.GetJetStream() + if js == nil { + return errors.New("JetStream connection not available") + } + + // Get stream + stream, err := js.Stream(ctx, p.streamName) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + + // Configure durable pull consumer + consumerConfig := jetstream.ConsumerConfig{ + Name: p.config.ConsumerName, + Durable: p.config.ConsumerName, + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: p.config.AckWait, + MaxAckPending: p.config.MaxAckPending, + FilterSubject: p.subject, + DeliverPolicy: jetstream.DeliverAllPolicy, + MaxDeliver: p.config.MaxRetries + 1, // Include first delivery + } + + // Create or update the consumer + consumer, err := stream.CreateOrUpdateConsumer(ctx, consumerConfig) + if err != nil { + return fmt.Errorf("failed to create strategy consumer: %w", err) + } + + p.consumer = consumer + return nil +} + +// ensureStream makes sure the configured stream exists +func (p *PullStrategy) ensureStream(ctx context.Context) error { + p.logger.Debugf("ensuring stream is connected") + + // Get JetStream connection + if !p.cm.IsConnected() { + p.logger.Debugf("connecting to NATS") + if err := p.cm.Connect(); err != nil { + return fmt.Errorf("failed to connect to NATS: %w", err) + } + } + + // Set default retention policy if not configured + retentionPolicy := p.config.RetentionPolicy + if retentionPolicy == 0 { + retentionPolicy = jetstream.WorkQueuePolicy + } + + // Create stream config + streamConfig := jetstream.StreamConfig{ + Name: p.streamName, + Subjects: []string{fmt.Sprintf("%v.>", p.streamName)}, + Retention: retentionPolicy, + } + + // Ensure stream exists + p.logger.Debugf("ensuring stream %s exists", p.streamName) + _, err := p.cm.EnsureStream(ctx, streamConfig) + return err +} + +// Consume pulls a batch of messages and converts them to our Message type +func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { + if p.consumer == nil { + return nil, errors.New("consumer not initialized") + } + + p.logger.Debug("pulling messages", logger.Fields{ + "worker_id": workerID, + "batch_size": p.config.BatchSize, + "timeout": p.config.PollTimeout, + }) + + // Fetch messages from stream + jsMsgs, err := p.consumer.Fetch( + p.config.BatchSize, + jetstream.FetchMaxWait(p.config.PollTimeout), + ) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + // This is normal for pull-based when no messages are available + return []jetstream.Msg{}, nil + } + return nil, fmt.Errorf("failed to fetch messages: %w", err) + } + + // Convert jetstream messages to our Message type + messages := []jetstream.Msg{} + for jsMsg := range jsMsgs.Messages() { + messages = append(messages, jsMsg) + } + + return messages, nil +} diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go index ea2039c..b7e90a1 100644 --- a/simian-go/app/tryitout/consumer/index.go +++ b/simian-go/app/tryitout/consumer/index.go @@ -4,12 +4,14 @@ import ( "context" "github.com/nats-io/nats.go/jetstream" + "github.com/sanity-io/litter" "github.com/spf13/cobra" "github.com/simiancreative/simiango/errors" "github.com/simiancreative/simiango/logger" "github.com/simiancreative/simiango/messaging/natsjscm" "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/messaging/natsjsstrategypull" "github.com/simiancreative/simiango/sig" cli "github.com/simiancreative/simiango/simian-go/app/tryitout" ) @@ -32,7 +34,7 @@ func run() error { config := natsjscon.ConsumerConfig{ StreamName: "test", ConsumerName: "test", - Subject: "test", + Subject: "test.something.>", } connectionConfig := natsjscm.ConnectionConfig{ @@ -44,6 +46,13 @@ func run() error { return errors.Wrap(err, "failed to create connection manager") } + strategy, err := natsjsstrategypull.New(natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + }) + if err != nil { + return errors.Wrap(err, "failed to create pull strategy") + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -51,7 +60,7 @@ func run() error { NewConsumer(config). SetLogger(logger). SetConnector(connector). - SetStrategy(&Strategy{}). + SetStrategy(strategy). SetProcessor(processor). Start(ctx) @@ -72,16 +81,11 @@ func processor( ctx context.Context, msgs []jetstream.Msg, ) map[jetstream.Msg]natsjscon.ProcessStatus { - return map[jetstream.Msg]natsjscon.ProcessStatus{} -} + logger.Debug("processing messages", logger.Fields{ + "messages": msgs, + }) -type Strategy struct { -} + litter.Dump(msgs[0].Data()) -func (s *Strategy) Setup(ctx context.Context) error { - return nil -} - -func (s *Strategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { - return nil, nil + return map[jetstream.Msg]natsjscon.ProcessStatus{} } From 34ee6189eb4eecde89bc804ef67be81260fd6d8a Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Sun, 9 Mar 2025 22:47:44 -0400 Subject: [PATCH 04/15] feat(messaging): add NATS JetStream publisher command and fix logger integration - Add publisher command to simian-go CLI for publishing test messages to NATS - Fix circuit breaker logger to use configured logger instead of global - Update NATS consumer interface to require Debugf method - Add debug logging to message acknowledgement - Improve consumer example to properly process messages --- circuitbreaker/circuitbreaker.go | 8 +- messaging/natsjscon/natsjscon.go | 1 + .../natsjscon/natsjscon_handle_result.go | 2 + simian-go/app/tryitout/consumer/index.go | 9 +- simian-go/app/tryitout/publisher/index.go | 104 ++++++++++++++++++ simian-go/main.go | 1 + 6 files changed, 119 insertions(+), 6 deletions(-) create mode 100644 simian-go/app/tryitout/publisher/index.go diff --git a/circuitbreaker/circuitbreaker.go b/circuitbreaker/circuitbreaker.go index 3e4b579..6019dd6 100644 --- a/circuitbreaker/circuitbreaker.go +++ b/circuitbreaker/circuitbreaker.go @@ -100,8 +100,8 @@ func (cb *CircuitBreaker) Allow() bool { allowed = true } - logger.Debug("circuit breaker allow check", logger.Fields{ - "state": cb.state.String(), + cb.config.Logger.Debug("circuit breaker allow check", logger.Fields{ + "state": cb.state, "allowed": allowed, "attempts": cb.attempts, "max_calls": cb.config.HalfOpenMaxCalls, @@ -123,13 +123,13 @@ func (cb *CircuitBreaker) RecordStart() bool { switch cb.state { case StateOpen: - logger.Debug("attempt rejected - circuit open", logger.Fields{ + cb.config.Logger.Debug("attempt rejected - circuit open", logger.Fields{ "state": cb.state.String(), }) return false case StateHalfOpen: if cb.attempts >= cb.config.HalfOpenMaxCalls { - logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{ + cb.config.Logger.Debug("attempt rejected - max half-open calls reached", logger.Fields{ "attempts": cb.attempts, "max_calls": cb.config.HalfOpenMaxCalls, }) diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go index 3097648..d9744ee 100644 --- a/messaging/natsjscon/natsjscon.go +++ b/messaging/natsjscon/natsjscon.go @@ -15,6 +15,7 @@ import ( // Logger is the interface for logging type Logger interface { Debug(args ...any) + Debugf(format string, args ...any) Error(args ...any) } diff --git a/messaging/natsjscon/natsjscon_handle_result.go b/messaging/natsjscon/natsjscon_handle_result.go index b07d1b9..53b4a06 100644 --- a/messaging/natsjscon/natsjscon_handle_result.go +++ b/messaging/natsjscon/natsjscon_handle_result.go @@ -34,6 +34,8 @@ func (c *Consumer) handleResult(msg jetstream.Msg, status ProcessStatus) { } func handleSuccess(msg jetstream.Msg, c *Consumer) { + c.logger.Debugf("message processed successfully, acknowledging") + // Acknowledge successful processing err := msg.Ack() diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go index b7e90a1..bc0dc00 100644 --- a/simian-go/app/tryitout/consumer/index.go +++ b/simian-go/app/tryitout/consumer/index.go @@ -85,7 +85,12 @@ func processor( "messages": msgs, }) - litter.Dump(msgs[0].Data()) + processed := map[jetstream.Msg]natsjscon.ProcessStatus{} - return map[jetstream.Msg]natsjscon.ProcessStatus{} + for _, msg := range msgs { + processed[msg] = natsjscon.Success + logger.Debugf("processing message: %s", litter.Sdump(msg)) + } + + return processed } diff --git a/simian-go/app/tryitout/publisher/index.go b/simian-go/app/tryitout/publisher/index.go new file mode 100644 index 0000000..8e0c27d --- /dev/null +++ b/simian-go/app/tryitout/publisher/index.go @@ -0,0 +1,104 @@ +package publisher + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/nats-io/nats.go" + "github.com/spf13/cobra" + + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" + "github.com/simiancreative/simiango/messaging/natsjspub" + cli "github.com/simiancreative/simiango/simian-go/app/tryitout" +) + +var ( + count int + streamName string + subjectName string +) + +var cmd = &cobra.Command{ + Use: "publisher", + Short: "publish messages to a nats stream", + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, +} + +func init() { + cmd.Flags().IntVarP(&count, "count", "c", 100, "Number of messages to publish") + cmd.Flags().StringVarP(&streamName, "stream", "s", "test", "Stream name") + cmd.Flags().StringVarP(&subjectName, "subject", "j", "test.something.batch", "Subject name") + + cli.Cmd.AddCommand(cmd) +} + +func run() error { + logger := logger.New() + + // Configure connection + connectionConfig := natsjscm.ConnectionConfig{ + Logger: logger, + URL: "nats://localhost:4222", + } + + // Create connection manager + connector, err := natsjscm.NewConnectionManager(connectionConfig) + if err != nil { + return errors.Wrap(err, "failed to create connection manager") + } + + publisher, err := natsjspub.NewPublisher(natsjspub.Dependencies{ + Connector: connector, + Logger: logger, + }, natsjspub.Config{ + StreamName: streamName, + Subject: subjectName, + }) + if err != nil { + return errors.Wrap(err, "failed to create publisher") + } + + // Publish messages in rapid succession + startTime := time.Now() + + logger.Infof("Publishing %d messages to %s...", count, subjectName) + + for i := 1; i <= count; i++ { + data := fmt.Sprintf("Test message %d", i) + + // Create headers for tracing + hdr := nats.Header{} + hdr.Add("X-Message-ID", strconv.Itoa(i)) + hdr.Add("X-Timestamp", time.Now().Format(time.RFC3339Nano)) + hdr.Add("X-Batch-Total", strconv.Itoa(count)) + + // Publish with message options + msg := &nats.Msg{ + Subject: subjectName, + Header: hdr, + Data: []byte(data), + } + + ctx := context.Background() + _, err := publisher.Publish(ctx, msg) + + if err != nil { + logger.Errorf("Failed to publish message %d: %v", i, err) + continue + } + } + + duration := time.Since(startTime) + rate := float64(count) / duration.Seconds() + + logger.Infof("Published %d messages in %v (%.2f msgs/sec)", count, duration, rate) + + return nil +} + diff --git a/simian-go/main.go b/simian-go/main.go index d7d50fe..b1104f0 100644 --- a/simian-go/main.go +++ b/simian-go/main.go @@ -14,6 +14,7 @@ import ( _ "github.com/simiancreative/simiango/simian-go/app/token/generate" _ "github.com/simiancreative/simiango/simian-go/app/token/test" _ "github.com/simiancreative/simiango/simian-go/app/tryitout/consumer" + _ "github.com/simiancreative/simiango/simian-go/app/tryitout/publisher" "github.com/simiancreative/simiango/simian-go/app" ) From b12874b5ff0dc5f64e6ef9fd6b75e309fbc8622e Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Mon, 10 Mar 2025 10:01:26 -0400 Subject: [PATCH 05/15] feat: add circuit breaker to pull strategy - Integrated circuit breaker into PullStrategy to handle message consumption failures. - Added circuit breaker configuration to PullStrategy's Config struct. - Implemented circuit breaker checks and recording in the Consume method. - Updated consumer example to include circuit breaker initialization. --- messaging/natsjsstrategypull/pull.go | 31 ++++++++++++++++++++++++ simian-go/app/tryitout/consumer/index.go | 7 ++++++ 2 files changed, 38 insertions(+) diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go index 2cfc951..68379b5 100644 --- a/messaging/natsjsstrategypull/pull.go +++ b/messaging/natsjsstrategypull/pull.go @@ -7,6 +7,7 @@ import ( "time" "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/circuitbreaker" "github.com/simiancreative/simiango/logger" "github.com/simiancreative/simiango/messaging/natsjscm" "github.com/simiancreative/simiango/messaging/natsjscon" @@ -40,6 +41,9 @@ type Config struct { // RetentionPolicy is the retention policy for the stream RetentionPolicy jetstream.RetentionPolicy + + // Breaker is the configuration for the circuit breaker + Breaker circuitbreaker.Breaker } // PullStrategy implements pull-based message consumption @@ -167,17 +171,40 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M return nil, errors.New("consumer not initialized") } + if p.breaker() != nil && !p.breaker().Allow() { + p.logger.Debug("circuit breaker open", logger.Fields{ + "worker_id": workerID, + "state": p.breaker().GetState(), + }) + return nil, errors.New("circuit breaker open") + } + p.logger.Debug("pulling messages", logger.Fields{ "worker_id": workerID, "batch_size": p.config.BatchSize, "timeout": p.config.PollTimeout, }) + // Record attempt start if circuit breaker is configured + var cbRecorded bool + if p.breaker() != nil { + cbRecorded = p.breaker().RecordStart() + if !cbRecorded { + return nil, fmt.Errorf("circuit breaker rejected request") + } + } + // Fetch messages from stream jsMsgs, err := p.consumer.Fetch( p.config.BatchSize, jetstream.FetchMaxWait(p.config.PollTimeout), ) + + // Record result in circuit breaker + if p.breaker() != nil && cbRecorded { + p.breaker().RecordResult(err == nil) + } + if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { // This is normal for pull-based when no messages are available @@ -194,3 +221,7 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M return messages, nil } + +func (p *PullStrategy) breaker() circuitbreaker.Breaker { + return p.config.Breaker +} diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go index bc0dc00..668aae0 100644 --- a/simian-go/app/tryitout/consumer/index.go +++ b/simian-go/app/tryitout/consumer/index.go @@ -7,6 +7,7 @@ import ( "github.com/sanity-io/litter" "github.com/spf13/cobra" + "github.com/simiancreative/simiango/circuitbreaker" "github.com/simiancreative/simiango/errors" "github.com/simiancreative/simiango/logger" "github.com/simiancreative/simiango/messaging/natsjscm" @@ -46,8 +47,14 @@ func run() error { return errors.Wrap(err, "failed to create connection manager") } + breaker, err := circuitbreaker.NewDefault() + if err != nil { + return errors.Wrap(err, "failed to create circuit breaker") + } + strategy, err := natsjsstrategypull.New(natsjsstrategypull.Config{ ConsumerName: "test-consumer", + Breaker: breaker, }) if err != nil { return errors.Wrap(err, "failed to create pull strategy") From 4bb1ecba5cfe24e112a0174ce8c326e41a1970d6 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Mon, 10 Mar 2025 11:21:53 -0400 Subject: [PATCH 06/15] fix: ensure test.sh fails CI when tests fail MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Properly capture and propagate the exit code from go test to ensure GitHub Actions fails when tests fail, while still generating junit reports. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- bin/test.sh | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bin/test.sh b/bin/test.sh index aa75cb0..8e6e055 100755 --- a/bin/test.sh +++ b/bin/test.sh @@ -6,11 +6,14 @@ go install github.com/jstemmer/go-junit-report/v2@latest list=`go list ./... | grep -v mocks | grep -v docs | grep -v errors | grep -v examples` -go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out 2>&1 $list > test.log +go test -v -coverpkg=./... -race -covermode=atomic -coverprofile=coverage.out $list 2>&1 | tee test.log +test_exit=${PIPESTATUS[0]} -cat test.log +cat test.log | go-junit-report > junit.xml -cat test.log | go-junit-report -set-exit-code > junit.xml +if [ $test_exit -ne 0 ]; then + exit $test_exit +fi coverage=$(go tool cover -func coverage.out | grep total | awk '{print $3}') From e9c0b6d11e740976c954a5d4a5b7ea11a9c87fc4 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Mon, 10 Mar 2025 14:17:59 -0400 Subject: [PATCH 07/15] fix: add reconnection handling to NATS JetStream pull strategy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ensure pull strategy can reconnect when the connection manager loses and recreates its connection. Previously, the strategy would fail to recover when the JetStream object was recreated. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- messaging/natsjsstrategypull/pull.go | 71 ++++++++++++++++++++++++---- 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go index 68379b5..344c6f7 100644 --- a/messaging/natsjsstrategypull/pull.go +++ b/messaging/natsjsstrategypull/pull.go @@ -50,6 +50,7 @@ type Config struct { type PullStrategy struct { config Config cm natsjscm.Connector + cb circuitbreaker.Breaker consumer jetstream.Consumer streamName string @@ -95,6 +96,7 @@ func (p *PullStrategy) Setup(ctx context.Context) error { p.subject = ctx.Value(natsjscon.CtxKey("subject")).(string) p.logger = ctx.Value(natsjscon.CtxKey("logger")).(Logger) p.cm = ctx.Value(natsjscon.CtxKey("connection-manager")).(natsjscm.Connector) + p.cb = p.config.Breaker err := p.ensureStream(ctx) if err != nil { @@ -167,14 +169,15 @@ func (p *PullStrategy) ensureStream(ctx context.Context) error { // Consume pulls a batch of messages and converts them to our Message type func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { - if p.consumer == nil { - return nil, errors.New("consumer not initialized") + // Check if we need to reconnect/reinitialize + if err := p.ensureConsumerActive(ctx); err != nil { + return nil, fmt.Errorf("failed to ensure consumer: %w", err) } - if p.breaker() != nil && !p.breaker().Allow() { + if p.cb != nil && !p.cb.Allow() { p.logger.Debug("circuit breaker open", logger.Fields{ "worker_id": workerID, - "state": p.breaker().GetState(), + "state": p.cb.GetState(), }) return nil, errors.New("circuit breaker open") } @@ -187,8 +190,8 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M // Record attempt start if circuit breaker is configured var cbRecorded bool - if p.breaker() != nil { - cbRecorded = p.breaker().RecordStart() + if p.cb != nil { + cbRecorded = p.cb.RecordStart() if !cbRecorded { return nil, fmt.Errorf("circuit breaker rejected request") } @@ -201,8 +204,8 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M ) // Record result in circuit breaker - if p.breaker() != nil && cbRecorded { - p.breaker().RecordResult(err == nil) + if p.cb != nil && cbRecorded { + p.cb.RecordResult(err == nil) } if err != nil { @@ -222,6 +225,54 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M return messages, nil } -func (p *PullStrategy) breaker() circuitbreaker.Breaker { - return p.config.Breaker +// ensureConsumerActive checks the connection status and reinitializes the consumer if needed +func (p *PullStrategy) ensureConsumerActive(ctx context.Context) error { + if p.consumer != nil && p.cm.IsConnected() { + // Check if the JS context in the connection manager has changed + currentJS := p.cm.GetJetStream() + if currentJS != nil { + return nil // All good, connection is active + } + } + + p.logger.Debug("reconnecting consumer") + + // Ensure connection and stream + if err := p.ensureStream(ctx); err != nil { + return fmt.Errorf("failed to ensure stream: %w", err) + } + + // Get current JetStream instance + js := p.cm.GetJetStream() + if js == nil { + return errors.New("JetStream connection not available") + } + + // Get stream + stream, err := js.Stream(ctx, p.streamName) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + + // Configure durable pull consumer + consumerConfig := jetstream.ConsumerConfig{ + Name: p.config.ConsumerName, + Durable: p.config.ConsumerName, + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: p.config.AckWait, + MaxAckPending: p.config.MaxAckPending, + FilterSubject: p.subject, + DeliverPolicy: jetstream.DeliverAllPolicy, + MaxDeliver: p.config.MaxRetries + 1, // Include first delivery + } + + // Create or update the consumer + consumer, err := stream.CreateOrUpdateConsumer(ctx, consumerConfig) + if err != nil { + return fmt.Errorf("failed to create/update consumer: %w", err) + } + + p.consumer = consumer + p.logger.Debug("consumer reconnected successfully") + return nil } From 827674c8f16ddd1b543e49778dcc5ae4405cf633 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Tue, 11 Mar 2025 11:47:34 -0400 Subject: [PATCH 08/15] feat: refactor consumer creation and setup - Moved consumer creation logic to a new `createConsumer` method - Updated `Setup` and `ensureConsumerActive` methods to use `createConsumer` - Added new `connection` package for shared connection management - Refactored `consumer` package to use shared connection and modularized setup - Improved logging in `processor` function --- messaging/natsjsstrategypull/pull.go | 46 +++------- .../app/tryitout/connection/connection.go | 25 ++++++ simian-go/app/tryitout/consumer/index.go | 87 ++++++++++--------- 3 files changed, 86 insertions(+), 72 deletions(-) create mode 100644 simian-go/app/tryitout/connection/connection.go diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go index 344c6f7..7e1826d 100644 --- a/messaging/natsjsstrategypull/pull.go +++ b/messaging/natsjsstrategypull/pull.go @@ -103,37 +103,7 @@ func (p *PullStrategy) Setup(ctx context.Context) error { return fmt.Errorf("failed to ensure stream: %w", err) } - js := p.cm.GetJetStream() - if js == nil { - return errors.New("JetStream connection not available") - } - - // Get stream - stream, err := js.Stream(ctx, p.streamName) - if err != nil { - return fmt.Errorf("failed to get stream: %w", err) - } - - // Configure durable pull consumer - consumerConfig := jetstream.ConsumerConfig{ - Name: p.config.ConsumerName, - Durable: p.config.ConsumerName, - AckPolicy: jetstream.AckExplicitPolicy, - AckWait: p.config.AckWait, - MaxAckPending: p.config.MaxAckPending, - FilterSubject: p.subject, - DeliverPolicy: jetstream.DeliverAllPolicy, - MaxDeliver: p.config.MaxRetries + 1, // Include first delivery - } - - // Create or update the consumer - consumer, err := stream.CreateOrUpdateConsumer(ctx, consumerConfig) - if err != nil { - return fmt.Errorf("failed to create strategy consumer: %w", err) - } - - p.consumer = consumer - return nil + return p.createConsumer(ctx) } // ensureStream makes sure the configured stream exists @@ -236,12 +206,22 @@ func (p *PullStrategy) ensureConsumerActive(ctx context.Context) error { } p.logger.Debug("reconnecting consumer") - + // Ensure connection and stream if err := p.ensureStream(ctx); err != nil { return fmt.Errorf("failed to ensure stream: %w", err) } + // Create consumer + if err := p.createConsumer(ctx); err != nil { + return fmt.Errorf("failed to create consumer: %w", err) + } + + p.logger.Debug("consumer reconnected successfully") + return nil +} + +func (p *PullStrategy) createConsumer(ctx context.Context) error { // Get current JetStream instance js := p.cm.GetJetStream() if js == nil { @@ -273,6 +253,6 @@ func (p *PullStrategy) ensureConsumerActive(ctx context.Context) error { } p.consumer = consumer - p.logger.Debug("consumer reconnected successfully") + return nil } diff --git a/simian-go/app/tryitout/connection/connection.go b/simian-go/app/tryitout/connection/connection.go new file mode 100644 index 0000000..abeaa56 --- /dev/null +++ b/simian-go/app/tryitout/connection/connection.go @@ -0,0 +1,25 @@ +package connection + +import ( + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjscm" +) + +var Shared natsjscm.Connector + +func init() { + log := logger.New() + + connectionConfig := natsjscm.ConnectionConfig{ + Logger: log, + URL: "nats://localhost:4222", + } + + connector, err := natsjscm.NewConnectionManager(connectionConfig) + if err != nil { + log.Errorf("failed to create connection manager: %v", err) + panic(err) + } + + Shared = connector +} diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go index 668aae0..8120a0d 100644 --- a/simian-go/app/tryitout/consumer/index.go +++ b/simian-go/app/tryitout/consumer/index.go @@ -4,7 +4,6 @@ import ( "context" "github.com/nats-io/nats.go/jetstream" - "github.com/sanity-io/litter" "github.com/spf13/cobra" "github.com/simiancreative/simiango/circuitbreaker" @@ -14,7 +13,9 @@ import ( "github.com/simiancreative/simiango/messaging/natsjscon" "github.com/simiancreative/simiango/messaging/natsjsstrategypull" "github.com/simiancreative/simiango/sig" + cli "github.com/simiancreative/simiango/simian-go/app/tryitout" + "github.com/simiancreative/simiango/simian-go/app/tryitout/connection" ) var cmd = &cobra.Command{ @@ -32,71 +33,79 @@ func init() { func run() error { logger := logger.New() - config := natsjscon.ConsumerConfig{ - StreamName: "test", - ConsumerName: "test", - Subject: "test.something.>", - } + connector := connection.Shared + breaker := newBreaker() + strategy := newStrategy(breaker) + consumer := newConsumer(logger, connector, strategy) - connectionConfig := natsjscm.ConnectionConfig{ - Logger: logger, - URL: "nats://localhost:4222", - } - connector, err := natsjscm.NewConnectionManager(connectionConfig) - if err != nil { - return errors.Wrap(err, "failed to create connection manager") - } + done, exit := sig.New().Catch() - breaker, err := circuitbreaker.NewDefault() + err := consumer.Start(done) if err != nil { - return errors.Wrap(err, "failed to create circuit breaker") + return errors.Wrap(err, "failed to start consumer") } - strategy, err := natsjsstrategypull.New(natsjsstrategypull.Config{ - ConsumerName: "test-consumer", - Breaker: breaker, - }) - if err != nil { - return errors.Wrap(err, "failed to create pull strategy") - } + <-exit.Done() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + return nil +} - err = natsjscon. - NewConsumer(config). +func newConsumer( + logger natsjscon.Logger, + connector natsjscm.Connector, + strategy natsjscon.ConsumptionStrategy, +) natsjscon.Consumer { + return *natsjscon.NewConsumer(natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test", + Subject: "test.something.>", + }). SetLogger(logger). SetConnector(connector). SetStrategy(strategy). - SetProcessor(processor). - Start(ctx) + SetProcessor(processor) +} +func newBreaker() circuitbreaker.Breaker { + breaker, err := circuitbreaker.NewDefault() if err != nil { - return errors.Wrap(err, "failed to start consumer") + logger.Errorf("failed to create circuit breaker: %v", err) + panic(err) } - _, exit := sig. - New(). - Catch() + return breaker +} - <-exit.Done() +func newStrategy(breaker circuitbreaker.Breaker) natsjscon.ConsumptionStrategy { + strategy, err := natsjsstrategypull.New(natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + Breaker: breaker, + }) - return nil + if err != nil { + logger.Errorf("failed to create pull strategy: %v", err) + panic(err) + } + + return strategy } func processor( ctx context.Context, msgs []jetstream.Msg, ) map[jetstream.Msg]natsjscon.ProcessStatus { - logger.Debug("processing messages", logger.Fields{ - "messages": msgs, - }) + log := logger.New() + + log.Infof("processing messages: %d", len(msgs)) processed := map[jetstream.Msg]natsjscon.ProcessStatus{} for _, msg := range msgs { processed[msg] = natsjscon.Success - logger.Debugf("processing message: %s", litter.Sdump(msg)) + log.Info("processing message", logger.Fields{ + "data": string(msg.Data()), + "hdr": msg.Headers(), + }) } return processed From f768662b6db6a3534ea04265165ba0de515bb63a Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Tue, 11 Mar 2025 14:41:05 -0400 Subject: [PATCH 09/15] test --- messaging/natsjscon/natsjscon_test.go | 55 +++++++++++++++++++-------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index fef7253..762872d 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjscon" "github.com/simiancreative/simiango/mocks/logger" "github.com/simiancreative/simiango/mocks/messaging/natsjscm" @@ -14,26 +15,41 @@ import ( "github.com/stretchr/testify/mock" ) -func newConsumer(t *testing.T) *natsjscon.Consumer { - logger := &logger.MockLogger{} - jetstream := &natsjscm.MockJetStream{} - connector := &natsjscm.MockConnectionManager{} - config := natsjscon.ConsumerConfig{} - dlqHandler := &natsjsdlq.MockDLQHandler{} - strategy := &conmock.MockStrategy{} - processor := &conmock.ProcessorMock{} +type dependencies struct { + logger *logger.MockLogger + jetstream *natsjscm.MockJetStream + connector *natsjscm.MockConnectionManager + dlqHandler *natsjsdlq.MockDLQHandler + strategy *conmock.MockStrategy + processor *conmock.ProcessorMock +} + +func (d *dependencies) newConsumer(t *testing.T) *natsjscon.Consumer { + config := natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + } + + d.logger = &logger.MockLogger{} + d.jetstream = &natsjscm.MockJetStream{} + d.connector = &natsjscm.MockConnectionManager{} + d.dlqHandler = &natsjsdlq.MockDLQHandler{} + d.strategy = &conmock.MockStrategy{} + d.processor = &conmock.ProcessorMock{} - connector.SetJetStream(jetstream) + d.connector.SetJetStream(d.jetstream) - logger.On("Debug", mock.Anything) + d.logger.On("Debug", mock.Anything) + d.connector.On("Connect", mock.Anything).Return(nil) consumer := natsjscon. NewConsumer(config). - SetLogger(logger). - SetConnector(connector). - SetDLQHandler(dlqHandler). - SetStrategy(strategy). - SetProcessor(processor.Process) + SetLogger(d.logger). + SetConnector(d.connector). + SetDLQHandler(d.dlqHandler). + SetStrategy(d.strategy). + SetProcessor(d.processor.Process) assert.NotNil(t, consumer) @@ -41,10 +57,17 @@ func newConsumer(t *testing.T) *natsjscon.Consumer { } func TestNewConsumer(t *testing.T) { - c := newConsumer(t) + d := new(dependencies) + c := d.newConsumer(t) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + d.strategy.On("Setup", mock.Anything).Return(nil) + d.strategy. + On("Consume", mock.Anything, mock.Anything). + Return([]jetstream.Msg{}, nil) + err := c.Start(ctx) assert.NoError(t, err) From 31ef4700569fe5114ef98bdcf9438c2f4f7a381b Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Tue, 11 Mar 2025 16:15:37 -0400 Subject: [PATCH 10/15] Add msg mock --- messaging/natsjscon/natsjscon_test.go | 55 ++++++++++----- mocks/messaging/natsjscm/msg.go | 99 +++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 18 deletions(-) create mode 100644 mocks/messaging/natsjscm/msg.go diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index 762872d..0d6d434 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -15,6 +15,42 @@ import ( "github.com/stretchr/testify/mock" ) +func newDependencies() *dependencies { + d := &dependencies{ + logger: &logger.MockLogger{}, + jetstream: &natsjscm.MockJetStream{}, + connector: &natsjscm.MockConnectionManager{}, + dlqHandler: &natsjsdlq.MockDLQHandler{}, + strategy: &conmock.MockStrategy{}, + processor: &conmock.ProcessorMock{}, + } + + msg := &natsjscm.JetStreamMsg{} + msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) + msg.On("Ack").Return(nil) + msgs := []jetstream.Msg{msg, msg, msg, msg} + + processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) + for _, msg := range msgs { + processed[msg] = natsjscon.Success + } + + d.logger.On("Debug", mock.Anything).Maybe() + d.logger.On("Debugf", mock.Anything, mock.Anything).Maybe() + d.logger.On("Error", mock.Anything).Maybe() + d.connector.On("Connect").Return(nil).Maybe() + d.processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() + + // Setup default strategy mocks to avoid test failures + d.strategy.On("Setup", mock.Anything).Return(nil).Maybe() + + d.strategy.On("Consume", mock.Anything, mock.Anything).Return(msgs, nil).Maybe() + + d.connector.SetJetStream(d.jetstream) + + return d +} + type dependencies struct { logger *logger.MockLogger jetstream *natsjscm.MockJetStream @@ -31,18 +67,6 @@ func (d *dependencies) newConsumer(t *testing.T) *natsjscon.Consumer { Subject: "test.subject.>", } - d.logger = &logger.MockLogger{} - d.jetstream = &natsjscm.MockJetStream{} - d.connector = &natsjscm.MockConnectionManager{} - d.dlqHandler = &natsjsdlq.MockDLQHandler{} - d.strategy = &conmock.MockStrategy{} - d.processor = &conmock.ProcessorMock{} - - d.connector.SetJetStream(d.jetstream) - - d.logger.On("Debug", mock.Anything) - d.connector.On("Connect", mock.Anything).Return(nil) - consumer := natsjscon. NewConsumer(config). SetLogger(d.logger). @@ -57,17 +81,12 @@ func (d *dependencies) newConsumer(t *testing.T) *natsjscon.Consumer { } func TestNewConsumer(t *testing.T) { - d := new(dependencies) + d := newDependencies() c := d.newConsumer(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - d.strategy.On("Setup", mock.Anything).Return(nil) - d.strategy. - On("Consume", mock.Anything, mock.Anything). - Return([]jetstream.Msg{}, nil) - err := c.Start(ctx) assert.NoError(t, err) diff --git a/mocks/messaging/natsjscm/msg.go b/mocks/messaging/natsjscm/msg.go new file mode 100644 index 0000000..8170552 --- /dev/null +++ b/mocks/messaging/natsjscm/msg.go @@ -0,0 +1,99 @@ +package natsjscm + +import ( + "context" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +type JetStreamMsg struct { + mock.Mock +} + +// Metadata returns [MsgMetadata] for a JetStream message. +func (m *JetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { + called := m.Called() + return called.Get(0).(*jetstream.MsgMetadata), called.Error(1) +} + +// Data returns the message body. +func (m *JetStreamMsg) Data() []byte { + called := m.Called() + return called.Get(0).([]byte) +} + +// Headers returns a map of headers for a message. +func (m *JetStreamMsg) Headers() nats.Header { + called := m.Called() + return called.Get(0).(nats.Header) +} + +// Subject returns a subject on which a message is published. +func (m *JetStreamMsg) Subject() string { + called := m.Called() + return called.String(0) +} + +// Reply returns a reply subject for a JetStream message. +func (m *JetStreamMsg) Reply() string { + called := m.Called() + return called.String(0) +} + +// Ack acknowledges a message. This tells the server that the message was +// successfully processed and it can move on to the next message. +func (m *JetStreamMsg) Ack() error { + call := m.Called() + return call.Error(0) +} + +// DoubleAck acknowledges a message and waits for ack reply from the server. +// While it impacts performance, it is useful for scenarios where +// message loss is not acceptable. +func (m *JetStreamMsg) DoubleAck(ctx context.Context) error { + call := m.Called(ctx) + return call.Error(0) +} + +// Nak negatively acknowledges a message. This tells the server to +// redeliver the message. +func (m *JetStreamMsg) Nak() error { + call := m.Called() + return call.Error(0) +} + +// NakWithDelay negatively acknowledges a message. This tells the server +// to redeliver the message after the given delay. +func (m *JetStreamMsg) NakWithDelay(delay time.Duration) error { + called := m.Called(delay) + return called.Error(0) +} + +// InProgress tells the server that this message is being worked on. It +// resets the redelivery timer on the server. +func (m *JetStreamMsg) InProgress() error { + called := m.Called() + return called.Error(0) +} + +// Term tells the server to not redeliver this message, regardless of +// the value of MaxDeliver. +func (m *JetStreamMsg) Term() error { + called := m.Called() + return called.Error(0) +} + +// TermWithReason tells the server to not redeliver this message, regardless of +// the value of MaxDeliver. The provided reason will be included in JetStream +// advisory event sent by the server. +// +// Note: This will only work with JetStream servers >= 2.10.4. +// For older servers, TermWithReason will be ignored by the server and the message +// will not be terminated. +func (m *JetStreamMsg) TermWithReason(reason string) error { + called := m.Called(reason) + return called.Error(0) +} From 544272449b6371ff8d834472c04e2ff99255ea5d Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Wed, 12 Mar 2025 15:28:33 -0400 Subject: [PATCH 11/15] refactor: update mocks and tests for natsjscon package - Replaced inline mock dependencies with reusable mock dependencies from conmock package. - Updated tests in natsjscon_test.go to use new mock dependencies. - Added new tests for natsjsstrategypull package. - Created new mock implementations for jetstream.Consumer and other interfaces in natsjscm package. - Refactored existing mocks to follow a consistent naming convention. --- messaging/natsjscon/natsjscon_test.go | 75 +---- messaging/natsjsstrategypull/pull_test.go | 45 +++ mocks/messaging/natsjscm/consumer.go | 143 +++++++++ mocks/messaging/natsjscm/msg.go | 26 +- mocks/messaging/natsjscm/stream.go | 344 ++++++++++++++++++++++ mocks/messaging/natsjscon/natsjscon.go | 83 ++++++ 6 files changed, 630 insertions(+), 86 deletions(-) create mode 100644 messaging/natsjsstrategypull/pull_test.go create mode 100644 mocks/messaging/natsjscm/consumer.go create mode 100644 mocks/messaging/natsjscm/stream.go diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index 0d6d434..c3d9dde 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -5,84 +5,13 @@ import ( "testing" "time" - "github.com/nats-io/nats.go/jetstream" - "github.com/simiancreative/simiango/messaging/natsjscon" - "github.com/simiancreative/simiango/mocks/logger" - "github.com/simiancreative/simiango/mocks/messaging/natsjscm" conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" - "github.com/simiancreative/simiango/mocks/messaging/natsjsdlq" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) -func newDependencies() *dependencies { - d := &dependencies{ - logger: &logger.MockLogger{}, - jetstream: &natsjscm.MockJetStream{}, - connector: &natsjscm.MockConnectionManager{}, - dlqHandler: &natsjsdlq.MockDLQHandler{}, - strategy: &conmock.MockStrategy{}, - processor: &conmock.ProcessorMock{}, - } - - msg := &natsjscm.JetStreamMsg{} - msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) - msg.On("Ack").Return(nil) - msgs := []jetstream.Msg{msg, msg, msg, msg} - - processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) - for _, msg := range msgs { - processed[msg] = natsjscon.Success - } - - d.logger.On("Debug", mock.Anything).Maybe() - d.logger.On("Debugf", mock.Anything, mock.Anything).Maybe() - d.logger.On("Error", mock.Anything).Maybe() - d.connector.On("Connect").Return(nil).Maybe() - d.processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() - - // Setup default strategy mocks to avoid test failures - d.strategy.On("Setup", mock.Anything).Return(nil).Maybe() - - d.strategy.On("Consume", mock.Anything, mock.Anything).Return(msgs, nil).Maybe() - - d.connector.SetJetStream(d.jetstream) - - return d -} - -type dependencies struct { - logger *logger.MockLogger - jetstream *natsjscm.MockJetStream - connector *natsjscm.MockConnectionManager - dlqHandler *natsjsdlq.MockDLQHandler - strategy *conmock.MockStrategy - processor *conmock.ProcessorMock -} - -func (d *dependencies) newConsumer(t *testing.T) *natsjscon.Consumer { - config := natsjscon.ConsumerConfig{ - StreamName: "test", - ConsumerName: "test-consumer", - Subject: "test.subject.>", - } - - consumer := natsjscon. - NewConsumer(config). - SetLogger(d.logger). - SetConnector(d.connector). - SetDLQHandler(d.dlqHandler). - SetStrategy(d.strategy). - SetProcessor(d.processor.Process) - - assert.NotNil(t, consumer) - - return consumer -} - func TestNewConsumer(t *testing.T) { - d := newDependencies() - c := d.newConsumer(t) + d := conmock.NewDependencies() + c := d.NewConsumer(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/messaging/natsjsstrategypull/pull_test.go b/messaging/natsjsstrategypull/pull_test.go new file mode 100644 index 0000000..039c655 --- /dev/null +++ b/messaging/natsjsstrategypull/pull_test.go @@ -0,0 +1,45 @@ +package natsjsstrategypull_test + +import ( + "context" + "testing" + + "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/messaging/natsjsstrategypull" + conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + + assert.NoError(t, err) + assert.NotNil(t, strategy) +} + +func TestSetup(t *testing.T) { + d := conmock.NewDependencies() + + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + assert.NoError(t, err) + + ctx := context.TODO() + + key := natsjscon.CtxKey("stream-name") + ctx = context.WithValue(ctx, key, "test") + key = natsjscon.CtxKey("subject") + ctx = context.WithValue(ctx, key, "test.subject") + key = natsjscon.CtxKey("logger") + ctx = context.WithValue(ctx, key, d.Logger) + key = natsjscon.CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, d.Connector) + + err = strategy.Setup(ctx) + assert.NoError(t, err) +} diff --git a/mocks/messaging/natsjscm/consumer.go b/mocks/messaging/natsjscm/consumer.go new file mode 100644 index 0000000..3290402 --- /dev/null +++ b/mocks/messaging/natsjscm/consumer.go @@ -0,0 +1,143 @@ +package natsjscm + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +// MockConsumer is a stretcher-based mock for the jetstream.Consumer interface +type MockConsumer struct { + mock.Mock +} + +// Fetch mocks the Consumer.Fetch method +func (m *MockConsumer) Fetch( + batch int, + opts ...jetstream.FetchOpt, +) (jetstream.MessageBatch, error) { + args := []interface{}{batch} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// FetchBytes mocks the Consumer.FetchBytes method +func (m *MockConsumer) FetchBytes( + maxBytes int, + opts ...jetstream.FetchOpt, +) (jetstream.MessageBatch, error) { + args := []interface{}{maxBytes} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// FetchNoWait mocks the Consumer.FetchNoWait method +func (m *MockConsumer) FetchNoWait(batch int) (jetstream.MessageBatch, error) { + result := m.Called(batch) + + var messageBatch jetstream.MessageBatch + if result.Get(0) != nil { + messageBatch = result.Get(0).(jetstream.MessageBatch) + } + + return messageBatch, result.Error(1) +} + +// Consume mocks the Consumer.Consume method +func (m *MockConsumer) Consume( + handler jetstream.MessageHandler, + opts ...jetstream.PullConsumeOpt, +) (jetstream.ConsumeContext, error) { + args := []interface{}{handler} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var consumeContext jetstream.ConsumeContext + if result.Get(0) != nil { + consumeContext = result.Get(0).(jetstream.ConsumeContext) + } + + return consumeContext, result.Error(1) +} + +// Messages mocks the Consumer.Messages method +func (m *MockConsumer) Messages( + opts ...jetstream.PullMessagesOpt, +) (jetstream.MessagesContext, error) { + args := []interface{}{} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var messagesContext jetstream.MessagesContext + if result.Get(0) != nil { + messagesContext = result.Get(0).(jetstream.MessagesContext) + } + + return messagesContext, result.Error(1) +} + +// Next mocks the Consumer.Next method +func (m *MockConsumer) Next(opts ...jetstream.FetchOpt) (jetstream.Msg, error) { + args := []interface{}{} + for _, opt := range opts { + args = append(args, opt) + } + result := m.Called(args...) + + var msg jetstream.Msg + if result.Get(0) != nil { + msg = result.Get(0).(jetstream.Msg) + } + + return msg, result.Error(1) +} + +// Info mocks the Consumer.Info method +func (m *MockConsumer) Info(ctx context.Context) (*jetstream.ConsumerInfo, error) { + result := m.Called(ctx) + + var consumerInfo *jetstream.ConsumerInfo + if result.Get(0) != nil { + consumerInfo = result.Get(0).(*jetstream.ConsumerInfo) + } + + return consumerInfo, result.Error(1) +} + +// CachedInfo mocks the Consumer.CachedInfo method +func (m *MockConsumer) CachedInfo() *jetstream.ConsumerInfo { + result := m.Called() + + var consumerInfo *jetstream.ConsumerInfo + if result.Get(0) != nil { + consumerInfo = result.Get(0).(*jetstream.ConsumerInfo) + } + + return consumerInfo +} + +// Ensure the mock implements the interface +var _ jetstream.Consumer = (*MockConsumer)(nil) diff --git a/mocks/messaging/natsjscm/msg.go b/mocks/messaging/natsjscm/msg.go index 8170552..b1ebc55 100644 --- a/mocks/messaging/natsjscm/msg.go +++ b/mocks/messaging/natsjscm/msg.go @@ -9,43 +9,43 @@ import ( "github.com/stretchr/testify/mock" ) -type JetStreamMsg struct { +type MockJetStreamMsg struct { mock.Mock } // Metadata returns [MsgMetadata] for a JetStream message. -func (m *JetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { +func (m *MockJetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { called := m.Called() return called.Get(0).(*jetstream.MsgMetadata), called.Error(1) } // Data returns the message body. -func (m *JetStreamMsg) Data() []byte { +func (m *MockJetStreamMsg) Data() []byte { called := m.Called() return called.Get(0).([]byte) } // Headers returns a map of headers for a message. -func (m *JetStreamMsg) Headers() nats.Header { +func (m *MockJetStreamMsg) Headers() nats.Header { called := m.Called() return called.Get(0).(nats.Header) } // Subject returns a subject on which a message is published. -func (m *JetStreamMsg) Subject() string { +func (m *MockJetStreamMsg) Subject() string { called := m.Called() return called.String(0) } // Reply returns a reply subject for a JetStream message. -func (m *JetStreamMsg) Reply() string { +func (m *MockJetStreamMsg) Reply() string { called := m.Called() return called.String(0) } // Ack acknowledges a message. This tells the server that the message was // successfully processed and it can move on to the next message. -func (m *JetStreamMsg) Ack() error { +func (m *MockJetStreamMsg) Ack() error { call := m.Called() return call.Error(0) } @@ -53,35 +53,35 @@ func (m *JetStreamMsg) Ack() error { // DoubleAck acknowledges a message and waits for ack reply from the server. // While it impacts performance, it is useful for scenarios where // message loss is not acceptable. -func (m *JetStreamMsg) DoubleAck(ctx context.Context) error { +func (m *MockJetStreamMsg) DoubleAck(ctx context.Context) error { call := m.Called(ctx) return call.Error(0) } // Nak negatively acknowledges a message. This tells the server to // redeliver the message. -func (m *JetStreamMsg) Nak() error { +func (m *MockJetStreamMsg) Nak() error { call := m.Called() return call.Error(0) } // NakWithDelay negatively acknowledges a message. This tells the server // to redeliver the message after the given delay. -func (m *JetStreamMsg) NakWithDelay(delay time.Duration) error { +func (m *MockJetStreamMsg) NakWithDelay(delay time.Duration) error { called := m.Called(delay) return called.Error(0) } // InProgress tells the server that this message is being worked on. It // resets the redelivery timer on the server. -func (m *JetStreamMsg) InProgress() error { +func (m *MockJetStreamMsg) InProgress() error { called := m.Called() return called.Error(0) } // Term tells the server to not redeliver this message, regardless of // the value of MaxDeliver. -func (m *JetStreamMsg) Term() error { +func (m *MockJetStreamMsg) Term() error { called := m.Called() return called.Error(0) } @@ -93,7 +93,7 @@ func (m *JetStreamMsg) Term() error { // Note: This will only work with JetStream servers >= 2.10.4. // For older servers, TermWithReason will be ignored by the server and the message // will not be terminated. -func (m *JetStreamMsg) TermWithReason(reason string) error { +func (m *MockJetStreamMsg) TermWithReason(reason string) error { called := m.Called(reason) return called.Error(0) } diff --git a/mocks/messaging/natsjscm/stream.go b/mocks/messaging/natsjscm/stream.go new file mode 100644 index 0000000..1a7ef8a --- /dev/null +++ b/mocks/messaging/natsjscm/stream.go @@ -0,0 +1,344 @@ +package natsjscm + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/stretchr/testify/mock" +) + +// MockStream is a mock implementation of the Stream interface +type MockStream struct { + mock.Mock +} + +// CachedInfo provides a mock function with given fields: +func (_m *MockStream) CachedInfo() *jetstream.StreamInfo { + ret := _m.Called() + + var r0 *jetstream.StreamInfo + if rf, ok := ret.Get(0).(func() *jetstream.StreamInfo); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.StreamInfo) + } + } + + return r0 +} + +// Consumer provides a mock function with given fields: ctx, name +func (_m *MockStream) Consumer(ctx context.Context, name string) (jetstream.Consumer, error) { + ret := _m.Called(ctx, name) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, string) jetstream.Consumer); ok { + r0 = rf(ctx, name) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, name) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ConsumerNames provides a mock function with given fields: ctx +func (_m *MockStream) ConsumerNames(ctx context.Context) jetstream.ConsumerNameLister { + ret := _m.Called(ctx) + + var r0 jetstream.ConsumerNameLister + if rf, ok := ret.Get(0).(func(context.Context) jetstream.ConsumerNameLister); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.ConsumerNameLister) + } + } + + return r0 +} + +// CreateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) CreateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateOrUpdateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) CreateOrUpdateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DeleteConsumer provides a mock function with given fields: ctx, name +func (_m *MockStream) DeleteConsumer(ctx context.Context, name string) error { + ret := _m.Called(ctx, name) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = rf(ctx, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// DeleteMsg provides a mock function with given fields: ctx, seq +func (_m *MockStream) DeleteMsg(ctx context.Context, seq uint64) error { + ret := _m.Called(ctx, seq) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, seq) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetLastMsgForSubject provides a mock function with given fields: ctx, subject +func (_m *MockStream) GetLastMsgForSubject( + ctx context.Context, + subject string, +) (*jetstream.RawStreamMsg, error) { + ret := _m.Called(ctx, subject) + + var r0 *jetstream.RawStreamMsg + if rf, ok := ret.Get(0).(func(context.Context, string) *jetstream.RawStreamMsg); ok { + r0 = rf(ctx, subject) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.RawStreamMsg) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, subject) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetMsg provides a mock function with given fields: ctx, seq, opts +func (_m *MockStream) GetMsg( + ctx context.Context, + seq uint64, + opts ...jetstream.GetMsgOpt, +) (*jetstream.RawStreamMsg, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, seq) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *jetstream.RawStreamMsg + if rf, ok := ret.Get(0).(func(context.Context, uint64, ...jetstream.GetMsgOpt) *jetstream.RawStreamMsg); ok { + r0 = rf(ctx, seq, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.RawStreamMsg) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, uint64, ...jetstream.GetMsgOpt) error); ok { + r1 = rf(ctx, seq, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Info provides a mock function with given fields: ctx, opts +func (_m *MockStream) Info( + ctx context.Context, + opts ...jetstream.StreamInfoOpt, +) (*jetstream.StreamInfo, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *jetstream.StreamInfo + if rf, ok := ret.Get(0).(func(context.Context, ...jetstream.StreamInfoOpt) *jetstream.StreamInfo); ok { + r0 = rf(ctx, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*jetstream.StreamInfo) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, ...jetstream.StreamInfoOpt) error); ok { + r1 = rf(ctx, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ListConsumers provides a mock function with given fields: ctx +func (_m *MockStream) ListConsumers(ctx context.Context) jetstream.ConsumerInfoLister { + ret := _m.Called(ctx) + + var r0 jetstream.ConsumerInfoLister + if rf, ok := ret.Get(0).(func(context.Context) jetstream.ConsumerInfoLister); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.ConsumerInfoLister) + } + } + + return r0 +} + +// OrderedConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) OrderedConsumer( + ctx context.Context, + cfg jetstream.OrderedConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.OrderedConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.OrderedConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Purge provides a mock function with given fields: ctx, opts +func (_m *MockStream) Purge(ctx context.Context, opts ...jetstream.StreamPurgeOpt) error { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, ...jetstream.StreamPurgeOpt) error); ok { + r0 = rf(ctx, opts...) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SecureDeleteMsg provides a mock function with given fields: ctx, seq +func (_m *MockStream) SecureDeleteMsg(ctx context.Context, seq uint64) error { + ret := _m.Called(ctx, seq) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, seq) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateConsumer provides a mock function with given fields: ctx, cfg +func (_m *MockStream) UpdateConsumer( + ctx context.Context, + cfg jetstream.ConsumerConfig, +) (jetstream.Consumer, error) { + ret := _m.Called(ctx, cfg) + + var r0 jetstream.Consumer + if rf, ok := ret.Get(0).(func(context.Context, jetstream.ConsumerConfig) jetstream.Consumer); ok { + r0 = rf(ctx, cfg) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(jetstream.Consumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, jetstream.ConsumerConfig) error); ok { + r1 = rf(ctx, cfg) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go index d956973..5e920ec 100644 --- a/mocks/messaging/natsjscon/natsjscon.go +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -2,12 +2,95 @@ package natsjscon import ( "context" + "testing" "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/logger" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" + "github.com/simiancreative/simiango/mocks/messaging/natsjsdlq" "github.com/stretchr/testify/mock" + "github.com/tj/assert" ) +func NewDependencies() *Dependencies { + d := &Dependencies{ + Logger: &logger.MockLogger{}, + Jetstream: &natsjscm.MockJetStream{}, + Connector: &natsjscm.MockConnectionManager{}, + Stream: &natsjscm.MockStream{}, + Consumer: &natsjscm.MockConsumer{}, + DLQHandler: &natsjsdlq.MockDLQHandler{}, + Strategy: &MockStrategy{}, + Processor: &ProcessorMock{}, + } + + msg := &natsjscm.MockJetStreamMsg{} + msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) + msg.On("Ack").Return(nil) + msgs := []jetstream.Msg{msg, msg, msg, msg} + + processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) + for _, msg := range msgs { + processed[msg] = natsjscon.Success + } + + d.Logger.On("Debug", mock.Anything).Maybe() + d.Logger.On("Debugf", mock.Anything, mock.Anything).Maybe() + d.Logger.On("Error", mock.Anything).Maybe() + + d.Connector.On("Connect").Return(nil).Maybe() + d.Connector.On("IsConnected").Return(true).Maybe() + d.Connector.On("EnsureStream", mock.Anything, mock.Anything).Return(d.Jetstream, nil).Maybe() + d.Connector.On("GetJetStream").Return(d.Jetstream).Maybe() + d.Connector.SetJetStream(d.Jetstream) + + d.Jetstream.On("Stream", mock.Anything, mock.Anything). + Return(d.Stream, nil). + Maybe() + + d.Stream.On("CreateOrUpdateConsumer", mock.Anything, mock.Anything).Return(d.Consumer, nil) + + d.Processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() + + // Setup default strategy mocks to avoid test failures + d.Strategy.On("Setup", mock.Anything).Return(nil).Maybe() + d.Strategy.On("Consume", mock.Anything, mock.Anything).Return(msgs, nil).Maybe() + + return d +} + +type Dependencies struct { + Logger *logger.MockLogger + Jetstream *natsjscm.MockJetStream + Connector *natsjscm.MockConnectionManager + Stream *natsjscm.MockStream + Consumer *natsjscm.MockConsumer + DLQHandler *natsjsdlq.MockDLQHandler + Strategy *MockStrategy + Processor *ProcessorMock +} + +func (d *Dependencies) NewConsumer(t *testing.T) *natsjscon.Consumer { + config := natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + } + + consumer := natsjscon. + NewConsumer(config). + SetLogger(d.Logger). + SetConnector(d.Connector). + SetDLQHandler(d.DLQHandler). + SetStrategy(d.Strategy). + SetProcessor(d.Processor.Process) + + assert.NotNil(t, consumer) + + return consumer +} + type MockStrategy struct { mock.Mock } From fd4ffb7e63e0aa60723e2269bd812ede12f69a64 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Wed, 12 Mar 2025 16:15:04 -0400 Subject: [PATCH 12/15] feat: add MockMessageBatch for testing - Implemented MockMessageBatch to mock MessageBatch interface. - Added NewMockMessageBatch constructor to create mock instances. - Updated NewDependencies to include MockMessageBatch. - Modified TestConsume to use MockMessageBatch for testing. --- .tool-versions | 1 + bin/coverage.sh | 52 +++++++++++++++++++++++ messaging/natsjsstrategypull/pull_test.go | 28 ++++++++++++ mocks/messaging/natsjscm/batch.go | 39 +++++++++++++++++ mocks/messaging/natsjscon/natsjscon.go | 25 ++++++----- 5 files changed, 135 insertions(+), 10 deletions(-) create mode 100755 bin/coverage.sh create mode 100644 mocks/messaging/natsjscm/batch.go diff --git a/.tool-versions b/.tool-versions index 1e576d1..e4df726 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1,2 @@ golang 1.21.7 +golangci-lint 1.64.5 diff --git a/bin/coverage.sh b/bin/coverage.sh new file mode 100755 index 0000000..45d4f55 --- /dev/null +++ b/bin/coverage.sh @@ -0,0 +1,52 @@ +#!/usr/bin/env bash + +set -e + +# Default values +should_open=false +package_path="./..." + +# Function to display usage +usage() { + echo "Usage: $0 [-p|--package ] [-o|--open] [-h|--help]" + echo " -p, --package Specify the package path to test (default: all packages)" + echo " -o, --open Open the coverage report in the default browser" + echo " -h, --help Display this help message" +} + +# Parse arguments +while [[ "$#" -gt 0 ]]; do + case $1 in + -p|--package) package_path="$2"; shift ;; + -o|--open) should_open=true ;; + -h|--help) + usage + exit 0 + ;; + *) + echo "Unknown parameter passed: $1" + usage + exit 1 + ;; + esac + shift +done + +# TODO: work through lint issues and enable golanci-lint +# lint +# golangci-lint run $package_path + +# unit test +go test $package_path -coverpkg=$package_path -race -covermode=atomic -coverprofile=coverage.out +go tool cover -html=coverage.out -o ~/Desktop/coverage.html + +# report +echo "" +echo "===" +echo "Coverage report is generated at ~/Desktop/coverage.html" +echo "===" + +# open +if [ "$should_open" = true ]; then + open ~/Desktop/coverage.html +fi diff --git a/messaging/natsjsstrategypull/pull_test.go b/messaging/natsjsstrategypull/pull_test.go index 039c655..38fc4b9 100644 --- a/messaging/natsjsstrategypull/pull_test.go +++ b/messaging/natsjsstrategypull/pull_test.go @@ -43,3 +43,31 @@ func TestSetup(t *testing.T) { err = strategy.Setup(ctx) assert.NoError(t, err) } + +func TestConsume(t *testing.T) { + d := conmock.NewDependencies() + + config := natsjsstrategypull.Config{ + ConsumerName: "test-consumer", + } + strategy, err := natsjsstrategypull.New(config) + assert.NoError(t, err) + + ctx := context.TODO() + + key := natsjscon.CtxKey("stream-name") + ctx = context.WithValue(ctx, key, "test") + key = natsjscon.CtxKey("subject") + ctx = context.WithValue(ctx, key, "test.subject") + key = natsjscon.CtxKey("logger") + ctx = context.WithValue(ctx, key, d.Logger) + key = natsjscon.CtxKey("connection-manager") + ctx = context.WithValue(ctx, key, d.Connector) + + err = strategy.Setup(ctx) + assert.NoError(t, err) + + msgs, err := strategy.Consume(ctx, 1) + assert.NoError(t, err) + assert.NotNil(t, msgs) +} diff --git a/mocks/messaging/natsjscm/batch.go b/mocks/messaging/natsjscm/batch.go new file mode 100644 index 0000000..f37b1c2 --- /dev/null +++ b/mocks/messaging/natsjscm/batch.go @@ -0,0 +1,39 @@ +package natsjscm + +// type MockMessageBatch interface { +// Messages() <-chan jetstream.Msg +// Error() error +// } + +import ( + "github.com/nats-io/nats.go/jetstream" +) + +// MockMessageBatch is a mock implementation of the MockMessageBatch interface. +type MockMessageBatch struct { + messages chan jetstream.Msg + err error +} + +// NewMockMessageBatch creates a new instance of MockMessageBatch. +func NewMockMessageBatch(messages []jetstream.Msg, err error) *MockMessageBatch { + msgChan := make(chan jetstream.Msg, len(messages)) + for _, msg := range messages { + msgChan <- msg + } + close(msgChan) + return &MockMessageBatch{ + messages: msgChan, + err: err, + } +} + +// Messages returns a channel of jetstream.Msg. +func (m *MockMessageBatch) Messages() <-chan jetstream.Msg { + return m.messages +} + +// Error returns an error. +func (m *MockMessageBatch) Error() error { + return m.err +} diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go index 5e920ec..3b7c1b4 100644 --- a/mocks/messaging/natsjscon/natsjscon.go +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -14,27 +14,30 @@ import ( ) func NewDependencies() *Dependencies { + msg := &natsjscm.MockJetStreamMsg{} + msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) + msg.On("Ack").Return(nil) + msgs := []jetstream.Msg{msg, msg, msg, msg} + + batch := natsjscm.NewMockMessageBatch(msgs, nil) + + processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) + for _, msg := range msgs { + processed[msg] = natsjscon.Success + } + d := &Dependencies{ Logger: &logger.MockLogger{}, Jetstream: &natsjscm.MockJetStream{}, Connector: &natsjscm.MockConnectionManager{}, Stream: &natsjscm.MockStream{}, Consumer: &natsjscm.MockConsumer{}, + Batch: batch, DLQHandler: &natsjsdlq.MockDLQHandler{}, Strategy: &MockStrategy{}, Processor: &ProcessorMock{}, } - msg := &natsjscm.MockJetStreamMsg{} - msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) - msg.On("Ack").Return(nil) - msgs := []jetstream.Msg{msg, msg, msg, msg} - - processed := make(map[jetstream.Msg]natsjscon.ProcessStatus) - for _, msg := range msgs { - processed[msg] = natsjscon.Success - } - d.Logger.On("Debug", mock.Anything).Maybe() d.Logger.On("Debugf", mock.Anything, mock.Anything).Maybe() d.Logger.On("Error", mock.Anything).Maybe() @@ -50,6 +53,7 @@ func NewDependencies() *Dependencies { Maybe() d.Stream.On("CreateOrUpdateConsumer", mock.Anything, mock.Anything).Return(d.Consumer, nil) + d.Consumer.On("Fetch", mock.Anything, mock.Anything).Return(batch, nil) d.Processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() @@ -66,6 +70,7 @@ type Dependencies struct { Connector *natsjscm.MockConnectionManager Stream *natsjscm.MockStream Consumer *natsjscm.MockConsumer + Batch *natsjscm.MockMessageBatch DLQHandler *natsjsdlq.MockDLQHandler Strategy *MockStrategy Processor *ProcessorMock From 51e847a7b11754afc3a684bfbbdfa633679dfdc3 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Wed, 12 Mar 2025 16:33:38 -0400 Subject: [PATCH 13/15] fix: remove unnecessary locking in Stop method - Removed locking in the Stop method of Consumer to prevent deadlocks. - Added a test case to ensure Stop method does not panic. --- messaging/natsjscon/natsjscon.go | 3 --- messaging/natsjscon/natsjscon_test.go | 5 +++++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go index d9744ee..259c34f 100644 --- a/messaging/natsjscon/natsjscon.go +++ b/messaging/natsjscon/natsjscon.go @@ -154,9 +154,6 @@ func (c *Consumer) SetProcessor(processor Processor) *Consumer { // Stop stops the consumer func (c *Consumer) Stop() { - c.mu.Lock() - defer c.mu.Unlock() - if !c.running { return } diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index c3d9dde..53327c2 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -21,4 +21,9 @@ func TestNewConsumer(t *testing.T) { // wait time.Sleep(500 * time.Millisecond) + + // does not panic + assert.NotPanics(t, func() { + c.Stop() + }, "Stop should not panic") } From 21ed42d90d014978abdfe512997f388ebd5691af Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Wed, 12 Mar 2025 22:08:03 -0400 Subject: [PATCH 14/15] feat: add tests for Consumer configuration validation - Added tests to validate Consumer configuration in natsjscon package. - Ensured tests cover invalid processor, connector, stream name, consumer name, and subject. - Updated Dependencies.NewConsumer to accept optional ConsumerConfig. --- messaging/natsjscon/natsjscon_start.go | 16 +++--- messaging/natsjscon/natsjscon_test.go | 75 ++++++++++++++++++++++++++ mocks/messaging/natsjscon/natsjscon.go | 17 ++++-- 3 files changed, 95 insertions(+), 13 deletions(-) diff --git a/messaging/natsjscon/natsjscon_start.go b/messaging/natsjscon/natsjscon_start.go index ae789bc..fc35211 100644 --- a/messaging/natsjscon/natsjscon_start.go +++ b/messaging/natsjscon/natsjscon_start.go @@ -15,11 +15,9 @@ func (c *Consumer) Start(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() - ctx = c.setupCtx(ctx) - - c.debug("setting up consumer") - if err := c.setup(); err != nil { - return fmt.Errorf("failed to setup consumer: %w", err) + c.debug("is consumer already running?") + if c.running { + return errors.New("consumer is already running") } c.debug("validating consumer configuration") @@ -27,11 +25,13 @@ func (c *Consumer) Start(ctx context.Context) error { return fmt.Errorf("invalid consumer configuration: %w", err) } - c.debug("is consumer already running?") - if c.running { - return errors.New("consumer is already running") + c.debug("setting up consumer") + if err := c.setup(); err != nil { + return fmt.Errorf("failed to setup consumer: %w", err) } + ctx = c.setupCtx(ctx) + // Set up the strategy c.debug("setting up consumption strategy") if err := c.strategy.Setup(ctx); err != nil { diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index 53327c2..b0cac48 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/simiancreative/simiango/messaging/natsjscon" conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" "github.com/stretchr/testify/assert" ) @@ -27,3 +28,77 @@ func TestNewConsumer(t *testing.T) { c.Stop() }, "Stop should not panic") } + +func TestNewConsumerWithInvalidConfig(t *testing.T) { + d := conmock.NewDependencies() + + tests := []struct { + name string + fn func(c *natsjscon.Consumer) + shouldError bool + }{ + { + name: "SetProcessor", + fn: func(c *natsjscon.Consumer) { + c.SetProcessor(nil) + }, + shouldError: true, + }, + { + name: "SetConnector", + fn: func(c *natsjscon.Consumer) { + c.SetConnector(nil) + }, + shouldError: true, + }, + { + name: "ValidateStream", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + }) + }, + shouldError: true, + }, + { + name: "ValidateConsumerName", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "test-stream", + ConsumerName: "", + Subject: "test.subject.>", + }) + }, + shouldError: true, + }, + { + name: "ValidateSubject", + fn: func(c *natsjscon.Consumer) { + *c = *d.NewConsumer(t, natsjscon.ConsumerConfig{ + StreamName: "test-stream", + ConsumerName: "test-consumer", + Subject: "", + }) + }, + shouldError: true, + }, + } + + for _, tt := range tests { + c := d.NewConsumer(t) + + t.Run(tt.name, func(t *testing.T) { + tt.fn(c) + + if tt.shouldError { + assert.Error(t, c.Start(context.Background())) + } + + if !tt.shouldError { + assert.NoError(t, c.Start(context.Background())) + } + }) + } +} diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go index 3b7c1b4..b9969e1 100644 --- a/mocks/messaging/natsjscon/natsjscon.go +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -76,13 +76,20 @@ type Dependencies struct { Processor *ProcessorMock } -func (d *Dependencies) NewConsumer(t *testing.T) *natsjscon.Consumer { - config := natsjscon.ConsumerConfig{ - StreamName: "test", - ConsumerName: "test-consumer", - Subject: "test.subject.>", +func (d *Dependencies) NewConsumer( + t *testing.T, + configs ...natsjscon.ConsumerConfig, +) *natsjscon.Consumer { + if len(configs) == 0 { + configs = append(configs, natsjscon.ConsumerConfig{ + StreamName: "test", + ConsumerName: "test-consumer", + Subject: "test.subject.>", + }) } + config := configs[0] + consumer := natsjscon. NewConsumer(config). SetLogger(d.Logger). From 5de8f4c55e6ab5f4065b24086faac16b4a2ca8b8 Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Fri, 21 Mar 2025 16:21:59 -0400 Subject: [PATCH 15/15] refactor: update NATS handlers and message processing - Replace ReconnectHandler with DisconnectHandler in ConnectionManager - Remove handleReconnect method and inline JetStream context creation - Change Processor and ConsumptionStrategy to use channels for messages - Update tests and mocks to reflect new message handling approach - Add default case in handleResult to log unknown process status --- messaging/natsjscm/natsjscm.go | 36 +++---------------- messaging/natsjscon/natsjscon.go | 4 +-- .../natsjscon/natsjscon_handle_result.go | 6 ++++ messaging/natsjscon/natsjscon_test.go | 23 ++++++++++++ messaging/natsjsstrategypull/pull.go | 12 ++----- mocks/messaging/natsjscm/batch.go | 22 ++++++------ mocks/messaging/natsjscm/msg.go | 8 +++++ mocks/messaging/natsjscon/natsjscon.go | 22 +++++++++--- simian-go/app/tryitout/consumer/index.go | 4 +-- 9 files changed, 76 insertions(+), 61 deletions(-) diff --git a/messaging/natsjscm/natsjscm.go b/messaging/natsjscm/natsjscm.go index 516b4b2..914a4f8 100644 --- a/messaging/natsjscm/natsjscm.go +++ b/messaging/natsjscm/natsjscm.go @@ -73,8 +73,8 @@ func NewConnectionManager(config ConnectionConfig) (Connector, error) { // Add connection event handlers options = append(options, - nats.ReconnectHandler(func(_ *nats.Conn) { - cm.handleReconnect() + nats.DisconnectHandler(func(_ *nats.Conn) { + cm.retryConnection() }), ) @@ -82,34 +82,6 @@ func NewConnectionManager(config ConnectionConfig) (Connector, error) { return cm, nil } -func (cm *ConnectionManager) handleReconnect() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.log.Debugf("NATS connection reconnected") - - // Recreate JetStream context after reconnect - if cm.nc == nil { - return - } - - js, err := cm.createJetStreamContext(cm.nc) - if err != nil { - cm.log.Errorf("Failed to recreate JetStream context %s", err) - go cm.retryConnection() - return - } - - cm.js = js - cm.log.Debugf("JetStream context recreated") -} - -// createJetStreamContext creates a new JetStream context with the current configuration -func (cm *ConnectionManager) createJetStreamContext(nc *nats.Conn) (jetstream.JetStream, error) { - // Create JetStream context - return jetstream.New(nc) -} - // retryConnection attempts to reconnect to NATS periodically func (cm *ConnectionManager) retryConnection() { for { @@ -129,7 +101,7 @@ func (cm *ConnectionManager) retryConnection() { continue } - js, err := cm.createJetStreamContext(nc) + js, err := jetstream.New(nc) if err != nil { cm.log.Errorf("Failed to recreate JetStream context: %s", err) nc.Close() @@ -169,7 +141,7 @@ func (cm *ConnectionManager) Connect() error { cm.log.Debugf("Getting JetStream context") // Create JetStream context - js, err := cm.createJetStreamContext(nc) + js, err := jetstream.New(nc) if err != nil { nc.Close() return fmt.Errorf("failed to create JetStream context: %w", err) diff --git a/messaging/natsjscon/natsjscon.go b/messaging/natsjscon/natsjscon.go index 259c34f..09cef52 100644 --- a/messaging/natsjscon/natsjscon.go +++ b/messaging/natsjscon/natsjscon.go @@ -38,7 +38,7 @@ var ResultNames = map[ProcessStatus]string{ } // Processor processes messages -type Processor func(ctx context.Context, msgs []jetstream.Msg) map[jetstream.Msg]ProcessStatus +type Processor func(ctx context.Context, msgs <-chan jetstream.Msg) map[jetstream.Msg]ProcessStatus // ConsumerConfig holds general configuration for a consumer type ConsumerConfig struct { @@ -67,7 +67,7 @@ type ConsumptionStrategy interface { Setup(ctx context.Context) error // Consume retrieves messages for processing - Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) + Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) } // Consumer manages consuming and processing messages diff --git a/messaging/natsjscon/natsjscon_handle_result.go b/messaging/natsjscon/natsjscon_handle_result.go index 53b4a06..24defef 100644 --- a/messaging/natsjscon/natsjscon_handle_result.go +++ b/messaging/natsjscon/natsjscon_handle_result.go @@ -30,6 +30,12 @@ func (c *Consumer) handleResult(msg jetstream.Msg, status ProcessStatus) { return case TerminalFailure: handleTerminalError(msg, c) + return + default: + c.logger.Debug("unknown process status", logger.Fields{ + "status": status, + }) + handleFailure(msg, metadata, c) } } diff --git a/messaging/natsjscon/natsjscon_test.go b/messaging/natsjscon/natsjscon_test.go index b0cac48..46771bf 100644 --- a/messaging/natsjscon/natsjscon_test.go +++ b/messaging/natsjscon/natsjscon_test.go @@ -5,7 +5,9 @@ import ( "testing" "time" + "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjscon" + "github.com/simiancreative/simiango/mocks/messaging/natsjscm" conmock "github.com/simiancreative/simiango/mocks/messaging/natsjscon" "github.com/stretchr/testify/assert" ) @@ -102,3 +104,24 @@ func TestNewConsumerWithInvalidConfig(t *testing.T) { }) } } + +func TestHandleResult(t *testing.T) { + d := conmock.NewDependencies() + c := d.NewConsumer(t) + + badAck := natsjscm.NewMockJetStreamMsg(jetstream.ErrMsgAlreadyAckd) + + batch := natsjscm.NewMockMessageBatch( + []jetstream.Msg{badAck}, + nil, + ) + + d.Strategy.Reset(batch) + + err := c.Start(context.Background()) + assert.NoError(t, err) + + assert.Eventually(t, func() bool { + return d.Processor.AssertExpectations(t) && badAck.AssertExpectations(t) + }, 500*time.Millisecond, 100*time.Millisecond) +} diff --git a/messaging/natsjsstrategypull/pull.go b/messaging/natsjsstrategypull/pull.go index 7e1826d..df56c5b 100644 --- a/messaging/natsjsstrategypull/pull.go +++ b/messaging/natsjsstrategypull/pull.go @@ -138,7 +138,7 @@ func (p *PullStrategy) ensureStream(ctx context.Context) error { } // Consume pulls a batch of messages and converts them to our Message type -func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { +func (p *PullStrategy) Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) { // Check if we need to reconnect/reinitialize if err := p.ensureConsumerActive(ctx); err != nil { return nil, fmt.Errorf("failed to ensure consumer: %w", err) @@ -181,18 +181,12 @@ func (p *PullStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.M if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { // This is normal for pull-based when no messages are available - return []jetstream.Msg{}, nil + return nil, nil } return nil, fmt.Errorf("failed to fetch messages: %w", err) } - // Convert jetstream messages to our Message type - messages := []jetstream.Msg{} - for jsMsg := range jsMsgs.Messages() { - messages = append(messages, jsMsg) - } - - return messages, nil + return jsMsgs.Messages(), nil } // ensureConsumerActive checks the connection status and reinitializes the consumer if needed diff --git a/mocks/messaging/natsjscm/batch.go b/mocks/messaging/natsjscm/batch.go index f37b1c2..b8a2eaf 100644 --- a/mocks/messaging/natsjscm/batch.go +++ b/mocks/messaging/natsjscm/batch.go @@ -11,29 +11,29 @@ import ( // MockMessageBatch is a mock implementation of the MockMessageBatch interface. type MockMessageBatch struct { - messages chan jetstream.Msg - err error + Msgs []jetstream.Msg + Err error } // NewMockMessageBatch creates a new instance of MockMessageBatch. func NewMockMessageBatch(messages []jetstream.Msg, err error) *MockMessageBatch { - msgChan := make(chan jetstream.Msg, len(messages)) - for _, msg := range messages { - msgChan <- msg - } - close(msgChan) return &MockMessageBatch{ - messages: msgChan, - err: err, + Msgs: messages, + Err: err, } } // Messages returns a channel of jetstream.Msg. func (m *MockMessageBatch) Messages() <-chan jetstream.Msg { - return m.messages + msgChan := make(chan jetstream.Msg, len(m.Msgs)) + for _, msg := range m.Msgs { + msgChan <- msg + } + close(msgChan) + return msgChan } // Error returns an error. func (m *MockMessageBatch) Error() error { - return m.err + return m.Err } diff --git a/mocks/messaging/natsjscm/msg.go b/mocks/messaging/natsjscm/msg.go index b1ebc55..7517b04 100644 --- a/mocks/messaging/natsjscm/msg.go +++ b/mocks/messaging/natsjscm/msg.go @@ -13,6 +13,14 @@ type MockJetStreamMsg struct { mock.Mock } +func NewMockJetStreamMsg(err error) *MockJetStreamMsg { + m := &MockJetStreamMsg{} + m.On("Metadata").Return(&jetstream.MsgMetadata{}, nil) + m.On("Ack").Return(err) + + return m +} + // Metadata returns [MsgMetadata] for a JetStream message. func (m *MockJetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { called := m.Called() diff --git a/mocks/messaging/natsjscon/natsjscon.go b/mocks/messaging/natsjscon/natsjscon.go index b9969e1..0689555 100644 --- a/mocks/messaging/natsjscon/natsjscon.go +++ b/mocks/messaging/natsjscon/natsjscon.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/simiancreative/simiango/messaging/natsjscon" "github.com/simiancreative/simiango/mocks/logger" @@ -15,6 +16,7 @@ import ( func NewDependencies() *Dependencies { msg := &natsjscm.MockJetStreamMsg{} + msg.On("Headers").Return(nats.Header{}, nil) msg.On("Metadata").Return(new(jetstream.MsgMetadata), nil) msg.On("Ack").Return(nil) msgs := []jetstream.Msg{msg, msg, msg, msg} @@ -53,13 +55,13 @@ func NewDependencies() *Dependencies { Maybe() d.Stream.On("CreateOrUpdateConsumer", mock.Anything, mock.Anything).Return(d.Consumer, nil) - d.Consumer.On("Fetch", mock.Anything, mock.Anything).Return(batch, nil) + d.Consumer.On("Fetch", mock.Anything, mock.Anything).Return(d.Batch, nil) d.Processor.On("Process", mock.Anything, mock.Anything).Return(processed).Maybe() // Setup default strategy mocks to avoid test failures d.Strategy.On("Setup", mock.Anything).Return(nil).Maybe() - d.Strategy.On("Consume", mock.Anything, mock.Anything).Return(msgs, nil).Maybe() + d.Strategy.On("Consume", mock.Anything, mock.Anything).Return(d.Batch.Messages(), nil).Maybe() return d } @@ -112,9 +114,19 @@ func (m *MockStrategy) Setup(ctx context.Context) error { return c.Error(0) } -func (m *MockStrategy) Consume(ctx context.Context, workerID int) ([]jetstream.Msg, error) { +func (m *MockStrategy) Consume(ctx context.Context, workerID int) (<-chan jetstream.Msg, error) { + c := m.Called(ctx, workerID) - return c.Get(0).([]jetstream.Msg), c.Error(1) + result, err := c.Get(0).(<-chan jetstream.Msg), c.Error(1) + + return result, err +} + +func (m *MockStrategy) Reset(batch *natsjscm.MockMessageBatch) { + m.ExpectedCalls = nil + + m.On("Setup", mock.Anything).Return(nil).Maybe() + m.On("Consume", mock.Anything, mock.Anything).Return(batch.Messages(), nil).Maybe() } type ProcessorMock struct { @@ -123,7 +135,7 @@ type ProcessorMock struct { func (p *ProcessorMock) Process( ctx context.Context, - msgs []jetstream.Msg, + msgs <-chan jetstream.Msg, ) map[jetstream.Msg]natsjscon.ProcessStatus { args := p.Called(ctx, msgs) return args.Get(0).(map[jetstream.Msg]natsjscon.ProcessStatus) diff --git a/simian-go/app/tryitout/consumer/index.go b/simian-go/app/tryitout/consumer/index.go index 8120a0d..5602ca9 100644 --- a/simian-go/app/tryitout/consumer/index.go +++ b/simian-go/app/tryitout/consumer/index.go @@ -92,7 +92,7 @@ func newStrategy(breaker circuitbreaker.Breaker) natsjscon.ConsumptionStrategy { func processor( ctx context.Context, - msgs []jetstream.Msg, + msgs <-chan jetstream.Msg, ) map[jetstream.Msg]natsjscon.ProcessStatus { log := logger.New() @@ -100,7 +100,7 @@ func processor( processed := map[jetstream.Msg]natsjscon.ProcessStatus{} - for _, msg := range msgs { + for msg := range msgs { processed[msg] = natsjscon.Success log.Info("processing message", logger.Fields{ "data": string(msg.Data()),