Skip to content

Commit 409485f

Browse files
committed
Made tasks independent from brokers.
1 parent e5c4abf commit 409485f

6 files changed

Lines changed: 138 additions & 57 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,8 @@ lint.ignore = [
172172
"D100", # Missing docstring in public module
173173
"ANN401", # typing.Any are disallowed in `**kwargs
174174
"PLR0913", # Too many arguments for function call
175-
"D106" # Missing docstring in public nested class
175+
"D106", # Missing docstring in public nested class
176+
"UP037" # Remove quotes from a type def
176177
]
177178
lint.mccabe = { max-complexity = 10 }
178179

taskiq/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from taskiq.scheduler.scheduler import TaskiqScheduler
3737
from taskiq.state import TaskiqState
3838
from taskiq.task import AsyncTaskiqTask
39+
from taskiq.task_gen import TaskiqTaskGenerator, task_gen
3940

4041
__version__ = version("taskiq")
4142

@@ -68,8 +69,10 @@
6869
"TaskiqResultTimeoutError",
6970
"TaskiqScheduler",
7071
"TaskiqState",
72+
"TaskiqTaskGenerator",
7173
"ZeroMQBroker",
7274
"__version__",
7375
"async_shared_broker",
7476
"gather",
77+
"task_gen",
7578
]

