Skip to content

Commit 31f203c

Browse files
committed
Rename max_queue_size to max_queue_threshold in CoroQueue and related components
1 parent 77808e6 commit 31f203c

4 files changed

Lines changed: 43 additions & 35 deletions

File tree

broqer/coro_queue.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def _coro(value):
2424

2525

2626
class MaxQueueException(Exception):
27-
""" Exception raised when max_queue_size is exceeded in CoroQueue """
27+
""" Exception raised when max_queue_threshold is exceeded in CoroQueue """
2828

2929

3030
class AsyncMode(Enum):
@@ -42,19 +42,21 @@ class CoroQueue: # pylint: disable=too-few-public-methods
4242
""" Schedules the running of a coroutine given on a mode
4343
:param coro: Coroutine to be scheduled
4444
:param mode: scheduling mode (see AsyncMode)
45-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
45+
:param max_queue_threshold: queue len error threshold,
46+
used with AsyncMode.QUEUE
4647
"""
4748
def __init__(self, coro, mode=AsyncMode.CONCURRENT,
48-
max_queue_size: int | None = None):
49+
max_queue_threshold: int | None = None):
4950

50-
if max_queue_size is not None and mode != AsyncMode.QUEUE:
51+
if max_queue_threshold is not None and mode != AsyncMode.QUEUE:
5152
raise ValueError(
52-
"max_queue_size can only be used with mode=AsyncMode.QUEUE"
53+
"max_queue_threshold can only be used with "
54+
"mode=AsyncMode.QUEUE"
5355
)
5456

5557
self._coro = coro
5658
self._mode = mode
57-
self._max_queue_size = max_queue_size
59+
self._max_queue_threshold = max_queue_threshold
5860

5961
# ._last_args is used for LAST_DISTINCT and keeps the last arguments
6062
self._last_args = None # type: Optional[Tuple]
@@ -124,11 +126,11 @@ def _start_task(self, args: Tuple, future: asyncio.Future):
124126
def _handle_done(self, result_future: asyncio.Future, task: asyncio.Task):
125127
try:
126128
result = task.result()
127-
if self._queue and self._max_queue_size is not None and \
128-
len(self._queue) > self._max_queue_size:
129+
if self._queue and self._max_queue_threshold is not None and \
130+
len(self._queue) > self._max_queue_threshold:
129131
raise MaxQueueException(
130132
"CoroQueue size is %d and exceeds %d" %
131-
(len(self._queue), self._max_queue_size)
133+
(len(self._queue), self._max_queue_threshold)
132134
)
133135
except asyncio.CancelledError: # happend in INTERRUPT mode
134136
result_future.set_result(NONE)

broqer/op/map_async.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ class MapAsync(Operator): # pylint: disable=too-many-instance-attributes
116116
:param mode: behavior when a value is currently processed
117117
:param error_callback: error callback to be registered
118118
:param unpack: value from emits will be unpacked as (\\*value)
119-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
119+
:param max_queue_threshold: queue len error threshold,
120+
used with AsyncMode.QUEUE
120121
:param \\*\\*kwargs: keyword arguments to be used for calling coro
121122
122123
:ivar scheduled: Publisher emitting the value when coroutine is actually
@@ -125,11 +126,11 @@ class MapAsync(Operator): # pylint: disable=too-many-instance-attributes
125126
def __init__(self,
126127
coro, *args, mode=AsyncMode.CONCURRENT,
127128
error_callback=default_error_handler, unpack: bool = False,
128-
max_queue_size: int | None = None, **kwargs) -> None:
129+
max_queue_threshold: int | None = None, **kwargs) -> None:
129130
Operator.__init__(self)
130131
_coro = wrap_coro(coro, unpack, *args, **kwargs)
131132
self._coro_queue = CoroQueue(
132-
_coro, mode=mode, max_queue_size=max_queue_size
133+
_coro, mode=mode, max_queue_threshold=max_queue_threshold
133134
)
134135
self._error_callback = error_callback
135136

