From a40d20b68964ad7dc2273d051e1fdf8c5b731e28 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 16:56:46 +0300 Subject: [PATCH 1/3] Recover from non-UTF-8 timer ids and cover error log paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Item 2: a corrupted or externally-produced timer id with non-UTF-8 bytes used to raise UnicodeDecodeError on every poll, leaving the offending ZSET entry in place — unbounded log noise and wasted polls. Catch the decode failure separately, log a WARNING with the raw bytes, and remove the entry from both the timeline and payload keys so polls recover. Item 6: drop `# pragma: no cover` from the two top-level error branches in the consume loop and add unit tests asserting the log message format (including `{e!r}` repr of the exception). The recent `Inline exception repr in subscriber error logs` change is exactly the class of regression these tests are intended to catch. Co-Authored-By: Claude Opus 4.7 --- faststream_redis_timers/subscriber/usecase.py | 18 +++- tests/test_unit.py | 84 ++++++++++++++++++- 2 files changed, 98 insertions(+), 4 deletions(-) diff --git a/faststream_redis_timers/subscriber/usecase.py b/faststream_redis_timers/subscriber/usecase.py index 1ac7e42..7cfe485 100644 --- a/faststream_redis_timers/subscriber/usecase.py +++ b/faststream_redis_timers/subscriber/usecase.py @@ -99,7 +99,7 @@ async def _consume(self, client: "RedisClient", *, start_signal: anyio.Event) -> while self.running: try: fetched = await self._get_msgs(client, tg, limiter) - except Exception as e: # noqa: BLE001 # pragma: no cover + except Exception as e: # noqa: BLE001 self._log(log_level=logging.ERROR, message=f"Message fetch error: {e!r}", exc_info=e) error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP) delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311 @@ -151,11 +151,23 @@ async def _claim_and_consume( limiter: anyio.CapacityLimiter, client: 
"RedisClient", ) -> None: + try: + timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id + except UnicodeDecodeError as e: + self._log( + log_level=logging.WARNING, + message=f"Dropping timer with non-UTF-8 id {raw_id!r}: {e!r}", + ) + async with client.pipeline(transaction=True) as pipe: + pipe.zrem(self._config.topic_timeline_key, raw_id) + pipe.hdel(self._config.topic_payloads_key, raw_id) + await pipe.execute() + return + try: async with limiter: now = time.time() claim_score = now + lease_ttl - timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id raw_payload: bytes | None = await eval_cached( client, CLAIM_LUA, @@ -181,7 +193,7 @@ async def _claim_and_consume( ) self._log(log_level=logging.DEBUG, message=f"Timer {timer_id!r} delivered to handler") await self.consume(msg) - except Exception as e: # noqa: BLE001 # pragma: no cover + except Exception as e: # noqa: BLE001 self._log( log_level=logging.ERROR, message=f"Timer {raw_id!r} consume error: {e!r}", diff --git a/tests/test_unit.py b/tests/test_unit.py index 1901551..b3a9626 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -1,8 +1,10 @@ import asyncio +import logging import warnings from datetime import UTC, datetime -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch +import anyio import faststream.asgi.factories.asyncapi.try_it_out import pytest from faststream.exceptions import IncorrectState @@ -300,6 +302,86 @@ async def handler(body: str) -> None: ... 
await asyncio.sleep(0.05) +# --- error path log format (B6) --- + + +async def test_consume_logs_get_msgs_error_with_repr() -> None: + """A Redis error during _get_msgs is logged at ERROR with `{e!r}` so the type and msg survive Sentry truncation.""" + client = AsyncMock() + client.ping.return_value = True + client.zrangebyscore.side_effect = ConnectionError("redis down") + broker = TimersBroker(client, start_timeout=2.0) + + @broker.subscriber("topic", polling_interval=0.05) + async def handler(body: str) -> None: ... + + sub = next(iter(broker._subscribers)) # noqa: SLF001 + log_calls: list[dict[str, object]] = [] + sub._log = MagicMock(side_effect=lambda **kwargs: log_calls.append(kwargs)) # noqa: SLF001 # ty: ignore[invalid-assignment] + + async with broker: + await asyncio.sleep(0.1) + + error_logs = [c for c in log_calls if c.get("log_level") == logging.ERROR] + assert error_logs, "expected at least one ERROR log" + msg = error_logs[0]["message"] + assert isinstance(msg, str) + assert "Message fetch error" in msg + assert "ConnectionError('redis down')" in msg + + +async def test_claim_and_consume_logs_unhandled_error_with_repr() -> None: + """An unhandled error inside the limiter block is logged with `{raw_id!r}` and `{e!r}`.""" + client = AsyncMock() + broker = TimersBroker(client) + sub = broker.subscriber("topic") + await broker.connect() + + log_calls: list[dict[str, object]] = [] + sub._log = MagicMock(side_effect=lambda **kwargs: log_calls.append(kwargs)) # noqa: SLF001 # ty: ignore[invalid-assignment] + + limiter = anyio.CapacityLimiter(1) + raw_id = b"timer-1" + + with patch( + "faststream_redis_timers.subscriber.usecase.eval_cached", + new=AsyncMock(side_effect=RuntimeError("boom")), + ): + await sub._claim_and_consume(raw_id, 30, limiter, client) # noqa: SLF001 + + error_logs = [c for c in log_calls if c.get("log_level") == logging.ERROR] + assert error_logs + msg = error_logs[0]["message"] + assert isinstance(msg, str) + assert "b'timer-1'" in msg 
+ assert "RuntimeError('boom')" in msg + + +# --- non-UTF-8 timer id recovery (B2) --- + + +async def test_claim_and_consume_drops_non_utf8_id() -> None: + """A non-UTF-8 raw_id is removed from both keys so polls recover instead of looping forever.""" + client = AsyncMock() + pipe = MagicMock() + pipe.__aenter__ = AsyncMock(return_value=pipe) + pipe.__aexit__ = AsyncMock(return_value=None) + pipe.execute = AsyncMock(return_value=[1, 1]) + client.pipeline = MagicMock(return_value=pipe) + + broker = TimersBroker(client) + sub = broker.subscriber("topic") + await broker.connect() + bad_id = b"\xff\xfe-broken" + limiter = anyio.CapacityLimiter(1) + + await sub._claim_and_consume(bad_id, 30, limiter, client) # noqa: SLF001 + + pipe.zrem.assert_called_once_with(sub._config.topic_timeline_key, bad_id) # noqa: SLF001 + pipe.hdel.assert_called_once_with(sub._config.topic_payloads_key, bad_id) # noqa: SLF001 + pipe.execute.assert_awaited_once() + + # --- eval_cached NOSCRIPT fallback (O1) --- From 080521115bf022edd8f4335c9e4049c9ce2c927c Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 16:57:43 +0300 Subject: [PATCH 2/3] Require non-empty topic at every public broker entry publish, cancel_timer, has_pending, get_pending_timers and cancel_all now raise ValueError when handed an empty topic. cancel_all("") was the worst foot-gun: it silently nuked whatever orphaned keys happened to match the empty-topic key prefix. Drop the topic="" default from publish; topic is now a required positional argument. Because message precedes topic in the signature, its None default is dropped as well, so message is now required too. This is source-incompatible for callers that relied on either default, which is acceptable while pyproject version is still 0. 
Co-Authored-By: Claude Opus 4.7 --- faststream_redis_timers/broker.py | 15 ++++++++++++-- tests/test_unit.py | 33 +++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/faststream_redis_timers/broker.py b/faststream_redis_timers/broker.py index e1346c0..18841ee 100644 --- a/faststream_redis_timers/broker.py +++ b/faststream_redis_timers/broker.py @@ -33,6 +33,12 @@ from faststream._internal.context.repository import ContextRepo +def _require_topic(topic: str) -> None: + if not topic: + msg = "topic must be a non-empty string" + raise ValueError(msg) + + class TimersParamsStorage(DefaultLoggerStorage): __max_msg_id_ln = -1 _max_channel_name = 7 @@ -176,8 +182,8 @@ async def ping(self, timeout: float | None = None) -> bool: async def publish( # noqa: PLR0913 self, - message: "SendableMessage" = None, - topic: str = "", + message: "SendableMessage", + topic: str, *, timer_id: str = "", activate_in: timedelta = timedelta(0), @@ -185,6 +191,7 @@ async def publish( # noqa: PLR0913 correlation_id: str | None = None, headers: dict[str, typing.Any] | None = None, ) -> str: + _require_topic(topic) if not timer_id: timer_id = gen_cor_id() cmd = TimerPublishCommand( @@ -201,12 +208,14 @@ async def publish( # noqa: PLR0913 async def cancel_timer(self, topic: str, timer_id: str) -> None: """Cancel a pending timer. 
No-op if the timer has already fired or does not exist.""" + _require_topic(topic) full_topic = f"{self.config.broker_config.prefix}{topic}" producer = typing.cast("TimersProducer", self.config.broker_config.producer) await producer.cancel(full_topic, timer_id) async def has_pending(self, topic: str, timer_id: str) -> bool: """Return True if a timer with this ID is still pending on *topic*.""" + _require_topic(topic) client = self.config.broker_config.connection.client score = await client.zscore(self._topic_timeline_key(topic), timer_id) return score is not None @@ -219,6 +228,7 @@ async def get_pending_timers(self, topic: str, before: datetime | None = None) - into the future, so they appear in the default (``before=None``) result but are excluded once *before* is set to the current time. """ + _require_topic(topic) client = self.config.broker_config.connection.client score_max: str | float = before.timestamp() if before is not None else "+inf" raw_ids: list[bytes] | list[str] = await client.zrangebyscore( @@ -233,6 +243,7 @@ async def cancel_all(self, topic: str) -> int: Handlers already executing for a leased timer continue to run to completion; their final commit is a no-op because the keys are gone. """ + _require_topic(topic) client = self.config.broker_config.connection.client timeline_key = self._topic_timeline_key(topic) payloads_key = self._topic_payloads_key(topic) diff --git a/tests/test_unit.py b/tests/test_unit.py index b3a9626..bda625a 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -302,6 +302,39 @@ async def handler(body: str) -> None: ... 
await asyncio.sleep(0.05) +# --- topic must be non-empty (B8) --- + + +async def test_publish_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.publish("msg", topic="") + + +async def test_cancel_timer_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.cancel_timer("", "id") + + +async def test_has_pending_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.has_pending("", "id") + + +async def test_get_pending_timers_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.get_pending_timers("") + + +async def test_cancel_all_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.cancel_all("") + + # --- error path log format (B6) --- From 715e9f38ba8449fed902773aba2b72780d468208 Mon Sep 17 00:00:00 2001 From: Artur Shiriev Date: Mon, 4 May 2026 16:57:56 +0300 Subject: [PATCH 3/3] Document NTP requirement, lease_ttl tuning, and fake-broker limits - how-it-works.md: add an "Operational requirements" section calling out single-primary Redis, Sentinel support, and that brokers' wall-clock skew must stay well below lease_ttl to avoid duplicate delivery (item 3); add tuning guidance to the lease_ttl row ("3-5x P99 handler runtime", item 11). - testing.md: note that the fake broker does not simulate lease_ttl or re-delivery, so handlers must verify idempotency separately (item 7). 
Co-Authored-By: Claude Opus 4.7 --- docs/introduction/how-it-works.md | 8 +++++++- docs/usage/testing.md | 1 + 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/introduction/how-it-works.md b/docs/introduction/how-it-works.md index c18ddf5..f05a831 100644 --- a/docs/introduction/how-it-works.md +++ b/docs/introduction/how-it-works.md @@ -66,4 +66,10 @@ The default ack policy is `NACK_ON_ERROR`: the timer is acknowledged on success, | `polling_interval` | `0.05` s | Base poll interval used when the queue has work or just transitioned from idle | | `max_polling_interval` | `5.0` s | Ceiling for the adaptive idle backoff — `polling_interval` doubles up to this value on consecutive empty polls. Worst-case delivery latency on a previously-idle queue is `max_polling_interval × 1.5` (with ±50% jitter) | | `max_concurrent` | `5` | Max handlers running concurrently per subscriber; also bounds fetch batch size | -| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim | +| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim. Set to ~3–5× the P99 handler runtime: lower values speed up recovery from worker death, higher values tolerate handler GC pauses and clock skew | + +## Operational requirements + +`faststream-redis-timers` is designed for a **single-primary Redis** (Sentinel-managed primary/replica setups are supported; Redis Cluster is not — see the broker constructor docstring). + +Multiple brokers polling the same Redis derive timer due-times and lease deadlines from each broker's local wall clock. Keep all broker hosts NTP-synchronised: clock skew between brokers shortens the effective lease, so once skew plus handler runtime exceeds `lease_ttl`, a clock-fast broker can re-lease a timer that another broker is still actively processing, producing duplicate delivery to handlers. The default `lease_ttl=30s` tolerates seconds of NTP drift; tune `lease_ttl` upward if your environment cannot guarantee sub-second sync. 
diff --git a/docs/usage/testing.md b/docs/usage/testing.md index 416e4fd..613d47c 100644 --- a/docs/usage/testing.md +++ b/docs/usage/testing.md @@ -89,6 +89,7 @@ Each `ScheduledTimer` has `topic`, `timer_id`, `activate_at`, `body`, `correlati - `publisher.fetch_redis_timers()` always returns `[]` in tests — timers are delivered instantly, so there are never any pending entries. - The inspection / bulk-cancel methods follow the same "nothing pending" contract: `broker.has_pending(...)` returns `False`, `broker.get_pending_timers(...)` returns `[]`, and `broker.cancel_all(...)` returns `0`. Use `TestTimersBroker.scheduled_timers` to assert *what was published*; the inspection API is for production introspection. - The fake producer uses the same envelope format as the real one, so all serialization paths are exercised. +- `lease_ttl` and re-delivery are **not** simulated — in production, a timer whose handler runs longer than the configured `lease_ttl` may be re-delivered to another worker, but tests invoke the handler only once. Idempotency must be verified separately. ## pytest-asyncio configuration