Skip to content

Commit a4e03c6

Browse files
committed
Try to start the queue maintainer multiple times with backoff
This one's aimed at addressing #1161. `HookPeriodicJobsStart.Start` may return an error that causes the queue maintainer not to start, and there are a few other intermittent errors that may cause it not to start (say in the case of a transient DB problem). If this were to occur, the course of action currently is for the client to just spit an error to logs and not try any additional remediation, which could have the effect of leaving the queue maintainer offline for extended periods. Here, we try to address this broadly by allowing the queue maintainer a few attempts at starting, with our standard exponential backoff (1s, 2s, 4s, 8s, etc.). In case a queue maintainer fails to start completely, the client requests resignation and hands leadership off to another client to see if it can start successfully. I think this is an okay compromise because in case of a non-transient fundamental error (say `HookPeriodicJobsStart.Start` always returns an error), we don't go into a hot loop that starts hammering things. Instead, we'll get a reasonably responsible slow backoff that gives things a chance to recover, and which should be very visible in logs. Fixes #1161.
1 parent bcbcb6d commit a4e03c6

File tree

5 files changed

+109
-25
lines changed

5 files changed

+109
-25
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
13+
1014
## [0.32.0] - 2026-03-23
1115

1216
### Added

client.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/riverqueue/river/rivershared/testsignal"
3434
"github.com/riverqueue/river/rivershared/util/dbutil"
3535
"github.com/riverqueue/river/rivershared/util/maputil"
36+
"github.com/riverqueue/river/rivershared/util/serviceutil"
3637
"github.com/riverqueue/river/rivershared/util/sliceutil"
3738
"github.com/riverqueue/river/rivershared/util/testutil"
3839
"github.com/riverqueue/river/rivershared/util/valutil"
@@ -619,7 +620,9 @@ type Client[TTx any] struct {
619620

620621
// Test-only signals.
621622
type clientTestSignals struct {
622-
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
623+
electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
624+
queueMaintainerStartError testsignal.TestSignal[error] // notifies on each failed queue maintainer start attempt
625+
queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted
623626

624627
jobCleaner *maintenance.JobCleanerTestSignals
625628
jobRescuer *maintenance.JobRescuerTestSignals
@@ -631,6 +634,8 @@ type clientTestSignals struct {
631634

632635
func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
633636
ts.electedLeader.Init(tb)
637+
ts.queueMaintainerStartError.Init(tb)
638+
ts.queueMaintainerStartRetriesExhausted.Init(tb)
634639

635640
if ts.jobCleaner != nil {
636641
ts.jobCleaner.Init(tb)
@@ -1290,9 +1295,9 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
12901295
// cancel the queue maintainer start, and overall run much faster.
12911296
c.testSignals.electedLeader.Signal(struct{}{})
12921297

1293-
if err := c.queueMaintainer.Start(ctx); err != nil {
1294-
c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
1295-
}
1298+
// Start the queue maintainer with a few retries and exponential
1299+
// backoff. In case of total failure, request resignation.
1300+
c.tryStartQueueMaintainer(ctx)
12961301

12971302
default:
12981303
c.queueMaintainer.Stop()
@@ -1324,6 +1329,43 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
13241329
return nil
13251330
}
13261331

1332+
// Tries to start the queue maintainer after gaining leadership. We allow some
1333+
// retries with exponential backoff in case of failure, and in case the queue
1334+
// maintainer can't be started, we request resignation to allow another client
1335+
// to try and take over.
1336+
func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
1337+
const maxStartAttempts = 3
1338+
1339+
var lastErr error
1340+
for attempt := 1; attempt <= maxStartAttempts; attempt++ {
1341+
if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
1342+
return
1343+
}
1344+
1345+
c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer",
1346+
slog.String("err", lastErr.Error()), slog.Int("attempt", attempt))
1347+
1348+
c.testSignals.queueMaintainerStartError.Signal(lastErr)
1349+
1350+
// Stop the queue maintainer to fully reset its state (and any
1351+
// sub-services) before retrying.
1352+
c.queueMaintainer.Stop()
1353+
1354+
if attempt < maxStartAttempts {
1355+
serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault))
1356+
}
1357+
}
1358+
1359+
c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
1360+
slog.String("err", lastErr.Error()))
1361+
1362+
c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{})
1363+
1364+
if err := c.clientNotifyBundle.RequestResign(ctx); err != nil {
1365+
c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error()))
1366+
}
1367+
}
1368+
13271369
// Driver exposes the underlying driver used by the client.
13281370
//
13291371
// API is not stable. DO NOT USE.

client_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) {
51455145
require.True(t, svc.RemoveByID("new_periodic_job"))
51465146
})
51475147

5148+
t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) {
5149+
t.Parallel()
5150+
5151+
config := newTestConfig(t, "")
5152+
config.Hooks = []rivertype.Hook{
5153+
HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error {
5154+
return errors.New("hook start error")
5155+
}),
5156+
}
5157+
5158+
client, _ := setup(t, config)
5159+
5160+
startClient(ctx, t, client)
5161+
client.testSignals.electedLeader.WaitOrTimeout()
5162+
5163+
// Wait for all 3 retry attempts to fail.
5164+
for range 3 {
5165+
err := client.testSignals.queueMaintainerStartError.WaitOrTimeout()
5166+
require.EqualError(t, err, "hook start error")
5167+
}
5168+
5169+
// After all retries exhausted, the client should request resignation.
5170+
client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout()
5171+
})
5172+
51485173
t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) {
51495174
t.Parallel()
51505175

internal/maintenance/periodic_job_enqueuer.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
318318

319319
s.StaggerStart(ctx)
320320

321-
initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
322-
Schema: s.Config.Schema,
323-
})
324-
if err != nil {
325-
return err
326-
}
321+
var (
322+
initialPeriodicJobs []*riverpilot.PeriodicJob
323+
subServices []startstop.Service
324+
)
325+
if err := func() error {
326+
var err error
327+
initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
328+
Schema: s.Config.Schema,
329+
})
330+
if err != nil {
331+
return err
332+
}
327333

328-
for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
329-
if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
330-
DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
331-
return (*rivertype.DurablePeriodicJob)(job)
332-
}),
333-
}); err != nil {
334+
for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
335+
if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
336+
DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
337+
return (*rivertype.DurablePeriodicJob)(job)
338+
}),
339+
}); err != nil {
340+
return err
341+
}
342+
}
343+
344+
subServices = []startstop.Service{
345+
startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
346+
}
347+
stopServicesOnError := func() {
348+
startstop.StopAllParallel(subServices...)
349+
}
350+
if err := startstop.StartAll(ctx, subServices...); err != nil {
351+
stopServicesOnError()
334352
return err
335353
}
336-
}
337354

338-
subServices := []startstop.Service{
339-
startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
340-
}
341-
stopServicesOnError := func() {
342-
startstop.StopAllParallel(subServices...)
343-
}
344-
if err := startstop.StartAll(ctx, subServices...); err != nil {
345-
stopServicesOnError()
355+
return nil
356+
}(); err != nil {
346357
stopped()
347358
return err
348359
}

internal/maintenance/queue_maintainer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error {
5050

5151
for _, service := range m.servicesByName {
5252
if err := service.Start(ctx); err != nil {
53+
startstop.StopAllParallel(maputil.Values(m.servicesByName)...)
54+
stopped()
5355
return err
5456
}
5557
}

0 commit comments

Comments
 (0)