diff --git a/CHANGELOG.md b/CHANGELOG.md index 76ffc5d4..663e8a9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +- Fix worker-level stuck job detection so that it measures against the job's effective timeout (the same value used for its context deadline) instead of always using the client-level job timeout. [PR #1133](https://github.com/riverqueue/river/pull/1133). + ## [0.30.1] - 2026-01-19 ### Fixed diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index 69e655ce..588902fa 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -231,7 +231,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { ctx, timeoutCancel = context.WithTimeout(ctx, jobTimeout) defer timeoutCancel() - watchStuckCancel := e.watchStuck(ctx) + watchStuckCancel := e.watchStuck(ctx, jobTimeout) defer watchStuckCancel() } @@ -266,7 +266,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { // Currently we don't do anything if we notice a job is stuck. Knowing about // stuck jobs is just used for informational purposes in the producer in // generating periodic stats. -func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc { +func (e *JobExecutor) watchStuck(ctx context.Context, jobTimeout time.Duration) context.CancelFunc { // We add a WithoutCancel here so that this inner goroutine becomes // immune to all context cancellations _except_ the one where it's // cancelled because we leave JobExecutor.execute. @@ -281,7 +281,7 @@ func (e *JobExecutor) watchStuck(ctx context.Context) context.CancelFunc { case <-ctx.Done(): // context cancelled as we leave JobExecutor.execute - case <-time.After(e.ClientJobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): + case <-time.After(jobTimeout + cmp.Or(e.StuckThresholdOverride, stuckThresholdDefault)): e.ProducerCallbacks.Stuck() e.Logger.WarnContext(ctx, e.Name+": Job appears to be stuck",