From 23c05f905b2114f3eb9d2107330f8f4b7b6d7158 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 26 Jun 2024 15:59:59 -0400 Subject: [PATCH 1/3] Fix spin-loop/cleanup failure mode within run loop This ensures that exceptions raised in thread callback hooks are rescued and properly mark jobs as failed. Fixes #23 and #41 --- lib/delayed/worker.rb | 46 +++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index e96bc945..68e5ea1b 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -98,12 +98,10 @@ def work_off(num = 100) pool = Concurrent::FixedThreadPool.new(jobs.length) jobs.each do |job| pool.post do - run_thread_callbacks(job) do - if run_job(job) - success.increment - else - failure.increment - end + if run_job(job) + success.increment + else + failure.increment end end end @@ -122,25 +120,27 @@ def run_thread_callbacks(job, &block) end def run(job) - metadata = { - status: 'RUNNING', - name: job.name, - run_at: job.run_at, - created_at: job.created_at, - priority: job.priority, - queue: job.queue, - attempts: job.attempts, - enqueued_for: (Time.current - job.created_at).round, - } - job_say job, metadata.to_json - run_time = Benchmark.realtime do - Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do - job.invoke_job + run_thread_callbacks(job) do + metadata = { + status: 'RUNNING', + name: job.name, + run_at: job.run_at, + created_at: job.created_at, + priority: job.priority, + queue: job.queue, + attempts: job.attempts, + enqueued_for: (Time.current - job.created_at).round, + } + job_say job, metadata.to_json + run_time = Benchmark.realtime do + Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do + job.invoke_job + end + job.destroy end - job.destroy + job_say job, format('COMPLETED after %.4f seconds', run_time) + true # did work end - job_say job, format('COMPLETED after %.4f seconds', run_time) - true # did work rescue DeserializationError => e job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error' From 2ce9b67316c0197d58cce177bd61eab0d7334d89 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 26 Jun 2024 18:44:38 -0400 Subject: [PATCH 2/3] Make 'num' mean # of jobs, not # of iterations (like in pre-threading implementation) --- lib/delayed/worker.rb | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 68e5ea1b..f9751742 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -89,20 +89,17 @@ def on_exit! # Exit early if interrupted. def work_off(num = 100) success = Concurrent::AtomicFixnum.new(0) - failure = Concurrent::AtomicFixnum.new(0) + total = 0 - num.times do + while total < num jobs = reserve_jobs break if jobs.empty? + total += jobs.length pool = Concurrent::FixedThreadPool.new(jobs.length) jobs.each do |job| pool.post do - if run_job(job) - success.increment - else - failure.increment - end + success.increment if run_job(job) end end @@ -112,7 +109,7 @@ def work_off(num = 100) break if stop? # leave if we're exiting end - [success, failure].map(&:value) + [success.value, total - success.value] end def run_thread_callbacks(job, &block) @@ -139,13 +136,14 @@ def run(job) job.destroy end job_say job, format('COMPLETED after %.4f seconds', run_time) - true # did work end + true # did work rescue DeserializationError => e job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error' job.error = e failed(job) + false # work failed rescue Exception => e # rubocop:disable Lint/RescueException self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) } false # work failed From d4cae210ef20ed7b6244679c02e320361678d104 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 26 Jun 2024 19:05:10 -0400 Subject: [PATCH 3/3] Pin sqlite3 for build --- .github/workflows/ci.yml | 2 +- Gemfile | 2 +- Gemfile.lock | 2 +- gemfiles/rails_5_2.gemfile | 2 +- gemfiles/rails_6_0.gemfile | 2 +- gemfiles/rails_6_1.gemfile | 2 +- gemfiles/rails_7_0.gemfile | 2 +- gemfiles/rails_7_1.gemfile | 2 +- gemfiles/rails_main.gemfile | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46e75337..85056040 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ jobs: strategy: fail-fast: false matrix: - ruby: ['2.6', '2.7', '3.0', '3.1', '3.2'] + ruby: ['2.7', '3.0', '3.1', '3.2'] gemfile: - gemfiles/rails_5_2.gemfile - gemfiles/rails_6_0.gemfile diff --git a/Gemfile b/Gemfile index 509ad67b..2848b1e9 100644 --- a/Gemfile +++ b/Gemfile @@ -11,6 +11,6 @@ gem 'mysql2' gem 'pg' gem 'rake' gem 'rspec' -gem 'sqlite3' +gem 'sqlite3', '~> 1.7.3' gem 'timecop' gem 'zeitwerk' diff --git a/Gemfile.lock b/Gemfile.lock index 3d2aa2ed..f5e0eb1d 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -216,7 +216,7 @@ DEPENDENCIES pg rake rspec - sqlite3 + sqlite3 (~> 1.7.3) timecop zeitwerk diff --git a/gemfiles/rails_5_2.gemfile b/gemfiles/rails_5_2.gemfile index da3da842..55b092b9 100644 --- a/gemfiles/rails_5_2.gemfile +++ b/gemfiles/rails_5_2.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_6_0.gemfile b/gemfiles/rails_6_0.gemfile index bff971b3..d6bea622 100644 --- a/gemfiles/rails_6_0.gemfile +++ b/gemfiles/rails_6_0.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_6_1.gemfile b/gemfiles/rails_6_1.gemfile index b6a59735..6d6f5077 100644 --- a/gemfiles/rails_6_1.gemfile +++ b/gemfiles/rails_6_1.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_7_0.gemfile b/gemfiles/rails_7_0.gemfile index 30055212..5c98e898 100644 --- a/gemfiles/rails_7_0.gemfile +++ b/gemfiles/rails_7_0.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_7_1.gemfile b/gemfiles/rails_7_1.gemfile index b1431c05..f15f0eb7 100644 --- a/gemfiles/rails_7_1.gemfile +++ b/gemfiles/rails_7_1.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" diff --git a/gemfiles/rails_main.gemfile b/gemfiles/rails_main.gemfile index 4fcdd256..7742de24 100644 --- a/gemfiles/rails_main.gemfile +++ b/gemfiles/rails_main.gemfile @@ -11,7 +11,7 @@ gem "mysql2" gem "pg" gem "rake" gem "rspec" -gem "sqlite3" +gem "sqlite3", "~> 1.7.3" gem "timecop" gem "zeitwerk" gem "actionview", github: "rails/rails", glob: "actionview/*.gemspec"