From a6b9abe2701d46964ab2db713c433436fbb6ff66 Mon Sep 17 00:00:00 2001 From: Brandur Date: Wed, 25 Mar 2026 22:50:44 -0700 Subject: [PATCH] Startstop: Start successfully again even in the event of a partial start This one's presented as an alternative to #1136. Basically, a current problem with the start/stop infrastructure is that in the event of a partial start where a service returns from its start function, but without `Stop` having been called on, we can get into a situation where the start/stop's `isRunning` flag is still set to true, and when the start/stop is started again, it'll fall through thinking it's already running. Here, we check for this condition on subsequent starts. If the `stopped` channel is non-nil but already closed, we reset all internal state including `isRunning` so the service can start again. To prove this works, I pull in the test case added in #1136 verbatim, and also add one more specific test in `start_stop_test.go` for a more precise version. --- CHANGELOG.md | 4 ++++ client_test.go | 25 +++++++++++++++++++++++ rivershared/startstop/start_stop.go | 19 ++++++++++++++++- rivershared/startstop/start_stop_test.go | 26 ++++++++++++++++++++++++ 4 files changed, 73 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da183e19..bbd3856b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix in `Client.Start` where previously it was possible for a River client that only partially started before erroring to not try to start on subsequent `Start` invocations. [PR #1187](https://github.com/riverqueue/river/pull/1187). + ## [0.32.0] - 2026-03-23 ### Added diff --git a/client_test.go b/client_test.go index 973a8363..cb5eed0f 100644 --- a/client_test.go +++ b/client_test.go @@ -7133,6 +7133,31 @@ func Test_Client_Start_Error(t *testing.T) { require.ErrorAs(t, err, &pgErr) require.Equal(t, pgerrcode.InvalidCatalogName, pgErr.Code) }) + + t.Run("CanRestartAfterFailure", func(t *testing.T) { + t.Parallel() + + // Use a non-existent database to trigger a startup failure + dbConfig := riversharedtest.DBPool(ctx, t).Config().Copy() + dbConfig.ConnConfig.Database = "does-not-exist-and-dont-create-it" + + dbPool, err := pgxpool.NewWithConfig(ctx, dbConfig) + require.NoError(t, err) + + config := newTestConfig(t, "") + + client := newTestClient(t, dbPool, config) + + // First Start() should fail with a database error + err = client.Start(ctx) + require.Error(t, err, "first Start() should fail with database error") + + // Second Start() should also fail with an error, NOT return nil. + // This verifies that the client's internal state was properly reset + // after the first failure, allowing it to attempt startup again. + err = client.Start(ctx) + require.Error(t, err, "second Start() should return an error, not nil; client state should be reset after failed start") + }) } func Test_NewClient_BaseServiceName(t *testing.T) { diff --git a/rivershared/startstop/start_stop.go b/rivershared/startstop/start_stop.go index b03cbf24..1ca9b51e 100644 --- a/rivershared/startstop/start_stop.go +++ b/rivershared/startstop/start_stop.go @@ -109,7 +109,24 @@ func (s *BaseStartStop) StartInit(ctx context.Context) (context.Context, bool, f defer s.mu.Unlock() if s.isRunning { - return ctx, false, nil, nil + // If stopped has already been closed (e.g. a previous Start failed and + // called stopped()), reset state so the service can start again. + // + // Notably, for this branch to be taken, Stop will not have been called. + // If it was, isRunning will have been set to false via finalizeStop. + if s.stopped != nil { + select { + case <-s.stopped: + s.isRunning = false + s.started = nil + s.stopped = nil + default: + } + } + + if s.isRunning { + return ctx, false, nil, nil + } } s.isRunning = true diff --git a/rivershared/startstop/start_stop_test.go b/rivershared/startstop/start_stop_test.go index 3b42dccc..2d733b72 100644 --- a/rivershared/startstop/start_stop_test.go +++ b/rivershared/startstop/start_stop_test.go @@ -253,6 +253,32 @@ func TestSampleService(t *testing.T) { riversharedtest.WaitOrTimeout(t, service.Started()) // start channel also closed on erroneous start riversharedtest.WaitOrTimeout(t, service.Stopped()) }) + + t.Run("StartErrorThenSuccessfulRestart", func(t *testing.T) { + t.Parallel() + + service, _ := setup(t) + service.startErr = errors.New("error on start") + + // First start fails with our simulated error. + require.ErrorIs(t, service.Start(ctx), service.startErr) + + riversharedtest.WaitOrTimeout(t, service.Started()) + riversharedtest.WaitOrTimeout(t, service.Stopped()) + + // Clear error so the next start succeeds. + service.startErr = nil + + // Second start should succeed despite the prior failure. Without the + // reset-on-failed-start logic in StartInit, isRunning would still be + // true and StartInit would return shouldStart=false, causing Start to + // return nil without actually starting the service. + require.NoError(t, service.Start(ctx)) + t.Cleanup(service.Stop) + + riversharedtest.WaitOrTimeout(t, service.Started()) + require.True(t, service.state) + }) } // A service with the more unusual case.