Skip to content

Commit 91d57df

Browse files
committed
feat(routing): separate task routing from broker transport
Introduce Flow, TaskiqRouter, route and subscription contracts to move task dispatch decisions out of brokers while keeping brokers as transport adapters. Preserve backward-compatible
1 parent bd93e5b commit 91d57df

3 files changed

Lines changed: 16 additions & 3 deletions

File tree

taskiq/abc/broker.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ def find_task(self, task_name: str) -> AsyncTaskiqDecoratedTask[Any, Any] | None
142142
143143
It searches task by name in dict of tasks that
144144
were registered for this broker directly.
145-
If it fails, it checks global dict of all available tasks.
145+
If it fails, it checks tasks registered in the broker's router,
146+
then global dict of all available tasks.
146147
147148
:param task_name: name of a task.
148149
:returns: found task or None.
@@ -161,8 +162,9 @@ def get_all_tasks(self) -> dict[str, AsyncTaskiqDecoratedTask[Any, Any]]:
161162
"""
162163
Method to fetch all tasks available in broker.
163164
164-
This method returns all tasks, globally and locally
165-
available in broker. With local tasks having higher priority.
165+
This method returns all tasks globally, through the broker's router
166+
and locally available in broker. With local tasks having higher
167+
priority.
166168
167169
So, if you have two tasks with the same name,
168170
one registered in global registry and one registered

taskiq/router.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ def default_broker(self) -> AsyncBroker | None:
5555

5656
@default_broker.setter
5757
def default_broker(self, broker: AsyncBroker | None) -> None:
58+
if broker is not None:
59+
self._brokers.resolve(broker)
5860
self._brokers.default_broker = broker
5961

6062
@property

tests/routing/test_router_core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ def test_router_rejects_broker_attached_to_another_router() -> None:
3434
second_router.set_broker(broker, name="broker")
3535

3636

37+
def test_router_default_broker_setter_rejects_foreign_broker() -> None:
38+
first_router = TaskiqRouter()
39+
second_router = TaskiqRouter()
40+
broker = RecordingBroker(router=first_router, broker_name="broker")
41+
42+
with pytest.raises(ValueError, match="not registered"):
43+
second_router.default_broker = broker
44+
45+
3746
def test_router_rejects_string_broker_references() -> None:
3847
router = TaskiqRouter()
3948
RecordingBroker(router=router, broker_name="broker")

0 commit comments

Comments
 (0)