From c08c0aa28f84f898023e160f3f6303777dc4b7dd Mon Sep 17 00:00:00 2001 From: atjhoendz Date: Mon, 14 Jul 2025 09:47:20 +0700 Subject: [PATCH 1/2] hotfix: fix register jetstream to register all clients first before init stream and subscribe --- jetstream.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/jetstream.go b/jetstream.go index 2dea848..44c8d0c 100644 --- a/jetstream.go +++ b/jetstream.go @@ -219,19 +219,26 @@ func NewNATSConnection(NATSJSHost string, clients []JetStreamRegistrar, natsOpts func registerJetStreamClient(js JetStream, clients []JetStreamRegistrar) error { for _, client := range clients { client.RegisterNATSJetStream(js) + } + + for _, client := range clients { if streamRegistrar, ok := client.(StreamRegistrar); ok { err := streamRegistrar.InitStream() if err != nil { + logrus.WithField("client", client).Error(err) return err } } + if subscriber, ok := client.(Subscriber); ok { err := subscriber.SubscribeJetStreamEvent() if err != nil { + logrus.WithField("client", client).Error(err) return err } } } + return nil } From 2a06c4b1ba870ed1841fbfb6cbd5742c982d73f6 Mon Sep 17 00:00:00 2001 From: atjhoendz Date: Mon, 14 Jul 2025 13:53:05 +0700 Subject: [PATCH 2/2] fix logs, and add unit test --- jetstream.go | 4 ++-- jetstream_test.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/jetstream.go b/jetstream.go index 44c8d0c..ee853c1 100644 --- a/jetstream.go +++ b/jetstream.go @@ -225,7 +225,7 @@ func registerJetStreamClient(js JetStream, clients []JetStreamRegistrar) error { if streamRegistrar, ok := client.(StreamRegistrar); ok { err := streamRegistrar.InitStream() if err != nil { - logrus.WithField("client", client).Error(err) + logrus.WithField("client", fmt.Sprintf("%T", client)).Error(err) return err } } @@ -233,7 +233,7 @@ func registerJetStreamClient(js JetStream, clients []JetStreamRegistrar) error { if subscriber, ok := client.(Subscriber); ok { err := subscriber.SubscribeJetStreamEvent() if err != nil { - logrus.WithField("client", client).Error(err) + logrus.WithField("client", fmt.Sprintf("%T", client)).Error(err) return err } } diff --git a/jetstream_test.go b/jetstream_test.go index c410f4a..1bfa3e6 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -6,11 +6,13 @@ import ( "testing" "time" + "github.com/kumparan/ferstream/mock" "github.com/nats-io/nats-server/v2/server" natsserver "github.com/nats-io/nats-server/v2/test" "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" ) const defaultURL = "nats://127.0.0.1:4222" @@ -407,3 +409,57 @@ func TestConsumerInfo(t *testing.T) { assert.Nil(t, consumerInfo2) assert.Equal(t, nats.ErrConsumerNotFound, err) } + +type sClient struct { + js JetStream + isInitError bool + isSubscribeError bool +} + +func (c *sClient) RegisterNATSJetStream(js JetStream) { + c.js = js +} + +func (c *sClient) InitStream() error { + if c.isInitError { + return assert.AnError + } + return nil +} + +func (c *sClient) SubscribeJetStreamEvent() error { + if c.isSubscribeError { + return assert.AnError + } + return nil +} + +func TestRegisterJetStreamClient(t *testing.T) { + ctrl := gomock.NewController(t) + mockJS := mock.NewMockJetStream(ctrl) + + t.Run("success - register, init, and subscribe", func(t *testing.T) { + testClient := new(sClient) + + err := registerJetStreamClient(mockJS, []JetStreamRegistrar{testClient}) + assert.NoError(t, err) + }) + + t.Run("error on init stream", func(t *testing.T) { + testClient := new(sClient) + testClient.isInitError = true + + err := registerJetStreamClient(mockJS, []JetStreamRegistrar{testClient}) + assert.Error(t, err) + assert.NotNil(t, testClient.js) + }) + + t.Run("error on subscribe", func(t *testing.T) { + testClient := new(sClient) + testClient.isSubscribeError = true + + err := registerJetStreamClient(mockJS, []JetStreamRegistrar{testClient}) + assert.Error(t, err) + assert.NotNil(t, testClient.js) + }) +}