Skip to content

Commit 0527244

Browse files
committed
Fix recurring task double-enqueue caused by wall-clock race condition
1 parent 176721e commit 0527244

File tree

3 files changed

+22
-7
lines changed

3 files changed

+22
-7
lines changed

app/models/solid_queue/recurring_task.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ def create_or_update_all(tasks)
5656
end
5757
end
5858

59-
def delay_from_now
60-
[ (next_time - Time.current).to_f, 0.1 ].max
59+
60+
def next_time_after(time)
61+
parsed_schedule.next_time(time).utc
6162
end
6263

6364
def next_time

lib/solid_queue/scheduler/recurring_schedule.rb

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def schedule_tasks
3333
end
3434
end
3535

36-
def schedule_task(task)
37-
scheduled_tasks[task.key] = schedule(task)
36+
def schedule_task(task, run_at: task.next_time)
37+
scheduled_tasks[task.key] = schedule(task, run_at: run_at)
3838
end
3939

4040
def unschedule_tasks
@@ -99,9 +99,11 @@ def load_dynamic_tasks
9999
dynamic_tasks_enabled? ? RecurringTask.dynamic.to_a : []
100100
end
101101

102-
def schedule(task)
103-
scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at|
104-
thread_schedule.schedule_task(thread_task)
102+
def schedule(task, run_at: task.next_time)
103+
delay = [ (run_at - Time.current).to_f, 0.1 ].max
104+
105+
scheduled_task = Concurrent::ScheduledTask.new(delay, args: [ self, task, run_at ]) do |thread_schedule, thread_task, thread_task_run_at|
106+
thread_schedule.schedule_task(thread_task, run_at: thread_task.next_time_after(thread_task_run_at))
105107

106108
wrap_in_app_executor do
107109
thread_task.enqueue(at: thread_task_run_at)

test/models/solid_queue/recurring_task_test.rb

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,18 @@ def perform
203203
assert_equal 4, job.priority
204204
end
205205

206+
test "next_time_after returns the next occurrence after the given time" do
207+
task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every minute")
208+
209+
# next_time_after a time exactly on the minute boundary should return the following minute
210+
time = Time.utc(2026, 3, 12, 1, 28, 0)
211+
assert_equal Time.utc(2026, 3, 12, 1, 29, 0), task.next_time_after(time)
212+
213+
# next_time_after a time just before the boundary should return that boundary
214+
time = Time.utc(2026, 3, 12, 1, 27, 59)
215+
assert_equal Time.utc(2026, 3, 12, 1, 28, 0), task.next_time_after(time)
216+
end
217+
206218
test "task configured with a command" do
207219
task = recurring_task_with(command: "JobBuffer.add('from_a_command')")
208220
enqueue_and_assert_performed_with_result(task, "from_a_command")

0 commit comments

Comments
 (0)