Skip to content

Commit 6e8cdc4

Browse files
committed
refactor:
Replace as_dict with dataclasses.asdict; Remove user-level retry orchestration from broker/hub/protocol
1 parent fe3b0a5 commit 6e8cdc4

5 files changed

Lines changed: 57 additions & 100 deletions

File tree

taskiq/brokers/nng/broker.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ def __init__(
8383
heartbeat_interval: float = 5.0,
8484
lease_timeout: float = 20.0,
8585
capacity: int = 1,
86-
max_retries: int = 0,
87-
retry_backoff: float = 1.0,
88-
retry_jitter: float = 0.0,
8986
recv_timeout_ms: int = 5_000,
9087
send_timeout_ms: int = 5_000,
9188
) -> None:
@@ -102,9 +99,6 @@ def __init__(
10299
:param heartbeat_interval: seconds between heartbeat messages to hub.
103100
:param lease_timeout: seconds a dispatched task lease remains valid.
104101
:param capacity: max concurrent tasks this worker will accept.
105-
:param max_retries: default max retries for submitted tasks.
106-
:param retry_backoff: base seconds for exponential backoff.
107-
:param retry_jitter: jitter multiplier added to backoff (0 = no jitter).
108102
:param recv_timeout_ms: Req0 recv timeout in milliseconds.
109103
:param send_timeout_ms: Req0 send timeout in milliseconds.
110104
"""
@@ -123,9 +117,6 @@ def __init__(
123117
self.heartbeat_interval = heartbeat_interval
124118
self.lease_timeout = lease_timeout
125119
self.capacity = capacity
126-
self.max_retries = max_retries
127-
self.retry_backoff = retry_backoff
128-
self.retry_jitter = retry_jitter
129120
self.recv_timeout_ms = recv_timeout_ms
130121
self.send_timeout_ms = send_timeout_ms
131122

@@ -249,16 +240,6 @@ async def kick(self, message: BrokerMessage) -> None:
249240
"payload_b64": base64.b64encode(message.message).decode("ascii"),
250241
"labels": message.labels,
251242
"lease_id": "", # hub assigns the real lease_id at dispatch time
252-
"attempts": int(message.labels.get("attempts", 0)),
253-
"max_retries": int(
254-
message.labels.get("max_retries", self.max_retries),
255-
),
256-
"retry_backoff": float(
257-
message.labels.get("retry_backoff", self.retry_backoff),
258-
),
259-
"retry_jitter": float(
260-
message.labels.get("retry_jitter", self.retry_jitter),
261-
),
262243
"priority": int(message.labels.get("priority", 0)),
263244
"created_at": time.time(),
264245
}

taskiq/brokers/nng/hub.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ async def _dispatch_once(self) -> bool:
300300
task_name=row["task_name"],
301301
labels=row["labels"],
302302
priority=int(row["priority"]),
303-
attempts=int(row["attempts"]),
303+
attempts=int(row.get("attempts", 0)),
304304
)
305305
worker = self.store.choose_worker(
306306
self._routing,
@@ -334,10 +334,6 @@ async def _dispatch_once(self) -> bool:
334334
payload_b64=base64.b64encode(row["payload"]).decode("ascii"),
335335
labels=row["labels"],
336336
lease_id=lease_id,
337-
attempts=int(row["attempts"]) + 1,
338-
max_retries=int(row["max_retries"]),
339-
retry_backoff=float(row["retry_backoff"]),
340-
retry_jitter=float(row["retry_jitter"]),
341337
priority=int(row["priority"]),
342338
created_at=float(row["created_at"]),
343339
)

taskiq/brokers/nng/protocol.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,18 @@ class TaskEnvelope:
5757
``lease_id`` is the UUID assigned by the hub at dispatch time.
5858
Workers must echo it back in the ACK so the hub can validate
5959
that the ack is not stale (e.g. after lease expiry and requeue).
60+
61+
User-level retry policy is the responsibility of the
62+
:class:`~taskiq.middlewares.SmartRetryMiddleware` (or any compatible
63+
middleware) and travels in :attr:`labels`; the envelope itself carries
64+
no retry knobs.
6065
"""
6166

6267
task_id: str
6368
task_name: str
6469
payload_b64: str
6570
labels: dict[str, Any] = field(default_factory=dict)
6671
lease_id: str = ""
67-
attempts: int = 0
68-
max_retries: int = 0
69-
retry_backoff: float = 1.0
70-
retry_jitter: float = 0.0
7172
priority: int = 0
7273
created_at: float = 0.0
7374

@@ -157,3 +158,4 @@ def to_dict(self) -> dict[str, Any]:
157158
d = asdict(self)
158159
d["status"] = str(self.status)
159160
return d
161+

taskiq/brokers/nng/storage.py

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import inspect
66
import random
77
import time
8-
from dataclasses import dataclass, field
8+
from dataclasses import asdict, dataclass, field
99
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
1010

1111
if TYPE_CHECKING:
@@ -19,7 +19,11 @@ class StoreConfig:
1919
path: str = "" # kept for API compat; not used
2020
max_pending: int = 10_000
2121
lease_timeout: float = 30.0
22-
backoff_base: float = 1.0
22+
# Hub-internal cap on delivery retries (lease expiry / worker death).
23+
# Has nothing to do with user-level retry policy, which is handled by
24+
# taskiq retry middlewares. Set to 0 to disable redelivery entirely.
25+
max_delivery_attempts: int = 5
26+
delivery_backoff: float = 1.0
2327
backoff_cap: float = 60.0
2428

2529

@@ -34,10 +38,9 @@ class _Task:
3438
payload: bytes
3539
labels: dict[str, Any]
3640
state: str # ready / leased / done / failed
41+
# Internal delivery-attempt counter (incremented on each dispatch).
42+
# NOT related to user-level retry policy — that lives in middlewares.
3743
attempts: int = 0
38-
max_retries: int = 0
39-
retry_backoff: float = 1.0
40-
retry_jitter: float = 0.0
4144
priority: int = 0
4245
created_at: float = field(default_factory=time.time)
4346
updated_at: float = field(default_factory=time.time)
@@ -49,25 +52,7 @@ class _Task:
4952

5053
def as_dict(self) -> dict[str, Any]:
5154
"""Return a dict view of this task record."""
52-
return {
53-
"task_id": self.task_id,
54-
"task_name": self.task_name,
55-
"payload": self.payload,
56-
"labels": self.labels,
57-
"state": self.state,
58-
"attempts": self.attempts,
59-
"max_retries": self.max_retries,
60-
"retry_backoff": self.retry_backoff,
61-
"retry_jitter": self.retry_jitter,
62-
"priority": self.priority,
63-
"created_at": self.created_at,
64-
"updated_at": self.updated_at,
65-
"next_run_at": self.next_run_at,
66-
"lease_id": self.lease_id,
67-
"leased_worker_id": self.leased_worker_id,
68-
"lease_until": self.lease_until,
69-
"last_error": self.last_error,
70-
}
55+
return asdict(self)
7156

7257
def as_status_dict(self) -> dict[str, Any]:
7358
"""Return a JSON-safe dict (no raw bytes) for control-plane status responses."""
@@ -91,18 +76,7 @@ class _Worker:
9176

9277
def as_dict(self) -> dict[str, Any]:
9378
"""Return a dict view of this worker record."""
94-
return {
95-
"worker_id": self.worker_id,
96-
"task_addr": self.task_addr,
97-
"capacity": self.capacity,
98-
"inflight": self.inflight,
99-
"last_seen": self.last_seen,
100-
"heartbeat_interval": self.heartbeat_interval,
101-
"lease_timeout": self.lease_timeout,
102-
"draining": self.draining,
103-
"status": self.status,
104-
"version": self.version,
105-
}
79+
return asdict(self)
10680

10781

10882
# ── task context ─────────────────────────────────────────────────────────────
@@ -325,16 +299,21 @@ def __init__(self, config: StoreConfig) -> None:
325299

326300
# ── helpers ───────────────────────────────────────────────────────────────
327301

328-
def _backoff(self, attempts: int, backoff_base: float) -> float:
329-
return min(self.config.backoff_cap, backoff_base * (2 ** max(0, attempts - 1)))
302+
def _backoff(self, attempts: int) -> float:
303+
return min(
304+
self.config.backoff_cap,
305+
self.config.delivery_backoff * (2 ** max(0, attempts - 1)),
306+
)
330307

331308
def _requeue_or_fail(self, task: _Task, worker_id: str, error: str) -> bool:
332309
now = time.time()
333-
if task.attempts > task.max_retries:
310+
# Hub-internal delivery cap. User-level retries are handled by
311+
# retry middlewares, which re-kick the task with updated labels.
312+
if task.attempts > self.config.max_delivery_attempts:
334313
task.state = "failed"
335314
else:
336315
task.state = "ready"
337-
task.next_run_at = now + self._backoff(task.attempts, task.retry_backoff)
316+
task.next_run_at = now + self._backoff(task.attempts)
338317
task.last_error = error
339318
task.lease_id = None
340319
task.leased_worker_id = None
@@ -367,9 +346,6 @@ def submit(self, envelope: TaskEnvelope) -> None:
367346
payload=envelope.payload,
368347
labels=envelope.labels,
369348
state="ready",
370-
max_retries=envelope.max_retries,
371-
retry_backoff=envelope.retry_backoff,
372-
retry_jitter=envelope.retry_jitter,
373349
priority=envelope.priority,
374350
created_at=envelope.created_at or now,
375351
updated_at=now,

tests/brokers/test_nng_broker.py

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,6 @@ def _envelope(**kwargs: object) -> TaskEnvelope:
6565
"payload_b64": "dGVzdA==",
6666
"labels": {},
6767
"lease_id": "",
68-
"attempts": 0,
69-
"max_retries": 0,
70-
"retry_backoff": 1.0,
71-
"retry_jitter": 0.0,
7268
"priority": 0,
7369
"created_at": time.time(),
7470
}
@@ -208,10 +204,6 @@ async def submit(self, **labels: object) -> str:
208204
"payload_b64": "dGVzdA==",
209205
"labels": {},
210206
"lease_id": "",
211-
"attempts": 0,
212-
"max_retries": labels.pop("max_retries", 0),
213-
"retry_backoff": labels.pop("retry_backoff", 1.0),
214-
"retry_jitter": 0.0,
215207
"priority": labels.pop("priority", 0),
216208
"created_at": time.time(),
217209
}
@@ -321,32 +313,46 @@ def test_late_ack_after_requeue_ignored(store: InMemoryStore) -> None:
321313
assert not store.ack(env.task_id, w.worker_id, "L2")
322314

