diff --git a/CHANGELOG.md b/CHANGELOG.md index bbd3856b..0c744779 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184). - Fix in `Client.Start` where previously it was possible for a River client that only partially started before erroring to not try to start on subsequent `Start` invocations. [PR #1187](https://github.com/riverqueue/river/pull/1187). ## [0.32.0] - 2026-03-23 diff --git a/client.go b/client.go index c07c70d4..6b300dfe 100644 --- a/client.go +++ b/client.go @@ -33,6 +33,7 @@ import ( "github.com/riverqueue/river/rivershared/testsignal" "github.com/riverqueue/river/rivershared/util/dbutil" "github.com/riverqueue/river/rivershared/util/maputil" + "github.com/riverqueue/river/rivershared/util/serviceutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivershared/util/testutil" "github.com/riverqueue/river/rivershared/util/valutil" @@ -619,7 +620,9 @@ type Client[TTx any] struct { // Test-only signals. 
type clientTestSignals struct { - electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader + queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt + queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted jobCleaner *maintenance.JobCleanerTestSignals jobRescuer *maintenance.JobRescuerTestSignals @@ -631,6 +634,8 @@ type clientTestSignals struct { func (ts *clientTestSignals) Init(tb testutil.TestingTB) { ts.electedLeader.Init(tb) + ts.queueMaintainerStartError.Init(tb) + ts.queueMaintainerStartRetriesExhausted.Init(tb) if ts.jobCleaner != nil { ts.jobCleaner.Init(tb) @@ -1279,26 +1284,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte } func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error { - handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) { - c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", - slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) - - switch { - case notification.IsLeader: - // Starting the queue maintainer can take a little time so send to - // this test signal _first_ so tests waiting on it can finish, - // cancel the queue maintainer start, and overall run much faster. 
- c.testSignals.electedLeader.Signal(struct{}{}) - - if err := c.queueMaintainer.Start(ctx); err != nil { - c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error())) - } - - default: - c.queueMaintainer.Stop() - } - } - if !shouldStart { return nil } @@ -1310,13 +1295,48 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar sub := c.elector.Listen() defer sub.Unlisten() + // Cancel function for an in-progress tryStartQueueMaintainer. If + // leadership is lost while the start process is still retrying, this + // is used to abort it promptly instead of waiting for retries to + // finish. + var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {} + for { select { case <-ctx.Done(): + cancelQueueMaintainerStart(context.Cause(ctx)) return case notification := <-sub.C(): - handleLeadershipChange(ctx, notification) + c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received", + slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader)) + + switch { + case notification.IsLeader: + // Starting the queue maintainer can take a little time so + // send to this test signal first so tests waiting on it + // can finish, cancel the queue maintainer start, and + // overall run much faster. + c.testSignals.electedLeader.Signal(struct{}{}) + + // Start the queue maintainer with a few retries and + // exponential backoff in a separate goroutine so the + // leadership change loop remains responsive to new + // notifications. startCtx is used for cancellation in case + // leadership is lost while retries are in progress. + var startCtx context.Context + startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx) + go c.tryStartQueueMaintainer(startCtx) + + default: + // Cancel any in-progress start attempts before stopping. 
We + // send a startstop.ErrStop to make sure services like + // Reindexer run any specific cleanup code for stops. + cancelQueueMaintainerStart(startstop.ErrStop) + cancelQueueMaintainerStart = func(_ error) {} + + c.queueMaintainer.Stop() + } } } }() @@ -1324,6 +1344,67 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar return nil } +// Tries to start the queue maintainer after gaining leadership. We allow some +// retries with exponential backoff in case of failure, and in case the queue +// maintainer can't be started, we request resignation to allow another client +// to try and take over. +func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) { + const maxStartAttempts = 3 + + ctxCancelled := func() bool { + if ctx.Err() != nil { + c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled") + return true + } + return false + } + + var lastErr error + for attempt := 1; attempt <= maxStartAttempts; attempt++ { + if ctxCancelled() { + return + } + + if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil { + return + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer", + slog.String("err", lastErr.Error()), slog.Int("attempt", attempt)) + + c.testSignals.queueMaintainerStartError.Signal(lastErr) + + // If Start blocked long enough for this context to be cancelled + // (e.g. leadership was lost), bail out immediately. A newer + // leadership term may already have started the maintainer, and + // calling Stop here would tear it down. + if ctxCancelled() { + return + } + + // Stop the queue maintainer to fully reset its state (and any + // sub-services) before retrying. 
+ c.queueMaintainer.Stop() + + if attempt < maxStartAttempts { + serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault)) + } + } + + if ctxCancelled() { + return + } + + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation", + slog.String("err", lastErr.Error())) + + c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{}) + + if err := c.clientNotifyBundle.RequestResign(ctx); err != nil { + c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error())) + } +} + // Driver exposes the underlying driver used by the client. // // API is not stable. DO NOT USE. diff --git a/client_test.go b/client_test.go index cb5eed0f..5a446068 100644 --- a/client_test.go +++ b/client_test.go @@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) { require.True(t, svc.RemoveByID("new_periodic_job")) }) + t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + config.Hooks = []rivertype.Hook{ + HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { + return errors.New("hook start error") + }), + } + + client, _ := setup(t, config) + + startClient(ctx, t, client) + client.testSignals.electedLeader.WaitOrTimeout() + + // Wait for all 3 start attempts to fail. + for range 3 { + err := client.testSignals.queueMaintainerStartError.WaitOrTimeout() + require.EqualError(t, err, "hook start error") + } + + // After all attempts are exhausted, the client should request resignation. 
+ client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout() + }) + t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) { t.Parallel() diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 8ea0c042..54e8fe53 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error { s.StaggerStart(ctx) - initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ - Schema: s.Config.Schema, - }) - if err != nil { - return err - } + var ( + initialPeriodicJobs []*riverpilot.PeriodicJob + subServices []startstop.Service + ) + if err := func() error { + var err error + initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{ + Schema: s.Config.Schema, + }) + if err != nil { + return err + } - for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { - if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert - DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { - return (*rivertype.DurablePeriodicJob)(job) - }), - }); err != nil { + for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) { + if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert + DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob { + return (*rivertype.DurablePeriodicJob)(job) + }), + }); err != nil { + return err + } + } + + subServices = []startstop.Service{ + startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), + } + stopServicesOnError := func() { 
+ startstop.StopAllParallel(subServices...) + } + if err := startstop.StartAll(ctx, subServices...); err != nil { + stopServicesOnError() return err } - } - subServices := []startstop.Service{ - startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically), - } - stopServicesOnError := func() { - startstop.StopAllParallel(subServices...) - } - if err := startstop.StartAll(ctx, subServices...); err != nil { - stopServicesOnError() + return nil + }(); err != nil { stopped() return err } diff --git a/internal/maintenance/queue_maintainer.go b/internal/maintenance/queue_maintainer.go index 5c780ce0..142bf283 100644 --- a/internal/maintenance/queue_maintainer.go +++ b/internal/maintenance/queue_maintainer.go @@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error { for _, service := range m.servicesByName { if err := service.Start(ctx); err != nil { + startstop.StopAllParallel(maputil.Values(m.servicesByName)...) + stopped() return err } }