Skip to content

Commit 0bab969

Browse files
committed
Fixed process-pool for sync tasks.
1 parent 5b6d387 commit 0bab969

2 files changed

Lines changed: 22 additions & 11 deletions

File tree

a.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import asyncio
2+
3+
from taskiq_redis import ListQueueBroker
4+
5+
broker = ListQueueBroker("redis://localhost")
6+
7+
8+
@broker.task
9+
async def my_task():
10+
pass
11+
12+
13+
async def main():
14+
async with broker:
15+
await my_task.kiq()
16+
17+
18+
if __name__ == "__main__":
19+
asyncio.run(main())

taskiq/receiver/receiver.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,6 @@ def _execute_sync_task_in_executor(
4646
return target(*args, **kwargs)
4747

4848

49-
def _is_process_pool_executor(executor: Executor) -> bool:
50-
"""Check if the executor is a ProcessPoolExecutor.
51-
52-
:param executor: executor instance
53-
:return: True if it's a ProcessPoolExecutor
54-
"""
55-
return isinstance(executor, ProcessPoolExecutor)
56-
57-
5849
class Receiver:
5950
"""Class that uses as a callback handler."""
6051

@@ -96,6 +87,7 @@ def __init__(
9687
"can result in undefined behavior",
9788
)
9889
self.sem_prefetch = asyncio.Semaphore(max_prefetch)
90+
self.is_process_pool = isinstance(executor, ProcessPoolExecutor)
9991

10092
async def callback( # noqa: C901, PLR0912
10193
self,
@@ -274,7 +266,7 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
274266
is_coroutine = False
275267
# If this is a synchronous function, we
276268
# run it in executor and preserve the context.
277-
if _is_process_pool_executor(self.executor):
269+
if self.is_process_pool:
278270
# For ProcessPoolExecutor, we can't use ctx.run because it contains
279271
# a reference to contextvars.Context which cannot be pickled.
280272
# Instead, we call the target function directly in the executor.
@@ -284,7 +276,7 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
284276
self.executor,
285277
_execute_sync_task_in_executor,
286278
target,
287-
message.args,
279+
tuple(message.args),
288280
kwargs,
289281
)
290282
else:

0 commit comments

Comments
 (0)