323315

324-
def test_nack_requeues_with_backoff(store: InMemoryStore) -> None:
325-
env = _envelope(max_retries=2, retry_backoff=1.0)
326-
store.submit(env)
316+
def test_nack_requeues_with_backoff(db_path: str) -> None:
317+
"""Hub-level delivery retry: nack within delivery cap requeues with backoff."""
318+
s = InMemoryStore(
319+
StoreConfig(
320+
path=db_path, max_pending=50, lease_timeout=5.0,
321+
max_delivery_attempts=2, delivery_backoff=1.0,
322+
),
323+
)
324+
env = _envelope()
325+
s.submit(env)
327326
w = _worker_state()
328-
store.register_worker(w)
329-
store.mark_leased(env.task_id, w.worker_id, "L3", time.time() + 60)
330-
assert store.nack(env.task_id, w.worker_id, "L3", "boom")
331-
task = store.get_task(env.task_id)
327+
s.register_worker(w)
328+
s.mark_leased(env.task_id, w.worker_id, "L3", time.time() + 60)
329+
assert s.nack(env.task_id, w.worker_id, "L3", "boom")
330+
task = s.get_task(env.task_id)
332331
assert task["state"] == "ready"
333332
assert float(task["next_run_at"]) > time.time()
334333

335334

336-
def test_nack_exceeds_retries_fails(store: InMemoryStore) -> None:
337-
env = _envelope(max_retries=0)
338-
store.submit(env)
335+
def test_nack_exceeds_delivery_cap_fails(db_path: str) -> None:
336+
"""Delivery cap of 0 → first nack fails the task immediately."""
337+
s = InMemoryStore(
338+
StoreConfig(
339+
path=db_path, max_pending=50, lease_timeout=5.0,
340+
max_delivery_attempts=0,
341+
),
342+
)
343+
env = _envelope()
344+
s.submit(env)
339345
w = _worker_state()
340-
store.register_worker(w)
341-
store.mark_leased(env.task_id, w.worker_id, "L4", time.time() + 60)
342-
store.nack(env.task_id, w.worker_id, "L4", "error")
343-
assert store.get_task(env.task_id)["state"] == "failed"
346+
s.register_worker(w)
347+
s.mark_leased(env.task_id, w.worker_id, "L4", time.time() + 60)
348+
s.nack(env.task_id, w.worker_id, "L4", "error")
349+
assert s.get_task(env.task_id)["state"] == "failed"
344350

