From 584d8bce88e36b0807f6c7045810b3d39b2547d1 Mon Sep 17 00:00:00 2001
From: Brandur
Date: Mon, 26 Jan 2026 07:05:17 -0800
Subject: [PATCH] Fix worker-level stuck job timeout

Fix a bug that came in with #1126 in which we were correctly
calculating the timeout, but then not passing it down to the stuck job
function when starting the stuck detection goroutine.

There is a test that was checking this worked, but due to the nature of
the bug, it was in effect detecting a stuck job after 0s and therefore
passing by accident.

I looked into ways to add additional testing here, but elected not to
add more because they'd involve the sort of test I really hate, which
has to wait arbitrarily to try and check that something did not happen,
introducing both slowness and intermittency. After the fix here lands,
this is the sort of thing that's not too likely to regress, and should
be noticed quickly in case it does.

Fixes #1125.
---
 CHANGELOG.md                         | 2 ++
 internal/jobexecutor/job_executor.go | 6 +++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 76ffc5d4..663e8a9e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+- Fix bug in worker-level stuck job detection. [PR #1133](https://github.com/riverqueue/river/pull/1133).
+
 ## [0.30.1] - 2026-01-19

 ### Fixed
diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go
index 69e655ce..588902fa 100644
--- a/internal/jobexecutor/job_executor.go
+++ b/internal/jobexecutor/job_executor.go
@@ -231,7 +231,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 		ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout)
 		defer timeoutCancel()

-		watchStuckCancel := e.watchStuck(ctx)
+		watchStuckCancel := e.watchStuck(ctx, jobTimeout)
 		defer watchStuckCancel()
 	}

@@ -266,7 +266,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) {
 // Currently we don't do anything if we notice a job is stuck. Knowing about
 // stuck jobs is just used for informational purposes in the producer in
 // generating periodic stats.
-func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc {
+func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc {
 	// We add a WithoutCancel here so that this inner goroutine becomes
 	// immune to all context cancellations _except_ the one where it's
 	// cancelled because we leave JobExecutor.execute.
@@ -281,7 +281,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc {
 		case <-ctx.Done():
 			// context cancelled as we leave JobExecutor.execute

-		case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
+		case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)):
 			e.ProducerCallbacks.Stuck()
 			e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",