Skip to content

Commit c7848df

Browse files
committed
Add support for jitter so that we can ensure evenly distributed tasks don't cause all workers to restart at same time
1 parent 18b6a6c commit c7848df

5 files changed

Lines changed: 22 additions & 1 deletion

File tree

taskiq/api/receiver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ async def run_receiver_task(
1515
sync_workers: int | None = None,
1616
validate_params: bool = True,
1717
max_async_tasks: int = 100,
18+
max_async_tasks_jitter: int = 0,
1819
max_prefetch: int = 0,
1920
propagate_exceptions: bool = True,
2021
run_startup: bool = False,
@@ -43,6 +44,7 @@ async def run_receiver_task(
4344
or processes in processpool that runs sync tasks.
4445
:param validate_params: whether to validate params or not.
4546
:param max_async_tasks: maximum number of simultaneous async tasks.
47+
:param max_async_tasks_jitter: random jitter to add to max_async_tasks.
4648
:param max_prefetch: maximum number of tasks to prefetch.
4749
:param propagate_exceptions: whether to propagate exceptions in generators or not.
4850
:param run_startup: whether to run startup function or not.
@@ -79,6 +81,7 @@ def on_exit(_: Receiver) -> None:
7981
run_startup=run_startup,
8082
validate_params=validate_params,
8183
max_async_tasks=max_async_tasks,
84+
max_async_tasks_jitter=max_async_tasks_jitter,
8285
max_prefetch=max_prefetch,
8386
propagate_exceptions=propagate_exceptions,
8487
on_exit=on_exit,

taskiq/brokers/inmemory_broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ def __init__(
127127
max_stored_results: int = 100,
128128
cast_types: bool = True,
129129
max_async_tasks: int = 30,
130+
max_async_tasks_jitter: int = 0,
130131
propagate_exceptions: bool = True,
131132
await_inplace: bool = False,
132133
) -> None:
@@ -140,6 +141,7 @@ def __init__(
140141
executor=self.executor,
141142
validate_params=cast_types,
142143
max_async_tasks=max_async_tasks,
144+
max_async_tasks_jitter=max_async_tasks_jitter,
143145
propagate_exceptions=propagate_exceptions,
144146
)
145147
self.await_inplace = await_inplace

taskiq/cli/worker/args.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class WorkerArgs:
4444
reload_dirs: list[str] = field(default_factory=list)
4545
no_gitignore: bool = False
4646
max_async_tasks: int = 100
47+
max_async_tasks_jitter: int = 0
4748
receiver: str = "taskiq.receiver:Receiver"
4849
receiver_arg: list[tuple[str, str]] = field(default_factory=list)
4950
max_prefetch: int = 0
@@ -210,6 +211,14 @@ def from_cli(
210211
default=100,
211212
help="Maximum simultaneous async tasks per worker process. ",
212213
)
214+
parser.add_argument(
215+
"--max-async-tasks-jitter",
216+
type=int,
217+
dest="max_async_tasks_jitter",
218+
default=0,
219+
help="Add random jitter (0 to this value) to max-async-tasks to prevent "
220+
"all workers from closing at the same time. ",
221+
)
213222
parser.add_argument(
214223
"--max-prefetch",
215224
type=int,

taskiq/cli/worker/run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
165165
executor=pool,
166166
validate_params=not args.no_parse,
167167
max_async_tasks=args.max_async_tasks,
168+
max_async_tasks_jitter=args.max_async_tasks_jitter,
168169
max_prefetch=args.max_prefetch,
169170
propagate_exceptions=not args.no_propagate_errors,
170171
ack_type=args.ack_type,

taskiq/receiver/receiver.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import contextvars
33
import functools
44
import inspect
5+
import random
56
import sys
67
from collections.abc import Callable
78
from concurrent.futures import Executor
@@ -37,6 +38,7 @@ def __init__(
3738
executor: Executor | None = None,
3839
validate_params: bool = True,
3940
max_async_tasks: "int | None" = None,
41+
max_async_tasks_jitter: int = 0,
4042
max_prefetch: int = 0,
4143
propagate_exceptions: bool = True,
4244
run_startup: bool = True,
@@ -62,7 +64,11 @@ def __init__(
6264
self._prepare_task(task.task_name, task.original_func)
6365
self.sem: asyncio.Semaphore | None = None
6466
if max_async_tasks is not None and max_async_tasks > 0:
65-
self.sem = asyncio.Semaphore(max_async_tasks)
67+
# Apply jitter to prevent all workers from hitting the limit simultaneously
68+
actual_limit = max_async_tasks
69+
if max_async_tasks_jitter > 0:
70+
actual_limit = max_async_tasks + random.randint(0, max_async_tasks_jitter)
71+
self.sem = asyncio.Semaphore(actual_limit)
6672
else:
6773
logger.warning(
6874
"Setting unlimited number of async tasks "

0 commit comments

Comments
 (0)