345351

346352
def test_dead_worker_tasks_requeued(store: InMemoryStore) -> None:
347353
w = _worker_state()
348354
store.register_worker(w)
349-
env = _envelope(max_retries=3)
355+
env = _envelope()
350356
store.submit(env)
351357
store.mark_leased(env.task_id, w.worker_id, "L5", time.time() + 60)
352358
store._workers[w.worker_id].last_seen = 0 # simulate missed heartbeats
@@ -456,7 +462,7 @@ async def test_worker_crash_before_ack_task_requeued(
456462
client = FakeClient(ctrl_addr)
457463
try:
458464
await w1.register()
459-
tid = await client.submit(max_retries=3)
465+
tid = await client.submit()
460466
env1 = await w1.recv_task(timeout=3.0)
461467
assert env1.task_id == tid
462468
w1.close() # simulate crash without acking
@@ -493,7 +499,7 @@ async def test_late_ack_after_requeue_rejected(
493499
client = FakeClient(ctrl_addr)
494500
try:
495501
await w1.register()
496-
tid = await client.submit(max_retries=3)
502+
tid = await client.submit()
497503
env1 = await w1.recv_task(timeout=3.0)
498504
await asyncio.sleep(3.5) # let lease expire
499505

@@ -670,10 +676,6 @@ async def test_backpressure_hub_rejects_when_full(
670676
"payload_b64": "dGVzdA==",
671677
"labels": {},
672678
"lease_id": "",
673-
"attempts": 0,
674-
"max_retries": 0,
675-
"retry_backoff": 1.0,
676-
"retry_jitter": 0.0,
677679
"priority": 0,
678680
"created_at": time.time(),
679681
}

0 commit comments

Comments
 (0)