@@ -154,18 +155,19 @@ def build_map_async(coro=None, *,
154155
mode: AsyncMode = AsyncMode.CONCURRENT,
155156
error_callback=default_error_handler,
156157
unpack: bool = False,
157-
max_queue_size: int | None = None):
158+
max_queue_threshold: int | None = None):
158159
""" Decorator to wrap a function to return a Map operator.
159160
160161
:param coro: coroutine to be wrapped
161162
:param mode: behavior when a value is currently processed
162163
:param error_callback: error callback to be registered
163164
:param unpack: value from emits will be unpacked (*value)
164-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
165+
:param max_queue_threshold: queue len error threshold,
166+
used with AsyncMode.QUEUE
165167
"""
166168
def _build_map_async(coro):
167169
return MapAsync(coro, mode=mode, error_callback=error_callback,
168-
unpack=unpack, max_queue_size=max_queue_size)
170+
unpack=unpack, max_queue_threshold=max_queue_threshold)
169171

170172
if coro:
171173
return _build_map_async(coro)
@@ -177,32 +179,33 @@ def build_map_async_factory(coro=None, *,
177179
mode: AsyncMode = AsyncMode.CONCURRENT,
178180
error_callback=default_error_handler,
179181
unpack: bool = False,
180-
max_queue_size: int | None = None):
182+
max_queue_threshold: int | None = None):
181183
""" Decorator to wrap a coroutine to return a factory for MapAsync
182184
operators.
183185
184186
:param coro: coroutine to be wrapped
185187
:param mode: behavior when a value is currently processed
186188
:param error_callback: error callback to be registered
187189
:param unpack: value from emits will be unpacked (*value)
188-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
190+
:param max_queue_threshold: queue len error threshold,
191+
used with AsyncMode.QUEUE
189192
"""
190193
_mode = mode
191194

192195
def _build_map_async(coro):
193196
@wraps(coro)
194197
def _wrapper(*args, mode=None, **kwargs) -> MapAsync:
195198
if ('unpack' in kwargs) or ('error_callback' in kwargs) or \
196-
('max_queue_size' in kwargs):
199+
('max_queue_threshold' in kwargs):
197200
raise TypeError(
198-
'"unpack", "error_callback" or "max_queue_size" has to '
199-
'be defined by decorator'
201+
'"unpack", "error_callback" or "max_queue_threshold" '
202+
'has to be defined by decorator'
200203
)
201204
if mode is None:
202205
mode = _mode
203206
return MapAsync(coro, *args, mode=mode, unpack=unpack,
204207
error_callback=error_callback,
205-
max_queue_size=max_queue_size, **kwargs)
208+
max_queue_threshold=max_queue_threshold, **kwargs)
206209
return _wrapper
207210

208211
if coro:

