Skip to content

Commit bd93e5b

Browse files
committed
feat(routing): separate task routing from broker transport
Introduce router-owned task routing, flow subscriptions, and shared task declarations while preserving the existing broker-first task API. Add transport-neutral Flow contracts, explicit route/subscription semantics, multi-broker router ownership, task_builder base_cls support, prepared invocation route snapshots, and scheduler/requeue compatibility behavior. Document migration guidance and add executable routing examples for multiple brokers and shared task packages.
1 parent 8805f3d commit bd93e5b

31 files changed

Lines changed: 2652 additions & 508 deletions

docs/examples/router/multiple_brokers.py

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import asyncio
44

5-
from taskiq import Flow, InMemoryBroker, TaskiqRouter
5+
from taskiq import Flow, InMemoryBroker, TaskiqRoute, TaskiqRouter
66

77
router = TaskiqRouter()
88

@@ -35,6 +35,23 @@ async def send_email(user_id: int, template: str) -> str:
3535
broker=priority_broker,
3636
flow=priority_email_flow,
3737
)
38+
priority_subscription = router.subscribe(
39+
priority_broker,
40+
priority_email_flow,
41+
send_email,
42+
)
43+
44+
45+
def _format_route(task_name: str, route: TaskiqRoute) -> str:
46+
"""Return a readable route diagnostic for the example output."""
47+
flow_name = route.flow.name if route.flow is not None else "<default>"
48+
return f"{task_name} -> broker={route.broker_name}, flow={flow_name}"
49+
50+
51+
def _format_listen_plan() -> str:
52+
"""Return flows that the priority broker should subscribe to."""
53+
flow_names = ", ".join(flow.name for flow in priority_broker.get_subscribed_flows())
54+
return f"priority listens to: {flow_names}"
3855

3956

4057
async def _main() -> None:
@@ -43,22 +60,34 @@ async def _main() -> None:
4360
try:
4461
direct_result = await send_email(7, "welcome")
4562

46-
routed_task = await send_email.kiq(7, "welcome")
47-
routed_result = await routed_task.wait_result(timeout=2)
63+
declared_route = router.resolve_route(send_email)
64+
assert declared_route == priority_route
4865

