Correct me if I'm wrong, but currently there's no way to cap the number of jobs a single worker can work concurrently across all queues, similar to what the `celery worker --concurrency=N` setting does in Celery.
Currently, throughput can be controlled through these primary mechanisms:
- Using `Queue.concurrency` to control how many jobs a single worker can work concurrently
- Using `Queue.rate_limit` to control how many jobs can be executed in a given time frame by all workers
- Spawning more worker processes
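For concreteness, a minimal sketch of declaring queues with the first two knobs (queue names are placeholders, and I'm assuming `Queue` accepts `concurrency`, `rate_limit`, and a `rate_limit_window` keyword, with queues declared via `chancy.declare()`):

```python
from chancy import Chancy, Queue

async def declare_queues(chancy: Chancy) -> None:
    # Up to 5 concurrent jobs per worker on this queue.
    await chancy.declare(Queue("emails", concurrency=5))

    # At most 100 jobs per 60-second window across all workers
    # (rate_limit_window is my assumption for the window parameter name).
    await chancy.declare(
        Queue("reports", concurrency=5, rate_limit=100, rate_limit_window=60)
    )
```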
This covers almost everything except the scenario where a single worker works too many queues and gets oversaturated by taking on too many jobs, leading to timeouts (if set), particularly when using an `AsyncExecutor`-type executor.
For example, given 10 queues that each have `concurrency=5`, a single general worker would pick up 50 jobs, slowing execution of each to a crawl since all jobs share the same asyncio loop / process.
To address this, a fairly complex orchestration design between queues and workers would be needed, where specific workers subscribe to specific queues to optimize throughput without saturating the asyncio loop. Alternatively, a per-worker concurrency cap would be a lazy fix here.
I looked into this a bit, and it seems like a `Plugin` can work here, but it's slightly sub-optimal, as the worker still locks some number of jobs it isn't actually working on yet:
```python
import asyncio
from typing import Any

# Import paths assumed from chancy's public API.
from chancy import Chancy, QueuedJob, Worker
from chancy.plugin import Plugin


class WorkerConcurrencyCap(Plugin):
    """
    Enforce a per-worker global concurrency cap across all queues.

    Works by blocking in on_job_starting() until a semaphore slot is
    available.
    """

    def __init__(self, max_in_flight: int = 10):
        self._sem = asyncio.Semaphore(max_in_flight)
        self._held: set[str] = set()
        super().__init__()

    @staticmethod
    def get_identifier() -> str:
        return "worker_concurrency_cap"

    async def on_job_starting(
        self,
        *,
        job: QueuedJob,
        worker: "Worker",
    ) -> QueuedJob:
        # Block here until a slot frees up; this is what enforces the cap.
        await self._sem.acquire()
        self._held.add(job.id)
        return job

    async def on_job_completed(
        self,
        *,
        worker: "Worker",
        job: QueuedJob,
        exc: Exception | None = None,
        result: Any | None = None,
    ) -> QueuedJob:
        # Only release a slot for jobs we actually acquired one for.
        if job.id in self._held:
            self._held.remove(job.id)
            self._sem.release()
        return job

    async def run(self, worker: "Worker", chancy: "Chancy"):
        # No periodic work to do; the plugin only reacts to job hooks.
        while True:
            await self.sleep(3600)
```
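For completeness, a sketch of wiring the plugin in (assuming the `Chancy(dsn, plugins=[...])` constructor and `Worker(chancy).start()` entry point; the DSN is a placeholder):

```python
from chancy import Chancy, Worker

async def main() -> None:
    async with Chancy(
        "postgresql://localhost/postgres",  # placeholder DSN
        plugins=[WorkerConcurrencyCap(max_in_flight=10)],
    ) as chancy:
        await Worker(chancy).start()
```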
This plugin does work, with one caveat: more than `max_in_flight` jobs get set to `status=running`, since that status appears to be set by the executor before the `on_job_starting` callback is called to block job execution. So with 10 queues of `concurrency=5`, all 50 jobs will be marked `status=running`, but only `max_in_flight=10` actually run concurrently. The job timeout (`Limit(Limit.Type.TIME, seconds)`) doesn't seem to start until this block clears, so aside from the incorrect `status`, everything else works as expected.
Open to other suggestions on how this could be orchestrated to avoid this issue, though.
Edit: Actually, a custom worker can also be used instead of a plugin, with the same semaphore-based blocking technique:
```python
import asyncio
from typing import Any

from chancy import Queue, Worker


class CappedWorker(Worker):
    _cap: asyncio.Semaphore | None

    def __init__(self, *args, max_in_flight: int | None = 10, **kwargs):
        super().__init__(*args, **kwargs)
        self._cap = asyncio.Semaphore(max_in_flight) if max_in_flight else None

    async def fetch_jobs(self, queue: Queue, conn, *, up_to: int = 1):
        if self._cap is None:
            return await super().fetch_jobs(queue, conn, up_to=up_to)

        # Only claim as many jobs as we have global capacity for.
        available = self._cap._value  # private-ish but works
        if available <= 0:
            return []

        # Claim at most min(up_to, available) jobs.
        jobs = await super().fetch_jobs(queue, conn, up_to=min(up_to, available))

        # Reserve slots for the claimed jobs (so other queues also respect
        # the cap).
        for _ in jobs:
            await self._cap.acquire()
        return jobs

    async def on_job_completed(self, *, queue: Queue, job: Any):
        # Release the global slot when the job is truly completed.
        if self._cap:
            self._cap.release()
        return await super().on_job_completed(queue=queue, job=job)
```
This does not suffer from the same issue the plugin does.
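For reference, starting it should look the same as a stock worker, just with the extra `max_in_flight` keyword added above (a sketch, assuming the usual `Worker(chancy).start()` entry point; the DSN is a placeholder):

```python
from chancy import Chancy

async def main() -> None:
    async with Chancy("postgresql://localhost/postgres") as chancy:
        # Cap this worker at 10 in-flight jobs across all queues;
        # pass max_in_flight=None to disable the cap entirely.
        await CappedWorker(chancy, max_in_flight=10).start()
```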