diff --git a/mocks/authmocks/plugin.go b/mocks/authmocks/plugin.go index 85e78141..0ca63b3b 100644 --- a/mocks/authmocks/plugin.go +++ b/mocks/authmocks/plugin.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package authmocks @@ -58,7 +58,7 @@ func (_m *Plugin) InitConfig(_a0 config.Section) { _m.Called(_a0) } -// Name provides a mock function with given fields: +// Name provides a mock function with no fields func (_m *Plugin) Name() string { ret := _m.Called() diff --git a/mocks/crudmocks/crud.go b/mocks/crudmocks/crud.go index 3352a41a..ca8768d8 100644 --- a/mocks/crudmocks/crud.go +++ b/mocks/crudmocks/crud.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package crudmocks @@ -119,7 +119,9 @@ func (_m *CRUD[T]) GetByID(ctx context.Context, id string, getOpts ...dbsql.GetO if rf, ok := ret.Get(0).(func(context.Context, string, ...dbsql.GetOption) T); ok { r0 = rf(ctx, id, getOpts...) } else { - r0 = ret.Get(0).(T) + if ret.Get(0) != nil { + r0 = ret.Get(0).(T) + } } if rf, ok := ret.Get(1).(func(context.Context, string, ...dbsql.GetOption) error); ok { @@ -154,7 +156,9 @@ func (_m *CRUD[T]) GetByName(ctx context.Context, name string, getOpts ...dbsql. if rf, ok := ret.Get(0).(func(context.Context, string, ...dbsql.GetOption) T); ok { r0 = rf(ctx, name, getOpts...) } else { - r0 = ret.Get(0).(T) + if ret.Get(0) != nil { + r0 = ret.Get(0).(T) + } } if rf, ok := ret.Get(1).(func(context.Context, string, ...dbsql.GetOption) error); ok { @@ -189,7 +193,9 @@ func (_m *CRUD[T]) GetByUUIDOrName(ctx context.Context, uuidOrName string, getOp if rf, ok := ret.Get(0).(func(context.Context, string, ...dbsql.GetOption) T); ok { r0 = rf(ctx, uuidOrName, getOpts...) } else { - r0 = ret.Get(0).(T) + if ret.Get(0) != nil { + r0 = ret.Get(0).(T) + } } if rf, ok := ret.Get(1).(func(context.Context, string, ...dbsql.GetOption) error); ok { @@ -224,7 +230,9 @@ func (_m *CRUD[T]) GetFirst(ctx context.Context, filter ffapi.Filter, getOpts .. if rf, ok := ret.Get(0).(func(context.Context, ffapi.Filter, ...dbsql.GetOption) T); ok { r0 = rf(ctx, filter, getOpts...) } else { - r0 = ret.Get(0).(T) + if ret.Get(0) != nil { + r0 = ret.Get(0).(T) + } } if rf, ok := ret.Get(1).(func(context.Context, ffapi.Filter, ...dbsql.GetOption) error); ok { @@ -275,7 +283,7 @@ func (_m *CRUD[T]) GetMany(ctx context.Context, filter ffapi.Filter) ([]T, *ffap return r0, r1, r2 } -// GetQueryFactory provides a mock function with given fields: +// GetQueryFactory provides a mock function with no fields func (_m *CRUD[T]) GetQueryFactory() ffapi.QueryFactory { ret := _m.Called() @@ -478,7 +486,7 @@ func (_m *CRUD[T]) Scoped(scope squirrel.Eq) dbsql.CRUD[T] { return r0 } -// TableAlias provides a mock function with given fields: +// TableAlias provides a mock function with no fields func (_m *CRUD[T]) TableAlias() string { ret := _m.Called() @@ -606,7 +614,7 @@ func (_m *CRUD[T]) Upsert(ctx context.Context, inst T, optimization dbsql.Upsert return r0, r1 } -// Validate provides a mock function with given fields: +// Validate provides a mock function with no fields func (_m *CRUD[T]) Validate() { _m.Called() } diff --git a/mocks/dbmigratemocks/driver.go b/mocks/dbmigratemocks/driver.go index d5a25f2f..50b5a6d4 100644 --- a/mocks/dbmigratemocks/driver.go +++ b/mocks/dbmigratemocks/driver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package dbmigratemocks @@ -15,7 +15,7 @@ type Driver struct { mock.Mock } -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *Driver) Close() error { ret := _m.Called() @@ -33,7 +33,7 @@ func (_m *Driver) Close() error { return r0 } -// Drop provides a mock function with given fields: +// Drop provides a mock function with no fields func (_m *Driver) Drop() error { ret := _m.Called() @@ -51,7 +51,7 @@ func (_m *Driver) Drop() error { return r0 } -// Lock provides a mock function with given fields: +// Lock provides a mock function with no fields func (_m *Driver) Lock() error { ret := _m.Called() @@ -135,7 +135,7 @@ func (_m *Driver) SetVersion(version int, dirty bool) error { return r0 } -// Unlock provides a mock function with given fields: +// Unlock provides a mock function with no fields func (_m *Driver) Unlock() error { ret := _m.Called() @@ -153,7 +153,7 @@ func (_m *Driver) Unlock() error { return r0 } -// Version provides a mock function with given fields: +// Version provides a mock function with no fields func (_m *Driver) Version() (int, bool, error) { ret := _m.Called() diff --git a/mocks/httpservermocks/go_http_server.go b/mocks/httpservermocks/go_http_server.go index 31e13d33..418d1665 100644 --- a/mocks/httpservermocks/go_http_server.go +++ b/mocks/httpservermocks/go_http_server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package httpservermocks @@ -15,7 +15,7 @@ type GoHTTPServer struct { mock.Mock } -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *GoHTTPServer) Close() error { ret := _m.Called() diff --git a/mocks/wsservermocks/protocol.go b/mocks/wsservermocks/protocol.go index 999545dd..9f6e82bc 100644 --- a/mocks/wsservermocks/protocol.go +++ b/mocks/wsservermocks/protocol.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package wsservermocks diff --git a/mocks/wsservermocks/web_socket_server.go b/mocks/wsservermocks/web_socket_server.go index a1183c60..23a534de 100644 --- a/mocks/wsservermocks/web_socket_server.go +++ b/mocks/wsservermocks/web_socket_server.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.40.2. DO NOT EDIT. +// Code generated by mockery v2.53.4. DO NOT EDIT. package wsservermocks @@ -21,7 +21,7 @@ func (_m *WebSocketServer) Broadcast(ctx context.Context, stream string, payload _m.Called(ctx, stream, payload) } -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *WebSocketServer) Close() { _m.Called() } diff --git a/pkg/dbsql/database.go b/pkg/dbsql/database.go index df0434d2..616af346 100644 --- a/pkg/dbsql/database.go +++ b/pkg/dbsql/database.go @@ -30,6 +30,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" + "github.com/golang-migrate/migrate/v4/database" // Import migrate file source _ "github.com/golang-migrate/migrate/v4/source/file" ) @@ -139,8 +140,17 @@ func (s *Database) RunAsGroup(ctx context.Context, fn func(ctx context.Context) return s.CommitTx(ctx, tx, false /* we _are_ the auto-committer */) } -func (s *Database) applyDBMigrations(ctx context.Context, config config.Section, provider Provider) error { - driver, err := provider.GetMigrationDriver(s.db) +func (s *Database) applyDBMigrations(ctx context.Context, config config.Section, provider Provider) (err error) { + var driver database.Driver + providerClosable, isClosable := provider.(ProviderCloseableMigrationDriver) + if isClosable { + driver, err = providerClosable.GetMigrationDriverClosable(s.db) + defer func() { + _ = driver.Close() + }() + } else { + driver, err = provider.GetMigrationDriver(s.db) + } if err == nil { fileURL := "file://" + config.GetString(SQLConfMigrationsDirectory) log.L(ctx).Infof("Running migrations in: %s", fileURL) @@ -184,7 +194,7 @@ func (s *Database) BeginOrUseTx(ctx context.Context) (ctx1 context.Context, tx * ctx1 = log.WithLogger(ctx, l) before := time.Now() l.Tracef("SQL-> begin") - sqlTX, err := s.db.Begin() + sqlTX, err := s.db.BeginTx(ctx /* transaction should auto-rollback on context cancel */, nil /* default options */) if err != nil { return ctx1, nil, false, i18n.WrapError(ctx1, err, i18n.MsgDBBeginFailed) } diff --git a/pkg/dbsql/mock_provider.go b/pkg/dbsql/mock_provider.go index 1aeacdd1..21aa4817 100644 --- a/pkg/dbsql/mock_provider.go +++ b/pkg/dbsql/mock_provider.go @@ -40,6 +40,10 @@ type MockProvider struct { mmg *dbmigratemocks.Driver } +type MockProviderConnScoped struct { + MockProvider +} + type MockProviderConfig struct { FakePSQLInsert bool OpenError error @@ -108,3 +112,7 @@ func (mp *MockProvider) Open(_ string) (*sql.DB, error) { func (mp *MockProvider) GetMigrationDriver(_ *sql.DB) (migratedb.Driver, error) { return mp.mmg, mp.GetMigrationDriverError } + +func (mp *MockProviderConnScoped) GetMigrationDriverConn(_ *sql.DB) (migratedb.Driver, error) { + return mp.mmg, mp.GetMigrationDriverError +} diff --git a/pkg/dbsql/provider.go b/pkg/dbsql/provider.go index 6a8f418c..47926d2b 100644 --- a/pkg/dbsql/provider.go +++ b/pkg/dbsql/provider.go @@ -59,7 +59,7 @@ type Provider interface { // Open creates the DB instances Open(url string) (*sql.DB, error) - // GetDriver returns the driver implementation + // GetDriver returns the driver implementation - preferred if supported to implement GetMigrationDriverConn so the connection can be cleaned up (cleaning this driver up closes the whole DB) GetMigrationDriver(*sql.DB) (migratedb.Driver, error) // Features returns database specific configuration switches @@ -68,3 +68,9 @@ type Provider interface { // ApplyInsertQueryCustomizations updates the INSERT query for returning the Sequence, and returns whether it needs to be run as a query to return the Sequence field ApplyInsertQueryCustomizations(insert sq.InsertBuilder, requestConflictEmptyResult bool) (updatedInsert sq.InsertBuilder, runAsQuery bool) } + +// Implementing this interface allows cleanup of the connection used during migration +type ProviderCloseableMigrationDriver interface { + // Returns the driver implementation that is safe to close. Specifically this means the WithConnection() use in migration, rather than WithInstance(). + GetMigrationDriverClosable(*sql.DB) (migratedb.Driver, error) +} diff --git a/pkg/eventstreams/e2e_test.go b/pkg/eventstreams/e2e_test.go index cef28d3b..06e78c9e 100644 --- a/pkg/eventstreams/e2e_test.go +++ b/pkg/eventstreams/e2e_test.go @@ -35,6 +35,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/wsserver" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testData struct { @@ -106,7 +107,7 @@ func TestE2E_DeliveryWebSockets(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create a stream to sub-select one topic es1 := &GenericEventStream{ @@ -117,14 +118,14 @@ func TestE2E_DeliveryWebSockets(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) expectedNumber := 1 for i := 0; i < 10; i++ { @@ -153,7 +154,7 @@ func TestE2E_DeliveryWebSocketsBroadcast(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create a stream to sub-select one topic es1 := &GenericEventStream{ @@ -167,21 +168,21 @@ func TestE2E_DeliveryWebSocketsBroadcast(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) expectedNumber := 1 for i := 0; i < 10; i++ { data := <-wsc.Receive() var batch EventBatch[testData] err := json.Unmarshal(data, &batch) - assert.NoError(t, err) + require.NoError(t, err) // each batch should be 10 assert.Len(t, batch.Events, 10) for _, e := range batch.Events { @@ -207,7 +208,7 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create a stream to sub-select one topic es1 := &GenericEventStream{ @@ -218,14 +219,14 @@ func TestE2E_DeliveryWebSocketsNack(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) for i := 0; i < 5; i++ { wsReceiveNack(ctx, t, wsc, func(batch *EventBatch[testData]) { @@ -253,7 +254,7 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create a stream to sub-select one topic es1 := &GenericEventStream{ @@ -264,14 +265,14 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) // Get the first batch wsReceiveAck(ctx, t, wsc, func(batch *EventBatch[testData]) {}) @@ -283,23 +284,23 @@ func TestE2E_WebsocketDeliveryRestartReset(t *testing.T) { for ess == nil || ess.Statistics == nil || ess.Statistics.Checkpoint == "" { time.Sleep(1 * time.Millisecond) ess, err = mgr.GetStreamByID(ctx, es1.GetID()) - assert.NoError(t, err) + require.NoError(t, err) } // Restart and check we get called with the checkpoint - note we don't reconnect the // websocket or restart that - it remains "started" from the websocket protocol // perspective throughout err = mgr.StopStream(ctx, es1.GetID()) - assert.NoError(t, err) + require.NoError(t, err) err = mgr.StartStream(ctx, es1.GetID()) - assert.NoError(t, err) + require.NoError(t, err) wsReceiveAck(ctx, t, wsc, func(batch *EventBatch[testData]) {}) assert.Equal(t, "000000000091", ts.sequenceStartedWith) assert.Equal(t, 2, ts.startCount) // Reset it and check we get the reset err = mgr.ResetStream(ctx, es1.GetID(), ptrTo("first")) - assert.NoError(t, err) + require.NoError(t, err) wsReceiveAck(ctx, t, wsc, func(batch *EventBatch[testData]) {}) assert.Equal(t, "first", ts.sequenceStartedWith) assert.Equal(t, 3, ts.startCount) @@ -313,7 +314,7 @@ func TestE2E_ResetStreamWhileAwaitingAck(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create a stream to sub-select one topic es1 := &GenericEventStream{ @@ -324,29 +325,29 @@ func TestE2E_ResetStreamWhileAwaitingAck(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) // Receive the message batch data := <-wsc.Receive() var batch EventBatch[testData] err = json.Unmarshal(data, &batch) - assert.NoError(t, err) + require.NoError(t, err) // Do a reset before we ack. err = mgr.ResetStream(ctx, "stream1", ptrTo("12345")) - assert.NoError(t, err) + require.NoError(t, err) // Should get the batch again data = <-wsc.Receive() err = json.Unmarshal(data, &batch) - assert.NoError(t, err) + require.NoError(t, err) // Check we did the reset done() @@ -362,7 +363,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) got100 := make(chan struct{}) var received int64 @@ -374,7 +375,7 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { var batch EventBatch[testData] err := json.NewDecoder(req.Body).Decode(&batch) - assert.NoError(t, err) + require.NoError(t, err) // each batch should be 10 assert.Len(t, batch.Events, 10) @@ -410,14 +411,14 @@ func TestE2E_DeliveryWebHooks200(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) // Check we ran the loop just once, and from the empty string for the checkpoint (as there was no InitialSequenceID) <-got100 @@ -435,7 +436,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { close(ts.started) // start delivery immediately - will block as no WS connected mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) gotFiveTimes := make(chan struct{}) var received int64 @@ -446,7 +447,7 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { var batch EventBatch[testData] err := json.NewDecoder(req.Body).Decode(&batch) - assert.NoError(t, err) + require.NoError(t, err) // We should get the same batch redelivered expectedNumber := 1 @@ -482,14 +483,14 @@ func TestE2E_DeliveryWebHooks500Retry(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Connect our websocket and start it err = wsc.Connect() - assert.NoError(t, err) + require.NoError(t, err) err = wsc.Send(ctx, []byte(`{"type":"start","stream":"stream1"}`)) - assert.NoError(t, err) + require.NoError(t, err) // Check we ran the loop just once, and from the empty string for the checkpoint (as there was no InitialSequenceID) <-gotFiveTimes @@ -507,7 +508,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) { } mgr, err := NewEventStreamManager[*GenericEventStream, testData](ctx, GenerateConfig[*GenericEventStream, testData](ctx), p, wss, ts) - assert.NoError(t, err) + require.NoError(t, err) // Create first event stream started es1 := &GenericEventStream{ @@ -517,7 +518,7 @@ func TestE2E_CRUDLifecycle(t *testing.T) { }, } created, err := mgr.UpsertStream(ctx, "stream1", es1) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Create second event stream stopped @@ -529,12 +530,12 @@ func TestE2E_CRUDLifecycle(t *testing.T) { }, } created, err = mgr.UpsertStream(ctx, "stream2", es2) - assert.NoError(t, err) + require.NoError(t, err) assert.True(t, created) // Find the second one by topic filter esList, _, err := mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).Eq("topicfilter", "topic2")) - assert.NoError(t, err) + require.NoError(t, err) assert.Len(t, esList, 1) assert.Equal(t, "stream2", *esList[0].Name) assert.Equal(t, "topic2", *esList[0].TopicFilter) @@ -543,38 +544,38 @@ func TestE2E_CRUDLifecycle(t *testing.T) { // Get the first by ID es1c, err := mgr.GetStreamByID(ctx, es1.GetID(), dbsql.FailIfNotFound) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, "stream1", *es1c.Name) assert.Equal(t, EventStreamStatusStarted, *es1c.Status) // Rename second event stream es2.Name = ptrTo("stream2a") created, err = mgr.UpsertStream(ctx, "" /* ID is in es2 object */, es2) - assert.NoError(t, err) + require.NoError(t, err) assert.False(t, created) // Start and re-stop, then delete the second event stream err = mgr.StartStream(ctx, es2.GetID()) - assert.NoError(t, err) + require.NoError(t, err) es2c, err := mgr.GetStreamByID(ctx, es2.GetID(), dbsql.FailIfNotFound) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, EventStreamStatusStarted, *es2c.Status) assert.Equal(t, "stream2a", *es2c.Name) err = mgr.StopStream(ctx, es2.GetID()) - assert.NoError(t, err) + require.NoError(t, err) es2c, err = mgr.GetStreamByID(ctx, es2.GetID(), dbsql.FailIfNotFound) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, EventStreamStatusStopped, *es2c.Status) err = mgr.DeleteStream(ctx, es2.GetID()) - assert.NoError(t, err) + require.NoError(t, err) // Delete the first stream (which is running still) err = mgr.DeleteStream(ctx, *es1.Name) - assert.NoError(t, err) + require.NoError(t, err) // Check no streams left esList, _, err = mgr.ListStreams(ctx, GenericEventStreamFilters.NewFilter(ctx).And()) - assert.NoError(t, err) + require.NoError(t, err) assert.Empty(t, esList) } @@ -583,20 +584,20 @@ func wsReceiveAck(ctx context.Context, t *testing.T, wsc wsclient.WSClient, cb f data := <-wsc.Receive() var batch EventBatch[testData] err := json.Unmarshal(data, &batch) - assert.NoError(t, err) + require.NoError(t, err) cb(&batch) err = wsc.Send(ctx, []byte(fmt.Sprintf(`{"type":"ack","stream":"stream1","batchNumber":%d}`, batch.BatchNumber))) - assert.NoError(t, err) + require.NoError(t, err) } func wsReceiveNack(ctx context.Context, t *testing.T, wsc wsclient.WSClient, cb func(batch *EventBatch[testData])) { data := <-wsc.Receive() var batch EventBatch[testData] err := json.Unmarshal(data, &batch) - assert.NoError(t, err) + require.NoError(t, err) cb(&batch) err = wsc.Send(ctx, []byte(fmt.Sprintf(`{"type":"nack","stream":"stream1","batchNumber":%d}`, batch.BatchNumber))) - assert.NoError(t, err) + require.NoError(t, err) } func setupE2ETest(t *testing.T, extraSetup ...func()) (context.Context, Persistence[*GenericEventStream], wsserver.Protocol, wsclient.WSClient, func()) { @@ -625,7 +626,7 @@ func setupE2ETest(t *testing.T, extraSetup ...func()) (context.Context, Persiste } db, err := dbsql.NewSQLiteProvider(ctx, dbConf) - assert.NoError(t, err) + require.NoError(t, err) p := NewGenericEventStreamPersistence(db, dbsql.UUIDValidator) p.EventStreams().Validate()