taskiq/abc/broker.py

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from taskiq.abc.middleware import TaskiqMiddleware
2222
from taskiq.abc.serializer import TaskiqSerializer
23+
from taskiq.task_gen import TaskiqTaskGenerator
2324
from taskiq.acks import AckableMessage
2425
from taskiq.decor import AsyncTaskiqDecoratedTask
2526
from taskiq.events import TaskiqEvents
@@ -301,64 +302,23 @@ def task( # type: ignore[misc]
301302
302303
:returns: decorator function or AsyncTaskiqDecoratedTask.
303304
"""
304-
305-
def make_decorated_task(
306-
inner_labels: dict[str, str | int],
307-
inner_task_name: str | None = None,
308-
) -> Callable[
309-
[Callable[_FuncParams, _ReturnType]],
310-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
311-
]:
312-
def inner(
313-
func: Callable[_FuncParams, _ReturnType],
314-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
315-
nonlocal inner_task_name
316-
if inner_task_name is None:
317-
fmodule = func.__module__
318-
if fmodule == "__main__": # pragma: no cover
319-
fmodule = ".".join(
320-
os.path.normpath(sys.argv[0])
321-
.removesuffix(".py")
322-
.split(os.path.sep),
323-
)
324-
fname = func.__name__
325-
if fname == "<lambda>":
326-
fname = f"lambda_{uuid4().hex}"
327-
inner_task_name = f"{fmodule}:{fname}"
328-
wrapper = wraps(func)
329-
330-
sign = get_type_hints(func)
331-
return_type = None
332-
if "return" in sign:
333-
return_type = sign["return"]
334-
335-
decorated_task = wrapper(
336-
self.decorator_class(
337-
broker=self,
338-
original_func=func,
339-
labels=inner_labels,
340-
task_name=inner_task_name,
341-
return_type=return_type, # type: ignore
342-
),
343-
)
344-
345-
self._register_task(decorated_task.task_name, decorated_task) # type: ignore
346-
347-
return decorated_task # type: ignore
348-
349-
return inner
305+
warnings.warn(
306+
"Tasks are not independent from brokers. "
307+
"Use `taskiq.task` as a decorator instead.",
308+
TaskiqDeprecationWarning,
309+
stacklevel=2,
310+
)
311+
generator = TaskiqTaskGenerator().labels(**labels).broker(self)
350312

351313
if callable(task_name):
352314
# This is an edge case,
353315
# when decorator called without parameters.
354-
return make_decorated_task(
355-
inner_labels=labels or {},
356-
)(task_name)
316+
return generator(task_name)
357317

358-
return make_decorated_task(
359-
inner_task_name=task_name,
360-
inner_labels=labels or {},
361-
)
318+
if task_name:
319+
generator = generator.name(task_name)
320+
321+
return generator
362322

363323
def register_task(
364324
self,
@@ -534,9 +494,10 @@ def _register_task(
534494
raise TaskBrokerMismatchError(broker=task.broker)
535495
self.local_task_registry[task_name] = task
536496

537-
async def __aenter__(self) -> None:
497+
async def __aenter__(self) -> "Self":
538498
"""Starts the broker as ctx manager."""
539499
await self.startup()
500+
return self
540501

541502
async def __aexit__(self, *args: object, **kwargs: Any) -> None:
542503
"""Shuts down the broker as ctx manager."""

taskiq/decor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]):
4545

4646
def __init__(
4747
self,
48-
broker: "AsyncBroker",
4948
task_name: str,
5049
original_func: Callable[_FuncParams, _ReturnType],
5150
labels: dict[str, Any],
51+
broker: "AsyncBroker | None" = None,
5252
return_type: type[_ReturnType] | None = None,
5353
) -> None:
5454
self.broker = broker
@@ -230,5 +230,9 @@ def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
230230
return_type=self.return_type,
231231
)
232232

233+
def set_broker(self, broker: "AsyncBroker") -> None:
234+
"""Set broker for the task."""
235+
self.broker = broker
236+
233237
def __repr__(self) -> str:
234238
return f"AsyncTaskiqDecoratedTask({self.task_name})"

taskiq/kicker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ class AsyncKicker(Generic[_FuncParams, _ReturnType]):
4242
def __init__(
4343
self,
4444
task_name: str,
45-
broker: "AsyncBroker",
45+
broker: "AsyncBroker | None",
4646
labels: dict[str, Any],
4747
return_type: type[_ReturnType] | None = None,
4848
) -> None:
49+
if broker is None:
50+
raise RuntimeError("Broker is not set for this task!")
4951
self.task_name = task_name
5052
self.broker = broker
5153
self.labels = labels

taskiq/task_gen.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import os
2+
import sys
3+
from collections.abc import Callable
4+
from copy import copy
5+
from functools import wraps
6+
from typing import TYPE_CHECKING, Any, get_type_hints
7+
from uuid import uuid4
8+
9+
from typing_extensions import ParamSpec, Self, TypeVar
10+
11+
from taskiq.decor import AsyncTaskiqDecoratedTask
12+
13+
if TYPE_CHECKING:
14+
from taskiq.abc.broker import AsyncBroker
15+
16+
_FuncParams = ParamSpec("_FuncParams")
17+
_ReturnType = TypeVar("_ReturnType")
18+
19+
20+
class TaskiqTaskGenerator:
21+
"""Class used for task generation."""
22+
23+
def __init__(self) -> None:
24+
self._labels: dict[str, Any] = {}
25+
self._name: str | None = None
26+
self._broker: "AsyncBroker | None" = None
27+
28+
def name(self, name: str) -> Self:
29+
"""Set task name."""
30+
inst = copy(self)
31+
inst._name = name # noqa: SLF001
32+
return inst
33+
34+
def labels(self, **labels: Any) -> Self:
35+
"""Set task's static labels."""
36+
inst = copy(self)
37+
inst._labels = labels # noqa: SLF001
38+
return inst
39+
40+
def broker(self, broker: "AsyncBroker") -> Self:
41+
"""Set a broker."""
42+
inst = copy(self)
43+
inst._broker = broker # noqa: SLF001
44+
return inst
45+
46+
@classmethod
47+
def make_task(
48+
cls,
49+
task_name: str,
50+
broker: "AsyncBroker | None",
51+
original_func: Callable[_FuncParams, _ReturnType],
52+
labels: dict[str, Any],
53+
return_type: type[_ReturnType] | None = None,
54+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
55+
"""
56+
Create a task out of given inputs.
57+
58+
This method can be overridden to create custom task classes
59+
with custom arguments and logic.
60+
"""
61+
return AsyncTaskiqDecoratedTask(
62+
broker=broker,
63+
task_name=task_name,
64+
original_func=original_func,
65+
labels=labels,
66+
return_type=return_type,
67+
)
68+
69+
def __call__(
70+
self,
71+
func: Callable[_FuncParams, _ReturnType],
72+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
73+
"""
74+
Make a decorated task.
75+
76+
This function is the main point for creating a task
77+
from a raw function.
78+
"""
79+
task_name = self._name
80+
if task_name is None:
81+
fmodule = func.__module__
82+
if fmodule == "__main__": # pragma: no cover
83+
fmodule = ".".join(
84+
os.path.normpath(sys.argv[0])
85+
.removesuffix(".py")
86+
.split(os.path.sep),
87+
)
88+
fname = func.__name__
89+
if fname == "<lambda>":
90+
fname = f"lambda_{uuid4().hex}"
91+
task_name = f"{fmodule}:{fname}"
92+
wrapper = wraps(func)
93+
94+
sign = get_type_hints(func)
95+
return_type = None
96+
if "return" in sign:
97+
return_type = sign["return"]
98+
99+
return wrapper(
100+
self.make_task(
101+
original_func=func,
102+
labels=self._labels,
103+
task_name=task_name,
104+
broker=self._broker,
105+
return_type=return_type, # type: ignore
106+
),
107+
)
108+
109+
110+
task_gen = TaskiqTaskGenerator()

0 commit comments

Comments
 (0)