From 346150761e837b2bfb3647dcdb5ec67b029d7220 Mon Sep 17 00:00:00 2001 From: Brandur Date: Tue, 24 Mar 2026 20:36:54 -0700 Subject: [PATCH 1/3] Try to start the queue maintainer multiple times with backoff This one's aimed at addressing #1161. `HookPeriodicJobsStart.Start` may return an error that causes the queue maintainer not to start, and there are a few other intermittent errors that may cause it not to start (say in the case of a transient DB problem). If this were to occur, the course of action currently is for the client to to just spit an error to logs and not try any additional remediation, which could have the effect of leaving the queue maintainer offline for extended periods. Here, try to address this broadly by allowing the queue maintainer a few attempts at starting, and with our standard exponential backoff (1s, 2s, 4s, 8s, etc.). In case a queue maintainer fails to start completely, the client requests resignation and hands leadership off to another client to see if it can start successfully. I think this is an okay compromise because in case of a non-transient fundamental error (say `HookPeriodicJobsStart.Start` always returns an error), we don't go into a hot loop that starts hammering things. Instead, we'll get a reasonably responsible slow back off that gives things a chance to recover, and which should be very visible in logs. Fixes #1161. --- CHANGELOG.md | 1 + client.go | 50 +++++++++++++++-- client_test.go | 25 +++++++++ internal/maintenance/periodic_job_enqueuer.go | 53 +++++++++++-------- internal/maintenance/queue_maintainer.go | 2 + 5 files changed, 106 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbd3856b..0c744779 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184). - Fix in `Client.Start` where previously it was possible for a River client that only partially started before erroring to not try to start on subsequent `Start` invocations. [PR #1187](https://github.com/riverqueue/river/pull/1187). ## [0.32.0] - 2026-03-23 diff --git a/client.go b/client.go index c07c70d4..9799294b 100644 --- a/client.go +++ b/client.go @@ -33,6 +33,7 @@ import ( "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/maputil" + "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" "github.com/riverqueue/river/rivershared/util/valutil" @@ -619,7 +620,9 @@ type Client[TTx any] struct { // Test-only signals. type clientTestSignals struct { - electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt + queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted jobCleaner *maintenance.JobCleanerTestSignals jobRescuer *maintenance.JobRescuerTestSignals @@ -631,6 +634,8 @@ type clientTestSignals struct { func (ts *clientTestSignals) Init(tb testutil.TestingTB) { ts.electedLeader.Init(tb) + ts.queueMaintainerStartError.Init(tb) + ts.queueMaintainerStartRetriesExhausted.Init(tb) if ts.jobCleaner != nil { ts.jobCleaner.Init(tb) @@ -1290,9 +1295,9 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar // cancel the queue maintainer start, and overall run much faster. c.testSignals.electedLeader.Signal(struct{}{}) - if err := c.queueMaintainer.Start(ctx); err != nil { - c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error())) - } + // Start the queue maintainer with a few retries and exponential + // backoff. In case of total failure, request resignation. + c.tryStartQueueMaintainer(ctx) default: c.queueMaintainer.Stop() @@ -1324,6 +1329,43 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar return nil } +// Tries to start the queue maintainer after gaining leadership. We allow some +// retries with exponential backoff in case of failure, and in case the queue +// maintainer can't be started, we request resignation to allow another client +// to try and take over. +func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { + const maxStartAttempts = 3 + + var lastErr error + for attempt := 1; attempt <= maxStartAttempts; attempt++ { + if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil { + return + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer", + slog.String("err", lastErr.Error()), slog.Int("attempt", attempt)) + + c.testSignals.queueMaintainerStartError.Signal(lastErr) + + // Stop the queue maintainer to fully reset its state (and any + // sub-services) before retrying. + c.queueMaintainer.Stop() + + if attempt < maxStartAttempts { + serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault)) + } + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation", + slog.String("err", lastErr.Error())) + + c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{}) + + if err := c.clientNotifyBundle.RequestResign(ctx); err != nil { + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error())) + } +} + // Driver exposes the underlying driver used by the client. // // API is not stable. DO NOT USE. diff --git a/client_test.go b/client_test.go index cb5eed0f..5a446068 100644 --- a/client_test.go +++ b/client_test.go @@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) { require.True(t, svc.RemoveByID("new_periodic_job")) }) + t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.Hooks = []rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return errors.New("hook start error") + }), + } + + client, _ := setup(t, config) + + startClient(ctx, t, client) + client.testSignals.electedLeader.WaitOrTimeout() + + // Wait for all 3 retry attempts to fail. + for range 3 { + err := client.testSignals.queueMaintainerStartError.WaitOrTimeout() + require.EqualError(t, err, "hook start error") + } + + // After all retries exhausted, the client should request resignation. + client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout() + }) + t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 8ea0c042..54e8fe53 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.StaggerStart(ctx) - initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ - Schema: s.Config.Schema, - }) - if err != nil { - return err - } + var ( + initialPeriodicJobs []*riverpilot.PeriodicJob + subServices []startstop.Service + ) + if err := func() error { + var err error + initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ + Schema: s.Config.Schema, + }) + if err != nil { + return err + } - for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { - if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert - DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { - return (*rivertype.DurablePeriodicJob)(job) - }), - }); err != nil { + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { + if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert + DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { + return (*rivertype.DurablePeriodicJob)(job) + }), + }); err != nil { + return err + } + } + + subServices = []startstop.Service{ + startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), + } + stopServicesOnError := func() { + startstop.StopAllParallel(subServices...) + } + if err := startstop.StartAll(ctx, subServices...); err != nil { + stopServicesOnError() return err } - } - subServices := []startstop.Service{ - startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), - } - stopServicesOnError := func() { - startstop.StopAllParallel(subServices...) - } - if err := startstop.StartAll(ctx, subServices...); err != nil { - stopServicesOnError() + return nil + }(); err != nil { stopped() return err } diff --git a/internal/maintenance/queue_maintainer.go b/internal/maintenance/queue_maintainer.go index 5c780ce0..142bf283 100644 --- a/internal/maintenance/queue_maintainer.go +++ b/internal/maintenance/queue_maintainer.go @@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error { for _, service := range m.servicesByName { if err := service.Start(ctx); err != nil { + startstop.StopAllParallel(maputil.Values(m.servicesByName)...) + stopped() return err } } From c29f4c153fc8b0becadf7af4e37fa6ec40d574f1 Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 26 Mar 2026 15:15:56 -0700 Subject: [PATCH 2/3] Make queue maintainer start cancellable on leadership change --- client.go | 81 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 9799294b..6b300dfe 100644 --- a/client.go +++ b/client.go @@ -1284,26 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte } func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error { - handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) { - c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", - slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) - - switch { - case notification.IsLeader: - // Starting the queue maintainer can take a little time so send to - // this test signal _first_ so tests waiting on it can finish, - // cancel the queue maintainer start, and overall run much faster. - c.testSignals.electedLeader.Signal(struct{}{}) - - // Start the queue maintainer with a few retries and exponential - // backoff. In case of total failure, request resignation. - c.tryStartQueueMaintainer(ctx) - - default: - c.queueMaintainer.Stop() - } - } - if !shouldStart { return nil } @@ -1315,13 +1295,48 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar sub := c.elector.Listen() defer sub.Unlisten() + // Cancel function for an in-progress tryStartQueueMaintainer. If + // leadership is lost while the start process is still retrying, this + // is used to abort it promptly instead of waiting for retries to + // finish. + var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {} + for { select { case <-ctx.Done(): + cancelQueueMaintainerStart(context.Cause(ctx)) return case notification := <-sub.C(): - handleLeadershipChange(ctx, notification) + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", + slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) + + switch { + case notification.IsLeader: + // Starting the queue maintainer can take a little time so + // send to this test signal first so tests waiting on it + // can finish, cancel the queue maintainer start, and + // overall run much faster. + c.testSignals.electedLeader.Signal(struct{}{}) + + // Start the queue maintainer with a few retries and + // exponential backoff in a separate goroutine so the + // leadership change loop remains responsive to new + // notifications. startCtx is used for cancellation in case + // leadership is lost while retries are in progress. + var startCtx context.Context + startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx) + go c.tryStartQueueMaintainer(startCtx) + + default: + // Cancel any in-progress start attempts before stopping. We + // sent a startstop.ErrStop to make sure services like + // Reindexer run any specific cleanup code for stops. + cancelQueueMaintainerStart(startstop.ErrStop) + cancelQueueMaintainerStart = func(_ error) {} + + c.queueMaintainer.Stop() + } } } }() @@ -1336,8 +1351,20 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { const maxStartAttempts = 3 + ctxCancelled := func() bool { + if ctx.Err() != nil { + c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled") + return true + } + return false + } + var lastErr error for attempt := 1; attempt <= maxStartAttempts; attempt++ { + if ctxCancelled() { + return + } + if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil { return } @@ -1347,6 +1374,14 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { c.testSignals.queueMaintainerStartError.Signal(lastErr) + // If Start blocked long enough for this context to be cancelled + // (e.g. leadership was lost), bail out immediately. A newer + // leadership term may already have started the maintainer, and + // calling Stop here would tear it down. + if ctxCancelled() { + return + } + // Stop the queue maintainer to fully reset its state (and any // sub-services) before retrying. c.queueMaintainer.Stop() @@ -1356,6 +1391,10 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { } } + if ctxCancelled() { + return + } + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation", slog.String("err", lastErr.Error())) From c12ae8bb5591db926c26aa9811dbd82a9712ee30 Mon Sep 17 00:00:00 2001 From: Brandur Date: Fri, 27 Mar 2026 20:53:22 -0700 Subject: [PATCH 3/3] Try an epoch-based solution --- client.go | 51 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 6b300dfe..8e74efb9 100644 --- a/client.go +++ b/client.go @@ -613,6 +613,14 @@ type Client[TTx any] struct { subscriptionManager *subscriptionManager testSignals clientTestSignals + // queueMaintainerEpoch is incremented each time leadership is gained, + // giving each tryStartQueueMaintainer goroutine a term number. + // queueMaintainerMu serializes epoch checks with Stop calls so that a + // stale goroutine from an older term cannot tear down a maintainer + // started by a newer term. + queueMaintainerEpoch int64 + queueMaintainerMu sync.Mutex + // workCancel cancels the context used for all work goroutines. Normal Stop // does not cancel that context. workCancel context.CancelCauseFunc @@ -1324,9 +1332,18 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar // leadership change loop remains responsive to new // notifications. startCtx is used for cancellation in case // leadership is lost while retries are in progress. + // + // The epoch is incremented under the mutex so that + // stale tryStartQueueMaintainer goroutines from a + // previous term cannot call Stop after a new term has + // begun. var startCtx context.Context startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx) - go c.tryStartQueueMaintainer(startCtx) + c.queueMaintainerMu.Lock() + c.queueMaintainerEpoch++ + epoch := c.queueMaintainerEpoch + c.queueMaintainerMu.Unlock() + go c.tryStartQueueMaintainer(startCtx, epoch) default: // Cancel any in-progress start attempts before stopping. We @@ -1348,7 +1365,7 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar // retries with exponential backoff in case of failure, and in case the queue // maintainer can't be started, we request resignation to allow another client // to try and take over. -func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { +func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context, epoch int64) { const maxStartAttempts = 3 ctxCancelled := func() bool { @@ -1359,6 +1376,23 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { return false } + // stopIfCurrentEpoch atomically checks whether this goroutine's epoch + // is still the active one and calls Stop only if it is. The mutex + // serializes this with the epoch increment in handleLeadershipChangeLoop, + // preventing a stale goroutine from stopping a maintainer that was + // started by a newer leadership term. + stopIfCurrentEpoch := func() bool { + c.queueMaintainerMu.Lock() + defer c.queueMaintainerMu.Unlock() + + if c.queueMaintainerEpoch != epoch { + return false + } + + c.queueMaintainer.Stop() + return true + } + var lastErr error for attempt := 1; attempt <= maxStartAttempts; attempt++ { if ctxCancelled() { @@ -1374,18 +1408,13 @@ func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { c.testSignals.queueMaintainerStartError.Signal(lastErr) - // If Start blocked long enough for this context to be cancelled - // (e.g. leadership was lost), bail out immediately. A newer - // leadership term may already have started the maintainer, and - // calling Stop here would tear it down. - if ctxCancelled() { + // Stop the queue maintainer to fully reset its state (and any + // sub-services) before retrying. The epoch check ensures a stale + // goroutine cannot stop a maintainer from a newer leadership term. + if !stopIfCurrentEpoch() { return } - // Stop the queue maintainer to fully reset its state (and any - // sub-services) before retrying. - c.queueMaintainer.Stop() - if attempt < maxStartAttempts { serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault)) }