Add parallelize class method to MaintenanceTasks::Task #1337
Conversation
adrianna-chang-shopify
left a comment
I think this feature could make sense, but we'll need to make it a bit safer to use so that we don't risk creating a ton of threads or exhausting AR's connection pool. Left some thoughts inline!
    exceptions = []
    exception_mutex = Mutex.new

    threads = items.map do |item|
Batches can be of arbitrary size, e.g. 1000+ items. There are risks of performance degradation / system instability in generating an unbounded number of threads. Should we implement some sort of thread pool with a configurable size?
(We may also want to coordinate with Rails' connection pool size, which defaults to 5 connections)
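To make the thread-count concern concrete, a bounded pool could look roughly like the sketch below. This is not code from the PR; the name `BoundedExecutor` and the `pool_size` default are assumptions.

```ruby
# Sketch: at most pool_size worker threads drain a shared queue,
# so the thread count stays fixed regardless of batch size.
class BoundedExecutor
  def self.execute(items, pool_size: 5)
    queue = Queue.new
    items.each { |item| queue << item }

    exceptions = []
    mutex = Mutex.new

    workers = Array.new(pool_size) do
      Thread.new do
        loop do
          item = begin
            queue.pop(true) # non-blocking pop; raises ThreadError when empty
          rescue ThreadError
            break # queue drained, worker exits
          end

          begin
            yield item
          rescue => error
            mutex.synchronize { exceptions << { item: item, error: error } }
          end
        end
      end
    end

    workers.each(&:join)
    exceptions
  end
end
```

Inside a Rails app, each worker would additionally check out a connection via `ActiveRecord::Base.connection_pool.with_connection`, and `pool_size` should stay at or below the connection pool size.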
Another idea is to make the thread count part of the API, i.e. parallelize(threads: 5). I don't think we should tie the thread count to the batch size, though.
+1, I don't want to allow people to spawn an unbounded number of threads just by following the conventions; for in_batches that's 1000 elements per batch.
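For reference, the class-method API suggested above could be sketched like this in plain Ruby (standing in for the gem's class-attribute machinery; the default of 5 threads is an assumption):

```ruby
class Task
  class << self
    attr_reader :thread_count

    # Opt a task into parallel batch processing with an explicit,
    # bounded number of threads, never derived from batch size.
    def parallelize(threads: 5)
      @parallelized = true
      @thread_count = threads
    end

    def parallelized?
      !!@parallelized
    end
  end
end

# A task opts in with an explicit thread count.
class ImportTask < Task
  parallelize(threads: 3)
end
```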
    # implement an override for this method.
    def process(_item)
      raise NoMethodError, "#{self.class.name} must implement `process`."
    def process_item(_item)
I'm not sure exactly what to name this, but I think we need an API that's more distinct from #process that indicates that this is for parallel processing in a batch. Maybe #process_for_batch?
    items = batch.respond_to?(:to_a) ? batch.to_a : Array(batch)

    # Execute items in parallel, storing errored item for context
    ParallelExecutor.execute(items) do |item|
I feel like we could return the exceptions array ([{ item: <item>, error: <error> }]) directly from .execute instead of raising the error. This would simplify things a lot, i.e.:
    class ParallelExecutor
      class << self
        def execute(items, &block)
          ...
          threads = items.map do |item|
            Thread.new do
              ActiveRecord::Base.connection_pool.with_connection do
                block.call(item)
              rescue => error
                exception_mutex.synchronize do
                  exceptions << { item: item, error: error }
                end
              end
            end
          end
          threads.each(&:join)
          exceptions
        end
...And then here:
    exceptions = ParallelExecutor.execute(items) do |item|
      process_item(item)
    end

    if exceptions.any?
      @errored_element = exceptions.first[:item]
      raise exceptions.first[:error]
    end
I think it's a bad idea; we already have a unit of work, the job, and we don't have to handle it, the queue does. It's not something we should get into, IMO.
Add parallelize class method to MaintenanceTasks::Task

Summary
This PR adds a parallelize class method to MaintenanceTasks::Task that enables parallel processing of batch items using threads. This provides a cleaner, more Rails-like API compared to including a concern.

Usage
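The usage snippet did not survive extraction; based on the description, it would look something like the sketch below. The task and the stubbed base class are illustrative only; in a real app you would inherit from the gem's MaintenanceTasks::Task directly.

```ruby
# Minimal stand-in for the gem's base class so the sketch runs standalone.
module MaintenanceTasks
  class Task
    class << self
      def parallelize
        @parallelized = true
      end

      def parallelized?
        !!@parallelized
      end
    end
  end
end

class UpdatePostsTask < MaintenanceTasks::Task
  parallelize

  # With parallelize enabled, each item in a batch is handed to a
  # worker thread and processed via process_item instead of process.
  def process_item(item)
    item.to_s.upcase
  end
end
```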
Changes

- parallelized class attribute to track parallel processing mode
- parallelize class method to enable parallel processing
- parallelized? class and instance methods
- process_item instance method placeholder
- process to route to parallel execution when enabled
- ParallelExecutor for thread-safe parallel item processing

Notes
- Ensure process_item is idempotent.
- process_item must be thread-safe. Avoid shared mutable state.
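To illustrate the thread-safety note: any state shared across threads inside process_item needs synchronization. A generic Ruby sketch, not code from the PR:

```ruby
# Each thread does independent work; the only shared object (results)
# is mutated exclusively inside the Mutex, so no updates are lost.
results = []
mutex = Mutex.new

threads = (1..8).map do |i|
  Thread.new do
    value = i * i # per-thread computation, no shared state touched
    mutex.synchronize { results << value }
  end
end
threads.each(&:join)
```

A thread-safe Queue would work equally well here and avoids the explicit lock.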