Commit 0202e38

Fix worker-level stuck job timeout (#1133)
Fix a bug introduced in #1126 in which we were correctly calculating the timeout, but then not passing it down to the stuck job function when starting the stuck detection goroutine. There was a test checking that this worked, but due to the nature of the bug, it was in effect detecting a stuck job after 0s and therefore passing by accident. I looked into ways to add additional testing here, but elected not to because it would involve the sort of test I really hate: one that has to wait arbitrarily to try to check that something did not happen, introducing both slowness and intermittency. After the fix here lands, this is the sort of thing that's not too likely to regress, and it should be noticed quickly if it does. Fixes #1125.
1 parent dfc8826 commit 0202e38

2 files changed: 5 additions & 3 deletions

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+- Fix bug in worker-level stuck job detection. [PR #1133](https://github.com/riverqueue/river/pull/1133).
+
 ## [0.30.1] - 2026-01-19
 
 ### Fixed
```

internal/jobexecutor/job_executor.go

Lines changed: 3 additions & 3 deletions

```diff
@@ -231,7 +231,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 		ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout)
 		defer timeoutCancel()
 
-		watchStuckCancel := e.watchStuck(ctx)
+		watchStuckCancel := e.watchStuck(ctx, jobTimeout)
 		defer watchStuckCancel()
 	}
 
@@ -266,7 +266,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 // Currently we don't do anything if we notice a job is stuck. Knowing about
 // stuck jobs is just used for informational purposes in the producer in
 // generating periodic stats.
-func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc {
+func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc {
 	// We add a WithoutCancel here so that this inner goroutine becomes
 	// immune to all context cancellations _except_ the one where it's
 	// cancelled because we leave JobExecutor.execute.
@@ -281,7 +281,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc {
 	case <-ctx.Done():
 		// context cancelled as we leave JobExecutor.execute
 
-	case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
+	case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
 		e.ProducerCallbacks.Stuck()
 
 		e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",
```
