22import contextvars
33import random
44import time
5+ import unittest .mock
56from collections .abc import Generator
67from concurrent .futures import ThreadPoolExecutor
78from functools import wraps
@@ -24,13 +25,15 @@ def get_receiver(
2425 broker : AsyncBroker | None = None ,
2526 no_parse : bool = False ,
2627 max_async_tasks : int | None = None ,
28+ max_async_tasks_jitter : int = 0 ,
2729) -> Receiver :
2830 """
2931 Returns receiver with custom broker and args.
3032
3133 :param broker: broker, defaults to None
3234 :param no_parse: parameter to taskiq_args, defaults to False
33- :param cli_args: Taskiq worker CLI arguments.
35+ :param max_async_tasks: maximum number of simultaneous async tasks.
36+ :param max_async_tasks_jitter: random jitter to add to max_async_tasks.
3437 :return: new receiver.
3538 """
3639 if broker is None :
@@ -40,6 +43,7 @@ def get_receiver(
4043 executor = ThreadPoolExecutor (max_workers = 10 ),
4144 validate_params = not no_parse ,
4245 max_async_tasks = max_async_tasks ,
46+ max_async_tasks_jitter = max_async_tasks_jitter ,
4347 )
4448
4549
@@ -544,3 +548,55 @@ async def task_no_result() -> str:
544548 assert resp .return_value == "some value"
545549 assert not broker ._running_tasks
546550 assert wrapper_call is True
551+
552+
553+ async def test_jitter_applied_to_semaphore () -> None :
554+ """Test that jitter is correctly applied to max_async_tasks semaphore."""
555+ max_async_tasks = 100
556+ max_async_tasks_jitter = 10
557+
558+ # Test with jitter value of 0 (minimum)
559+ with unittest .mock .patch ("random.randint" , return_value = 0 ):
560+ receiver = get_receiver (
561+ max_async_tasks = max_async_tasks ,
562+ max_async_tasks_jitter = max_async_tasks_jitter ,
563+ )
564+ assert receiver .sem is not None
565+ assert receiver .sem ._value == max_async_tasks
566+
567+ # Test with jitter value of 5 (middle)
568+ with unittest .mock .patch ("random.randint" , return_value = 5 ):
569+ receiver = get_receiver (
570+ max_async_tasks = max_async_tasks ,
571+ max_async_tasks_jitter = max_async_tasks_jitter ,
572+ )
573+ assert receiver .sem is not None
574+ assert receiver .sem ._value == max_async_tasks + 5
575+
576+ # Test with jitter value of 10 (maximum)
577+ with unittest .mock .patch ("random.randint" , return_value = 10 ):
578+ receiver = get_receiver (
579+ max_async_tasks = max_async_tasks ,
580+ max_async_tasks_jitter = max_async_tasks_jitter ,
581+ )
582+ assert receiver .sem is not None
583+ assert receiver .sem ._value == max_async_tasks + 10
584+
585+
586+ async def test_jitter_zero_no_randomization () -> None :
587+ """Test with zero jitter, semaphore value matches max_async_tasks."""
588+ max_async_tasks = 50
589+
590+ receiver = get_receiver (
591+ max_async_tasks = max_async_tasks ,
592+ max_async_tasks_jitter = 0 ,
593+ )
594+
595+ assert receiver .sem is not None
596+ assert receiver .sem ._value == max_async_tasks
597+
598+
599+ async def test_no_semaphore_without_max_async_tasks () -> None :
600+ """Test that semaphore is None when max_async_tasks is not set."""
601+ receiver = get_receiver (max_async_tasks = None )
602+ assert receiver .sem is None
0 commit comments