Skip to content

Commit 530eccb

Browse files
committed
Made tasks independent from brokers.
1 parent 3f1d0d1 commit 530eccb

7 files changed

Lines changed: 191 additions & 84 deletions

File tree

a.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
3+
from taskiq_redis import ListQueueBroker
4+
5+
from taskiq import task_gen as task
6+
from taskiq.abc.broker import AsyncBroker
7+
8+
9+
@task
10+
async def test_task():
11+
pass
12+
13+
14+
def get_broker() -> AsyncBroker:
15+
broker = ListQueueBroker("redis://localhost")
16+
return broker
17+
18+
19+
async def main():
20+
async with get_broker() as broker:
21+
await test_task.kicker().with_broker(broker).kiq()
22+
await test_task.kiq() # << Error
23+
24+
25+
if __name__ == "__main__":
26+
asyncio.run(main())

pyproject.toml

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,23 @@ dependencies = [
3535
]
3636
dynamic = ["version"]
3737

38+
[project.urls]
39+
"Bug Tracker" = "https://github.com/taskiq-python/taskiq/issues"
40+
Changelog = "https://github.com/taskiq-python/taskiq/releases"
41+
Documentation = "https://taskiq-python.github.io/"
42+
Homepage = "https://taskiq-python.github.io/"
43+
Repository = "https://github.com/taskiq-python/taskiq"
44+
45+
[project.scripts]
46+
taskiq = "taskiq.__main__:main"
47+
48+
[project.entry-points.opentelemetry_instrumentor]
49+
taskiq = "taskiq.instrumentation:TaskiqInstrumentor"
50+
51+
[project.entry-points.taskiq_cli]
52+
worker = "taskiq.cli.worker.cmd:WorkerCMD"
53+
scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD"
54+
3855
[project.optional-dependencies]
3956
cbor = ["cbor2>=5"]
4057
metrics = ["prometheus_client>=0"]
@@ -49,23 +66,6 @@ reload = ["watchdog>=4", "gitignore-parser>=0"]
4966
uv = ["uvloop>=0.16.0,<1; sys_platform != 'win32'"]
5067
zmq = ["pyzmq>=26"]
5168

52-
[project.entry-points.opentelemetry_instrumentor]
53-
taskiq = "taskiq.instrumentation:TaskiqInstrumentor"
54-
55-
[project.entry-points.taskiq_cli]
56-
worker = "taskiq.cli.worker.cmd:WorkerCMD"
57-
scheduler = "taskiq.cli.scheduler.cmd:SchedulerCMD"
58-
59-
[project.scripts]
60-
taskiq = "taskiq.__main__:main"
61-
62-
[project.urls]
63-
"Bug Tracker" = "https://github.com/taskiq-python/taskiq/issues"
64-
Changelog = "https://github.com/taskiq-python/taskiq/releases"
65-
Documentation = "https://taskiq-python.github.io/"
66-
Homepage = "https://taskiq-python.github.io/"
67-
Repository = "https://github.com/taskiq-python/taskiq"
68-
6969
[dependency-groups]
7070
dev = [
7171
"black>=25.11.0",
@@ -171,7 +171,8 @@ lint.ignore = [
171171
"D100", # Missing docstring in public module
172172
"ANN401", # typing.Any are disallowed in `**kwargs
173173
"PLR0913", # Too many arguments for function call
174-
"D106" # Missing docstring in public nested class
174+
"D106", # Missing docstring in public nested class
175+
"UP037" # Remove quotes from a type def
175176
]
176177
lint.mccabe = { max-complexity = 10 }
177178

@@ -198,6 +199,16 @@ lint.mccabe = { max-complexity = 10 }
198199
"T201" # print found
199200
]
200201

202+
[tool.ruff.lint.flake8-bugbear]
203+
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]
204+
205+
[tool.ruff.lint.pydocstyle]
206+
convention = "pep257"
207+
ignore-decorators = ["typing.overload"]
208+
209+
[tool.ruff.lint.pylint]
210+
allow-magic-value-types = ["int", "str", "float"]
211+
201212
[tool.tox]
202213
requires = ["tox>=4"]
203214
isolated_build = true
@@ -213,13 +224,3 @@ commands = [["pytest", "-vv", "-n", "auto"]]
213224
extend-exclude = [
214225
"docs/README.md" # because of identifier in head section
215226
]
216-
217-
[tool.ruff.lint.flake8-bugbear]
218-
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]
219-
220-
[tool.ruff.lint.pydocstyle]
221-
convention = "pep257"
222-
ignore-decorators = ["typing.overload"]
223-
224-
[tool.ruff.lint.pylint]
225-
allow-magic-value-types = ["int", "str", "float"]

