From 1584891541463d92d0bc264d7ddfdb09235736f2 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 09:01:15 +0300 Subject: [PATCH 1/4] Make TestTimersBroker inspection methods report no pending timers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `has_pending` returned `True` for any input under `TestTimersBroker` because `mock_client.zscore` was an unstubbed `AsyncMock` and `score is not None` evaluated to a truthy mock. `cancel_all` would also have failed: `client.pipeline(...)` returned an awaitable mock that could not be entered with `async with`. The test broker's contract is "messages deliver immediately, so no timer is ever pending." Make the inspection paths reflect that: - `mock_client.zscore.return_value = None` → `has_pending` returns `False`. - `mock_client.zrangebyscore.return_value = []` (already set) → `get_pending_timers` returns `[]`. - Stub the pipeline so `cancel_all` returns `0`. Use `MagicMock` for the `pipeline()` call (returns the AsyncMock directly) and set `__aenter__.return_value = mock_pipe` so `async with ... as pipe` binds the same mock that exposes `execute() -> [0]`. Add a regression test in `tests/test_fake.py` and document the new contract in `docs/usage/testing.md`. --- docs/usage/testing.md | 1 + faststream_redis_timers/testing.py | 15 +++++++++++++-- tests/test_fake.py | 15 +++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/docs/usage/testing.md b/docs/usage/testing.md index ad9a234..416e4fd 100644 --- a/docs/usage/testing.md +++ b/docs/usage/testing.md @@ -87,6 +87,7 @@ Each `ScheduledTimer` has `topic`, `timer_id`, `activate_at`, `body`, `correlati - `timer_id` is passed through normally and available in the handler via the message. - `cancel_timer` / `publisher.cancel()` are no-ops in tests — since messages are delivered immediately, by the time you'd cancel, the handler has already run. - `publisher.fetch_redis_timers()` always returns `[]` in tests — timers are delivered instantly, so there are never any pending entries. +- The inspection / bulk-cancel methods follow the same "nothing pending" contract: `broker.has_pending(...)` returns `False`, `broker.get_pending_timers(...)` returns `[]`, and `broker.cancel_all(...)` returns `0`. Use `TestTimersBroker.scheduled_timers` to assert *what was published*; the inspection API is for production introspection. - The fake producer uses the same envelope format as the real one, so all serialization paths are exercised. ## pytest-asyncio configuration diff --git a/faststream_redis_timers/testing.py b/faststream_redis_timers/testing.py index ad81b25..84a0fb0 100644 --- a/faststream_redis_timers/testing.py +++ b/faststream_redis_timers/testing.py @@ -2,7 +2,7 @@ from contextlib import contextmanager from dataclasses import dataclass from datetime import datetime -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, MagicMock from faststream._internal.testing.broker import TestBroker, change_producer @@ -63,8 +63,19 @@ def _patch_producer(self, broker: TimersBroker) -> "Iterator[None]": @contextmanager def _patch_broker(self, broker: TimersBroker) -> "Iterator[None]": + # Test-broker contract: messages deliver immediately, so there are + # never any pending timers. Stub the inspection paths to return that. mock_client = AsyncMock() - mock_client.zrangebyscore.return_value = [] + mock_client.zrangebyscore.return_value = [] # get_pending_timers -> [] + mock_client.zscore.return_value = None # has_pending -> False + # cancel_all uses `async with client.pipeline(...) as pipe`. AsyncMock's + # default makes pipeline() return a coroutine, which can't be entered. + # Use MagicMock so the call returns the AsyncMock directly; AsyncMock + # natively supports the async-context-manager protocol. + mock_pipe = AsyncMock() + mock_pipe.__aenter__.return_value = mock_pipe # `async with ... as pipe` -> mock_pipe + mock_pipe.execute.return_value = [0] # zcard result -> 0 removed + mock_client.pipeline = MagicMock(return_value=mock_pipe) connection = broker.config.broker_config.connection original_client = connection._client # noqa: SLF001 connection._client = mock_client # noqa: SLF001 diff --git a/tests/test_fake.py b/tests/test_fake.py index b6624c7..eec0480 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -87,6 +87,21 @@ async def test_fake_broker_fetch_redis_timers_returns_empty() -> None: assert result == [] +async def test_fake_broker_inspection_methods_report_no_pending() -> None: + """Inside TestTimersBroker, messages deliver immediately, so the inspection + methods must report "nothing pending" — not lie via unstubbed AsyncMock.""" + broker = TimersBroker() + + @broker.subscriber("topic") + async def handler(body: str) -> None: ... + + async with TestTimersBroker(broker): + await broker.publish("hi", topic="topic", timer_id="t-1") + assert await broker.has_pending("topic", "t-1") is False + assert await broker.get_pending_timers("topic") == [] + assert await broker.cancel_all("topic") == 0 + + # --- scheduled_timers inspection (U8) --- From 1840b1a3d88e726c2f87fa95822ec307ca506d21 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 09:03:11 +0300 Subject: [PATCH 2/4] Polish usage docs: client default, retry rate, back-pressure, past-activation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - basic.md: mark `client` as required for production. The previous `None` default suggested the broker was usable without one — it raises `IncorrectState` on the first operation. Point readers at the testing page for the no-client shape. - subscriber.md: replace `broker = TimersBroker()` in the timer-id example with the prod-flavoured `Redis.from_url(...)` snippet used on every other page (was the only no-client example outside the testing docs). - subscriber.md: extend the worst-case-latency line — `× 1.5` is the poll backoff alone; in-flight handlers holding the `max_concurrent` limiter add back-pressure on top. - subscriber.md: document the retry-rate consequence of NACK_ON_ERROR + lease_ttl. A handler that always raises retries every `lease_ttl` seconds; default `lease_ttl=30` is also the poison-pill retry rate. Point at `RejectMessage` for permanent drop. - publisher.md: mirror the basic.md "past activation fires immediately" note so readers who only see the publisher table know about it. --- docs/usage/basic.md | 2 +- docs/usage/publisher.md | 7 +++++++ docs/usage/subscriber.md | 8 ++++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/docs/usage/basic.md b/docs/usage/basic.md index 2f649d5..d70671b 100644 --- a/docs/usage/basic.md +++ b/docs/usage/basic.md @@ -79,7 +79,7 @@ async def schedule_reminder() -> None: | Parameter | Default | Description | |-----------|---------|-------------| -| `client` | `None` | `redis.asyncio.Redis` client instance — the caller owns its lifecycle (the broker does not close it) | +| `client` | _required for production_ | `redis.asyncio.Redis` client instance — the caller owns its lifecycle (the broker does not close it). The constructor accepts `None`, but a broker without a client raises `IncorrectState` on the first operation; `None` is the test-broker shape (see [Testing](./testing.md)). | | `timeline_key` | `timers_timeline` | Sorted set key name | | `payloads_key` | `timers_payloads` | Hash key name | | `start_timeout` | `3.0` | Seconds to wait for the subscriber's first Redis ping during startup | diff --git a/docs/usage/publisher.md b/docs/usage/publisher.md index 8e530f6..4f07fd1 100644 --- a/docs/usage/publisher.md +++ b/docs/usage/publisher.md @@ -57,6 +57,13 @@ The `publish()` method on a publisher accepts the parameters below and returns t `correlation_id` explicitly if your tracing pipeline needs per-publish uniqueness. +!!! note "Past activation times fire immediately" + `activate_in=timedelta(seconds=-5)` and `activate_at` set in the past both + schedule the timer at a negative-relative score — the next subscriber poll + picks it up and fires it. No error is raised; this lets `activate_at` + computations that take "marginally too long" still deliver. If you want + strict scheduling, validate at the call site before publishing. + ### Tracing & headers example ```python diff --git a/docs/usage/subscriber.md b/docs/usage/subscriber.md index be4e7e8..3b45994 100644 --- a/docs/usage/subscriber.md +++ b/docs/usage/subscriber.md @@ -45,8 +45,10 @@ The `timer_id` is available as the message's `message_id`. Inject it via `Contex ```python from faststream import Context from faststream_redis_timers import TimersBroker +from redis.asyncio import Redis -broker = TimersBroker() +client = Redis.from_url("redis://localhost:6379") +broker = TimersBroker(client) @broker.subscriber("invoices") @@ -79,11 +81,13 @@ Configure polling behaviour per subscriber: async def handle_urgent(body: str) -> None: ... ``` -The poll loop uses adaptive backoff: when there are no due timers, the next sleep doubles from `polling_interval` up to `max_polling_interval` and is multiplied by a random factor in `[0.5, 1.5]` to avoid thundering-herd bursts across worker fleets. The counter resets the moment a poll returns work. Worst-case delivery latency for a newly-published timer in a previously-idle queue is `max_polling_interval × 1.5`. +The poll loop uses adaptive backoff: when there are no due timers, the next sleep doubles from `polling_interval` up to `max_polling_interval` and is multiplied by a random factor in `[0.5, 1.5]` to avoid thundering-herd bursts across worker fleets. The counter resets the moment a poll returns work. Worst-case delivery latency for a newly-published timer in a previously-idle queue is `max_polling_interval × 1.5`, plus any time spent waiting for the `max_concurrent` limiter when in-flight handlers are still holding capacity (back-pressure). !!! warning "Handlers must be idempotent and concurrency-safe" A handler that runs longer than `lease_ttl`, or a worker that crashes after the handler ran but before the commit landed, may cause the timer to be delivered more than once. Design handlers to be safe under retry. Because `max_concurrent` invocations run in parallel, handlers must also be safe under concurrent execution (no unsynchronized shared state). + A handler that raises *every* time is also retried indefinitely — the lease score expires after `lease_ttl` seconds and the next poll re-claims the timer, so `lease_ttl` is the lower bound on the retry interval (e.g., the default `lease_ttl=30` retries a poison-pill handler about twice a minute). Raise `faststream.exceptions.RejectMessage` from your handler to permanently drop a timer that should never be retried. + ## Ack policy The default ack policy is `NACK_ON_ERROR`: the timer is acknowledged (removed from Redis) on success, and left for retry on any unhandled exception. From fd1129414e4769dd368c50cb7cbd67c878b84ed6 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 09:03:46 +0300 Subject: [PATCH 3/4] Drop "max_attempts planned" speculation from README The "Failure modes" bullet promised a `max_attempts` counter "planned for a future release" but there is no tracking issue and no scheduled work. Replace the speculation with the truthful current state: a buggy handler retries every `lease_ttl` seconds indefinitely; use `RejectMessage` to drop, or track attempts in your own state if you need a hard cap. --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index cce13fa..6704b17 100644 --- a/README.md +++ b/README.md @@ -134,9 +134,10 @@ Per-subscriber knobs (passed to `@broker.subscriber("topic", ...)`): ack failed to land in Redis (network blip) will be retried; a handler that takes longer than `lease_ttl` may be re-delivered to another worker. - **Buggy handler retries forever.** If a handler always raises, the timer is - retried indefinitely. Raise `faststream.exceptions.RejectMessage` from your - handler to drop a poison-pill timer permanently. A built-in `max_attempts` - counter is planned for a future release. + retried every `lease_ttl` seconds indefinitely — there is no built-in + attempt counter. Raise `faststream.exceptions.RejectMessage` from your + handler to drop a poison-pill timer permanently, or track attempts in your + own state if you need a hard cap. ## High availability From 82f756e52075ece65415f3d4b555883e9916f4d7 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 09:05:48 +0300 Subject: [PATCH 4/4] Drop redundant docstring on test_fake_broker_inspection_methods_report_no_pending Test name + assertions document the intent; the testing.py comments and the previous commit message capture the WHY (avoid lying via unstubbed AsyncMock). Removes the only ruff D205 issue. --- tests/test_fake.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_fake.py b/tests/test_fake.py index eec0480..6e87cfd 100644 --- a/tests/test_fake.py +++ b/tests/test_fake.py @@ -88,8 +88,6 @@ async def test_fake_broker_fetch_redis_timers_returns_empty() -> None: async def test_fake_broker_inspection_methods_report_no_pending() -> None: - """Inside TestTimersBroker, messages deliver immediately, so the inspection - methods must report "nothing pending" — not lie via unstubbed AsyncMock.""" broker = TimersBroker() @broker.subscriber("topic")