broqer/subscribers/sink_async.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,18 @@ class SinkAsync(Subscriber): # pylint: disable=too-few-public-methods
4747
:param mode: behavior when a value is currently processed
4848
:param error_callback: error callback to be registered
4949
:param unpack: value from emits will be unpacked as (\\*value)
50-
:param max_queue_size: queue len error threshold,used with AsyncMode.QUEUE
50+
:param max_queue_threshold: queue len error threshold,
51+
used with AsyncMode.QUEUE
5152
:param \\*\\*kwargs: keyword arguments to be used for calling coro
5253
"""
5354
def __init__(self, coro, *args, mode=AsyncMode.CONCURRENT,
5455
error_callback=default_error_handler,
5556
unpack: bool = False,
56-
max_queue_size: int | None = None, **kwargs) -> None:
57+
max_queue_threshold: int | None = None, **kwargs) -> None:
5758

5859
_coro = wrap_coro(coro, unpack, *args, **kwargs)
5960
self._coro_queue = CoroQueue(
60-
_coro, mode=mode, max_queue_size=max_queue_size
61+
_coro, mode=mode, max_queue_threshold=max_queue_threshold
6162
)
6263
self._error_callback = error_callback
6364

@@ -93,29 +94,30 @@ def build_sink_async_factory(coro=None, *,
9394
mode: AsyncMode = AsyncMode.CONCURRENT,
9495
error_callback=default_error_handler,
9596
unpack: bool = False,
96-
max_queue_size: int | None = None):
97+
max_queue_threshold: int | None = None):
9798
""" Decorator to wrap a coroutine to return a factory for SinkAsync
9899
subscribers.
99100
100101
:param coro: coroutine to be wrapped
101102
:param mode: behavior when a value is currently processed
102103
:param error_callback: error callback to be registered
103104
:param unpack: value from emits will be unpacked (*value)
104-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
105+
:param max_queue_threshold: queue len error threshold,
106+
used with AsyncMode.QUEUE
105107
"""
106108
def _build_sink_async(coro):
107109
@wraps(coro)
108110
def _wrapper(*args, **kwargs) -> SinkAsync:
109-
if ('unpack' in kwargs) or ('max_queue_size' in kwargs) or \
111+
if ('unpack' in kwargs) or ('max_queue_threshold' in kwargs) or \
110112
('error_callback' in kwargs) or ('mode' in kwargs):
111113
raise TypeError(
112114
'"unpack", "mode", "error_callback" and '
113-
'"max_queue_size" has to be defined by decorator'
115+
'"max_queue_threshold" has to be defined by decorator'
114116
)
115117

116118
return SinkAsync(coro, *args, mode=mode,
117119
error_callback=error_callback, unpack=unpack,
118-
max_queue_size=max_queue_size,
120+
max_queue_threshold=max_queue_threshold,
119121
**kwargs)
120122
return _wrapper
121123

@@ -129,21 +131,22 @@ def sink_async_property(coro=None, *,
129131
mode: AsyncMode = AsyncMode.CONCURRENT,
130132
error_callback=default_error_handler,
131133
unpack: bool = False,
132-
max_queue_size: int | None = None):
134+
max_queue_threshold: int | None = None):
133135
""" Decorator to build a property returning a SinkAsync subscriber.
134136
135137
:param coro: coroutine to be wrapped
136138
:param mode: behavior when a value is currently processed
137139
:param error_callback: error callback to be registered
138140
:param unpack: value from emits will be unpacked (*value)
139-
:param max_queue_size: queue len error threshold, used with AsyncMode.QUEUE
141+
:param max_queue_threshold: queue len error threshold,
142+
used with AsyncMode.QUEUE
140143
"""
141144
def build_sink_async_property(coro):
142145
@property
143146
def _build_sink_async(self):
144147
return SinkAsync(coro, self, mode=mode,
145148
error_callback=error_callback, unpack=unpack,
146-
max_queue_size=max_queue_size)
149+
max_queue_threshold=max_queue_threshold)
147150
return _build_sink_async
148151

149152
if coro:

tests/test_coro_queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ async def _coro(index):
181181
await asyncio.sleep(0.001)
182182
return index + 2
183183

184-
coro_queue = CoroQueue(coro=_coro, mode=AsyncMode.QUEUE, max_queue_size=10)
184+
coro_queue = CoroQueue(coro=_coro, mode=AsyncMode.QUEUE, max_queue_threshold=10)
185185

186186
future_0 = coro_queue.schedule(0)
187187
future_1 = coro_queue.schedule(40)
@@ -209,9 +209,9 @@ async def _coro():
209209
return 42
210210

211211
with pytest.raises(ValueError) as exc_info:
212-
CoroQueue(coro=_coro, mode=mode, max_queue_size=10)
212+
CoroQueue(coro=_coro, mode=mode, max_queue_threshold=10)
213213

214-
assert str(exc_info.value) == 'max_queue_size can only be used with mode=AsyncMode.QUEUE'
214+
assert str(exc_info.value) == 'max_queue_threshold can only be used with mode=AsyncMode.QUEUE'
215215

216216

217217
@pytest.mark.parametrize('distinct', [True, False])

0 commit comments

Comments
 (0)