taskiq/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from taskiq.scheduler.scheduler import TaskiqScheduler
3737
from taskiq.state import TaskiqState
3838
from taskiq.task import AsyncTaskiqTask
39+
from taskiq.task_gen import TaskiqTaskGenerator, task_gen
3940

4041
__version__ = version("taskiq")
4142

@@ -68,8 +69,10 @@
6869
"TaskiqResultTimeoutError",
6970
"TaskiqScheduler",
7071
"TaskiqState",
72+
"TaskiqTaskGenerator",
7173
"ZeroMQBroker",
7274
"__version__",
7375
"async_shared_broker",
7476
"gather",
77+
"task_gen",
7578
]

taskiq/abc/broker.py

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from taskiq.abc.middleware import TaskiqMiddleware
2222
from taskiq.abc.serializer import TaskiqSerializer
23+
from taskiq.task_gen import TaskiqTaskGenerator
2324
from taskiq.acks import AckableMessage
2425
from taskiq.decor import AsyncTaskiqDecoratedTask
2526
from taskiq.events import TaskiqEvents
@@ -301,64 +302,23 @@ def task( # type: ignore[misc]
301302
302303
:returns: decorator function or AsyncTaskiqDecoratedTask.
303304
"""
304-
305-
def make_decorated_task(
306-
inner_labels: dict[str, str | int],
307-
inner_task_name: str | None = None,
308-
) -> Callable[
309-
[Callable[_FuncParams, _ReturnType]],
310-
AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType],
311-
]:
312-
def inner(
313-
func: Callable[_FuncParams, _ReturnType],
314-
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
315-
nonlocal inner_task_name
316-
if inner_task_name is None:
317-
fmodule = func.__module__
318-
if fmodule == "__main__": # pragma: no cover
319-
fmodule = ".".join(
320-
os.path.normpath(sys.argv[0])
321-
.removesuffix(".py")
322-
.split(os.path.sep),
323-
)
324-
fname = func.__name__
325-
if fname == "<lambda>":
326-
fname = f"lambda_{uuid4().hex}"
327-
inner_task_name = f"{fmodule}:{fname}"
328-
wrapper = wraps(func)
329-
330-
sign = get_type_hints(func)
331-
return_type = None
332-
if "return" in sign:
333-
return_type = sign["return"]
334-
335-
decorated_task = wrapper(
336-
self.decorator_class(
337-
broker=self,
338-
original_func=func,
339-
labels=inner_labels,
340-
task_name=inner_task_name,
341-
return_type=return_type, # type: ignore
342-
),
343-
)
344-
345-
self._register_task(decorated_task.task_name, decorated_task) # type: ignore
346-
347-
return decorated_task # type: ignore
348-
349-
return inner
305+
warnings.warn(
306+
"Tasks are not independent from brokers. "
307+
"Use `taskiq.task` as a decorator instead.",
308+
TaskiqDeprecationWarning,
309+
stacklevel=2,
310+
)
311+
generator = TaskiqTaskGenerator().labels(**labels).broker(self)
350312

351313
if callable(task_name):
352314
# This is an edge case,
353315
# when decorator called without parameters.
354-
return make_decorated_task(
355-
inner_labels=labels or {},
356-
)(task_name)
316+
return generator(task_name)
357317

358-
return make_decorated_task(
359-
inner_task_name=task_name,
360-
inner_labels=labels or {},
361-
)
318+
if task_name:
319+
generator = generator.name(task_name)
320+
321+
return generator
362322

363323
def register_task(
364324
self,
@@ -534,9 +494,10 @@ def _register_task(
534494
raise TaskBrokerMismatchError(broker=task.broker)
535495
self.local_task_registry[task_name] = task
536496

537-
async def __aenter__(self) -> None:
497+
async def __aenter__(self) -> "Self":
538498
"""Starts the broker as ctx manager."""
539499
await self.startup()
500+
return self
540501

541502
async def __aexit__(self, *args: object, **kwargs: Any) -> None:
542503
"""Shuts down the broker as ctx manager."""

taskiq/decor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]):
4545

4646
def __init__(
4747
self,
48-
broker: "AsyncBroker",
4948
task_name: str,
5049
original_func: Callable[_FuncParams, _ReturnType],
5150
labels: dict[str, Any],
51+
broker: "AsyncBroker | None" = None,
5252
return_type: type[_ReturnType] | None = None,
5353
) -> None:
5454
self.broker = broker
@@ -230,5 +230,9 @@ def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
230230
return_type=self.return_type,
231231
)
232232

233+
def set_broker(self, broker: "AsyncBroker") -> None:
234+
"""Set broker for the task."""
235+
self.broker = broker
236+
233237
def __repr__(self) -> str:
234238
return f"AsyncTaskiqDecoratedTask({self.task_name})"

taskiq/kicker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,12 @@ class AsyncKicker(Generic[_FuncParams, _ReturnType]):
4242
def __init__(
4343
self,
4444
task_name: str,
45-
broker: "AsyncBroker",
45+
broker: "AsyncBroker | None",
4646
labels: dict[str, Any],
4747
return_type: type[_ReturnType] | None = None,
4848
) -> None:
49+
if broker is None:
50+
raise RuntimeError("Broker cannot be None for AsyncKicker.")
4951
self.task_name = task_name
5052
self.broker = broker
5153
self.labels = labels

taskiq/task_gen.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import os
2+
import sys
3+
from collections.abc import Callable
4+
from copy import copy
5+
from functools import wraps
6+
from typing import TYPE_CHECKING, Any, get_type_hints
7+
from uuid import uuid4
8+
9+
from typing_extensions import ParamSpec, Self, TypeVar
10+
11+
from taskiq.decor import AsyncTaskiqDecoratedTask
12+
13+
if TYPE_CHECKING:
14+
from taskiq.abc.broker import AsyncBroker
15+
16+
_FuncParams = ParamSpec("_FuncParams")
17+
_ReturnType = TypeVar("_ReturnType")
18+
19+
20+
class TaskiqTaskGenerator:
21+
"""Class used for task generation."""
22+
23+
def __init__(self) -> None:
24+
self._labels: dict[str, Any] = {}
25+
self._name: str | None = None
26+
self._broker: "AsyncBroker | None" = None
27+
28+
def name(self, name: str) -> Self:
29+
"""Set task name."""
30+
inst = copy(self)
31+
self._name = name
32+
return inst
33+
34+
def labels(self, **labels: Any) -> Self:
35+
"""Set task's static labels."""
36+
inst = copy(self)
37+
self._labels = labels
38+
return inst
39+
40+
def broker(self, broker: "AsyncBroker") -> Self:
41+
"""Set a broker."""
42+
inst = copy(self)
43+
self._broker = broker
44+
return inst
45+
46+
@classmethod
47+
def make_task(
48+
cls,
49+
task_name: str,
50+
broker: "AsyncBroker | None",
51+
original_func: Callable[_FuncParams, _ReturnType],
52+
labels: dict[str, Any],
53+
return_type: type[_ReturnType] | None = None,
54+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
55+
"""
56+
Create a task out of given inputs.
57+
58+
This method can be overridden to create custom task classes
59+
with custom arguments and logic.
60+
"""
61+
return AsyncTaskiqDecoratedTask(
62+
broker=broker,
63+
task_name=task_name,
64+
original_func=original_func,
65+
labels=labels,
66+
return_type=return_type,
67+
)
68+
69+
def __call__(
70+
self,
71+
func: Callable[_FuncParams, _ReturnType],
72+
) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]:
73+
"""
74+
Make a decorated task.
75+
76+
This function is the main point for creating a task
77+
from a raw function.
78+
"""
79+
task_name = self._name
80+
if task_name is None:
81+
fmodule = func.__module__
82+
if fmodule == "__main__": # pragma: no cover
83+
fmodule = ".".join(
84+
os.path.normpath(sys.argv[0])
85+
.removesuffix(".py")
86+
.split(os.path.sep),
87+
)
88+
fname = func.__name__
89+
if fname == "<lambda>":
90+
fname = f"lambda_{uuid4().hex}"
91+
task_name = f"{fmodule}:{fname}"
92+
wrapper = wraps(func)
93+
94+
sign = get_type_hints(func)
95+
return_type = None
96+
if "return" in sign:
97+
return_type = sign["return"]
98+
99+
return wrapper(
100+
self.make_task(
101+
original_func=func,
102+
labels=self._labels,
103+
task_name=task_name,
104+
broker=self._broker,
105+
return_type=return_type, # type: ignore
106+
),
107+
)
108+
109+
110+
task_gen = TaskiqTaskGenerator()

0 commit comments

Comments
 (0)