49-
bulk_task = (
66+
routed_task = (
5067
await send_email.kicker()
51-
.with_route(
52-
default_broker,
53-
bulk_email_flow,
68+
.with_route(declared_route)
69+
.kiq(
70+
7,
71+
"welcome",
5472
)
55-
.kiq(8, "digest")
5673
)
74+
routed_result = await routed_task.wait_result(timeout=2)
75+
76+
bulk_route = router.resolve_route(
77+
send_email,
78+
broker=default_broker,
79+
flow=bulk_email_flow,
80+
)
81+
bulk_task = await send_email.kicker().with_route(bulk_route).kiq(8, "digest")
5782
bulk_result = await bulk_task.wait_result(timeout=2)
5883

5984
print(f"Direct call: {direct_result}")
60-
print(f"Declared route: {priority_route.broker_name}")
85+
print(f"Router rule: {_format_route(send_email.task_name, priority_route)}")
86+
print(f"Subscription tasks: {sorted(priority_subscription.task_names)}")
87+
print(_format_listen_plan())
88+
print(f"Resolved route: {_format_route(send_email.task_name, declared_route)}")
6189
print(f"Routed call: {routed_result.return_value}")
90+
print(f"Override route: {_format_route(send_email.task_name, bulk_route)}")
6291
print(f"Route override: {bulk_result.return_value}")
6392
finally:
6493
await priority_broker.shutdown()

docs/examples/router/shared_task_package.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,15 @@
33
import asyncio
44
from collections.abc import Mapping
55
from dataclasses import dataclass
6-
7-
from taskiq import Flow, InMemoryBroker, TaskiqRouter, task_builder
6+
from typing import Any
7+
8+
from taskiq import (
9+
AsyncTaskiqDecoratedTask,
10+
Flow,
11+
InMemoryBroker,
12+
TaskiqRouter,
13+
task_builder,
14+
)
815

916

1017
@dataclass(frozen=True, slots=True)
@@ -14,15 +21,22 @@ class BillingQueue:
1421
name: str
1522
priority: int
1623

17-
def broker_options(self, broker_name: str) -> Mapping[str, object]:
24+
def broker_options(self) -> Mapping[str, object]:
1825
"""Return options that a billing broker adapter can understand."""
1926
return {
20-
"broker": broker_name,
2127
"priority": self.priority,
2228
}
2329

2430

25-
@task_builder("billing.calculate_total", domain="billing")
31+
class BillingTask(AsyncTaskiqDecoratedTask[Any, Any]):
32+
"""Custom task class shared by billing package tasks."""
33+
34+
def billing_name(self) -> str:
35+
"""Return a billing-specific task name."""
36+
return self.task_name
37+
38+
39+
@task_builder("billing.calculate_total", base_cls=BillingTask, domain="billing")
2640
async def calculate_total(price: int, quantity: int) -> int:
2741
"""Package-level task definition that is not bound to any broker."""
2842
return price * quantity
@@ -44,18 +58,27 @@ async def calculate_total(price: int, quantity: int) -> int:
4458
broker=billing_broker,
4559
flow=billing_flow,
4660
)
61+
router.subscribe(
62+
billing_broker,
63+
billing_flow,
64+
registered_calculate_total,
65+
)
4766

4867

4968
async def _main() -> None:
5069
await billing_broker.startup()
5170
try:
5271
direct_result = await calculate_total.call(19, 3)
5372

73+
priority_route = router.resolve_route(
74+
registered_calculate_total,
75+
broker=billing_broker,
76+
flow=priority_billing_flow,
77+
)
5478
prepared_task = (
5579
registered_calculate_total.kicker()
5680
.with_route(
57-
billing_broker,
58-
priority_billing_flow,
81+
priority_route,
5982
)
6083
.prepare(19, 3)
6184
)
@@ -64,6 +87,9 @@ async def _main() -> None:
6487
queued_result = await queued_task.wait_result(timeout=2)
6588

6689
print(f"Shared task direct call: {direct_result}")
90+
print(f"Registered task class: {registered_calculate_total.billing_name()}")
91+
listen_flow = billing_broker.get_subscribed_flows()[0]
92+
print(f"Registered listen flow: {listen_flow.name}")
6793
print(f"Prepared message: {prepared_task.message.task_name}")
6894
print(f"Registered queued call: {queued_result.return_value}")
6995
finally:

docs/guide/architecture-overview.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,28 @@ asyncio.run(main())
8282

8383
```
8484

85+
## Router and flows
86+
87+
Taskiq can use a `TaskiqRouter` to keep routing rules outside of broker
88+
implementations. Brokers remain transport adapters, while the router owns task
89+
registration, route resolution and flow subscriptions.
90+
This section describes the `experiment/separate_broker` branch contract. The
91+
old `@broker.task(...)`, `.kiq()`, labels, scheduler and result backend behavior
92+
remain compatible, while router/flow APIs are additive review material for the
93+
branch.
94+
95+
`Flow` is a transport-neutral delivery address. Broker packages may provide
96+
their own flow classes for queue, topic, subject or stream options, as long as
97+
they implement the same flow protocol. The router deduplicates subscriptions by
98+
flow name and rejects same-name flows with incompatible broker options.
99+
100+
Routing and subscribing are separate responsibilities. `route_task(...)`
101+
chooses the outbound broker and flow for task invocations. `subscribe(...)`
102+
adds flows to a broker listen plan for flow-aware broker adapters. Worker task
103+
lookup still uses `task_name`; flow does not select the Python task.
104+
105+
Read more in the [Routing and flows](./routing-and-flows.md) section.
106+
85107
## Messages
86108

87109
Every message has labels. You can define labels

0 commit comments

Comments
 (0)