Commit fe16c1d

Try to start the queue maintainer multiple times with backoff
This one's aimed at addressing #1161. `HookPeriodicJobsStart.Start` may return an error that prevents the queue maintainer from starting, and a few other intermittent errors can have the same effect (say, in the case of a transient DB problem). When this occurs, the client's current course of action is to just spit an error to logs and attempt no further remediation, which could leave the queue maintainer offline for extended periods.

Here, try to address this broadly by giving the queue maintainer a few attempts at starting, using our standard exponential backoff (1s, 2s, 4s, 8s, etc.). In case the queue maintainer fails to start entirely, the client requests resignation and hands leadership off to another client to see if it can start successfully. I think this is an okay compromise because in case of a non-transient, fundamental error (say `HookPeriodicJobsStart.Start` always returns an error), we don't go into a hot loop that starts hammering things. Instead, we get a reasonably responsible slow backoff that gives things a chance to recover, and which should be very visible in logs.

Fixes #1161.
1 parent bcbcb6d commit fe16c1d

File tree

2 files changed (+37 -3 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions

```diff
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Fixed
+
+- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
+
 ## [0.32.0] - 2026-03-23
 
 ### Added
```

client.go

Lines changed: 33 additions & 3 deletions

```diff
@@ -33,6 +33,7 @@ import (
 	"github.com/riverqueue/river/rivershared/testsignal"
 	"github.com/riverqueue/river/rivershared/util/dbutil"
 	"github.com/riverqueue/river/rivershared/util/maputil"
+	"github.com/riverqueue/river/rivershared/util/serviceutil"
 	"github.com/riverqueue/river/rivershared/util/sliceutil"
 	"github.com/riverqueue/river/rivershared/util/testutil"
 	"github.com/riverqueue/river/rivershared/util/valutil"
@@ -1290,9 +1291,9 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
 			// cancel the queue maintainer start, and overall run much faster.
 			c.testSignals.electedLeader.Signal(struct{}{})
 
-			if err := c.queueMaintainer.Start(ctx); err != nil {
-				c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
-			}
+			// Start the queue maintainer with a few retries and exponential
+			// backoff. In case of total failure, request resignation.
+			c.tryStartQueueMaintainer(ctx)
 
 		default:
 			c.queueMaintainer.Stop()
@@ -1324,6 +1325,35 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
 	return nil
 }
 
+// Tries to start the queue maintainer after gaining leadership. We allow some
+// retries with exponential backoff in case of failure, and in case the queue
+// maintainer can't be started, we request resignation to allow another client
+// to try and take over.
+func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context) {
+	const maxStartAttempts = 5
+
+	var lastErr error
+	for attempt := 1; attempt <= maxStartAttempts; attempt++ {
+		if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
+			return
+		}
+
+		c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer",
+			slog.String("err", lastErr.Error()), slog.Int("attempt", attempt))
+
+		if attempt < maxStartAttempts {
+			serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault))
+		}
+	}
+
+	c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
+		slog.String("err", lastErr.Error()))
+
+	if err := c.clientNotifyBundle.RequestResign(ctx); err != nil {
+		c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error()))
+	}
+}
+
 // Driver exposes the underlying driver used by the client.
 //
 // API is not stable. DO NOT USE.
```
