diff --git a/docs/introduction/how-it-works.md b/docs/introduction/how-it-works.md index c18ddf5..f05a831 100644 --- a/docs/introduction/how-it-works.md +++ b/docs/introduction/how-it-works.md @@ -66,4 +66,10 @@ The default ack policy is `NACK_ON_ERROR`: the timer is acknowledged on success, | `polling_interval` | `0.05` s | Base poll interval used when the queue has work or just transitioned from idle | | `max_polling_interval` | `5.0` s | Ceiling for the adaptive idle backoff — `polling_interval` doubles up to this value on consecutive empty polls. Worst-case delivery latency on a previously-idle queue is `max_polling_interval × 1.5` (with ±50% jitter) | | `max_concurrent` | `5` | Max handlers running concurrently per subscriber; also bounds fetch batch size | -| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim | +| `lease_ttl` | `30` s | How long a worker holds the lease before another worker may re-claim. Set to ~3–5× the P99 handler runtime: lower values speed up recovery from worker death, higher values tolerate handler GC pauses and clock skew | + +## Operational requirements + +`faststream-redis-timers` is designed for a **single-primary Redis** (Sentinel-managed primary/replica setups are supported; Redis Cluster is not — see the broker constructor docstring). + +Multiple brokers polling the same Redis derive timer due-times and lease deadlines from each broker's local wall clock. Keep all broker hosts NTP-synchronised: clock skew larger than `lease_ttl` between brokers can cause a clock-fast broker to re-lease a timer that another broker is still actively processing, producing duplicate delivery to handlers. The default `lease_ttl=30s` tolerates seconds of NTP drift; tune `lease_ttl` upward if your environment cannot guarantee sub-second sync. diff --git a/docs/usage/testing.md b/docs/usage/testing.md index 416e4fd..613d47c 100644 --- a/docs/usage/testing.md +++ b/docs/usage/testing.md @@ -89,6 +89,7 @@ Each `ScheduledTimer` has `topic`, `timer_id`, `activate_at`, `body`, `correlati - `publisher.fetch_redis_timers()` always returns `[]` in tests — timers are delivered instantly, so there are never any pending entries. - The inspection / bulk-cancel methods follow the same "nothing pending" contract: `broker.has_pending(...)` returns `False`, `broker.get_pending_timers(...)` returns `[]`, and `broker.cancel_all(...)` returns `0`. Use `TestTimersBroker.scheduled_timers` to assert *what was published*; the inspection API is for production introspection. - The fake producer uses the same envelope format as the real one, so all serialization paths are exercised. +- `lease_ttl` and re-delivery are **not** simulated — handlers that exceed the configured TTL in production may be re-delivered to another worker, but tests will only invoke the handler once. Idempotency must be verified separately. ## pytest-asyncio configuration diff --git a/faststream_redis_timers/broker.py b/faststream_redis_timers/broker.py index e1346c0..18841ee 100644 --- a/faststream_redis_timers/broker.py +++ b/faststream_redis_timers/broker.py @@ -33,6 +33,12 @@ from faststream._internal.context.repository import ContextRepo +def _require_topic(topic: str) -> None: + if not topic: + msg = "topic must be a non-empty string" + raise ValueError(msg) + + class TimersParamsStorage(DefaultLoggerStorage): __max_msg_id_ln = -1 _max_channel_name = 7 @@ -176,8 +182,8 @@ async def ping(self, timeout: float | None = None) -> bool: async def publish( # noqa: PLR0913 self, - message: "SendableMessage" = None, - topic: str = "", + message: "SendableMessage", + topic: str, *, timer_id: str = "", activate_in: timedelta = timedelta(0), @@ -185,6 +191,7 @@ async def publish( # noqa: PLR0913 correlation_id: str | None = None, headers: dict[str, typing.Any] | None = None, ) -> str: + _require_topic(topic) if not timer_id: timer_id = gen_cor_id() cmd = TimerPublishCommand( @@ -201,12 +208,14 @@ async def publish( # noqa: PLR0913 async def cancel_timer(self, topic: str, timer_id: str) -> None: """Cancel a pending timer. No-op if the timer has already fired or does not exist.""" + _require_topic(topic) full_topic = f"{self.config.broker_config.prefix}{topic}" producer = typing.cast("TimersProducer", self.config.broker_config.producer) await producer.cancel(full_topic, timer_id) async def has_pending(self, topic: str, timer_id: str) -> bool: """Return True if a timer with this ID is still pending on *topic*.""" + _require_topic(topic) client = self.config.broker_config.connection.client score = await client.zscore(self._topic_timeline_key(topic), timer_id) return score is not None @@ -219,6 +228,7 @@ async def get_pending_timers(self, topic: str, before: datetime | None = None) - into the future, so they appear in the default (``before=None``) result but are excluded once *before* is set to the current time. """ + _require_topic(topic) client = self.config.broker_config.connection.client score_max: str | float = before.timestamp() if before is not None else "+inf" raw_ids: list[bytes] | list[str] = await client.zrangebyscore( @@ -233,6 +243,7 @@ async def cancel_all(self, topic: str) -> int: Handlers already executing for a leased timer continue to run to completion; their final commit is a no-op because the keys are gone. """ + _require_topic(topic) client = self.config.broker_config.connection.client timeline_key = self._topic_timeline_key(topic) payloads_key = self._topic_payloads_key(topic) diff --git a/faststream_redis_timers/subscriber/usecase.py b/faststream_redis_timers/subscriber/usecase.py index 1ac7e42..7cfe485 100644 --- a/faststream_redis_timers/subscriber/usecase.py +++ b/faststream_redis_timers/subscriber/usecase.py @@ -99,7 +99,7 @@ async def _consume(self, client: "RedisClient", *, start_signal: anyio.Event) -> while self.running: try: fetched = await self._get_msgs(client, tg, limiter) - except Exception as e: # noqa: BLE001 # pragma: no cover + except Exception as e: # noqa: BLE001 self._log(log_level=logging.ERROR, message=f"Message fetch error: {e!r}", exc_info=e) error_attempt = min(error_attempt + 1, _BACKOFF_EXP_CAP) delay = min(2.0 ** (error_attempt - 1) * random.uniform(0.5, 1.5), 30.0) # noqa: S311 @@ -151,11 +151,23 @@ async def _claim_and_consume( limiter: anyio.CapacityLimiter, client: "RedisClient", ) -> None: + try: + timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id + except UnicodeDecodeError as e: + self._log( + log_level=logging.WARNING, + message=f"Dropping timer with non-UTF-8 id {raw_id!r}: {e!r}", + ) + async with client.pipeline(transaction=True) as pipe: + pipe.zrem(self._config.topic_timeline_key, raw_id) + pipe.hdel(self._config.topic_payloads_key, raw_id) + await pipe.execute() + return + try: async with limiter: now = time.time() claim_score = now + lease_ttl - timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id raw_payload: bytes | None = await eval_cached( client, CLAIM_LUA, @@ -181,7 +193,7 @@ async def _claim_and_consume( ) self._log(log_level=logging.DEBUG, message=f"Timer {timer_id!r} delivered to handler") await self.consume(msg) - except Exception as e: # noqa: BLE001 # pragma: no cover + except Exception as e: # noqa: BLE001 self._log( log_level=logging.ERROR, message=f"Timer {raw_id!r} consume error: {e!r}", diff --git a/tests/test_unit.py b/tests/test_unit.py index 1901551..bda625a 100644 --- a/tests/test_unit.py +++ b/tests/test_unit.py @@ -1,8 +1,10 @@ import asyncio +import logging import warnings from datetime import UTC, datetime -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch +import anyio import faststream.asgi.factories.asyncapi.try_it_out import pytest from faststream.exceptions import IncorrectState @@ -300,6 +302,119 @@ async def handler(body: str) -> None: ... await asyncio.sleep(0.05) +# --- topic must be non-empty (B8) --- + + +async def test_publish_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.publish("msg", topic="") + + +async def test_cancel_timer_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.cancel_timer("", "id") + + +async def test_has_pending_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.has_pending("", "id") + + +async def test_get_pending_timers_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.get_pending_timers("") + + +async def test_cancel_all_rejects_empty_topic() -> None: + broker = TimersBroker(AsyncMock()) + with pytest.raises(ValueError, match="topic must be a non-empty string"): + await broker.cancel_all("") + + +# --- error path log format (B6) --- + + +async def test_consume_logs_get_msgs_error_with_repr() -> None: + """A Redis error during _get_msgs is logged at ERROR with `{e!r}` so the type and msg survive Sentry truncation.""" + client = AsyncMock() + client.ping.return_value = True + client.zrangebyscore.side_effect = ConnectionError("redis down") + broker = TimersBroker(client, start_timeout=2.0) + + @broker.subscriber("topic", polling_interval=0.05) + async def handler(body: str) -> None: ... + + sub = next(iter(broker._subscribers)) # noqa: SLF001 + log_calls: list[dict[str, object]] = [] + sub._log = MagicMock(side_effect=lambda **kwargs: log_calls.append(kwargs)) # noqa: SLF001 # ty: ignore[invalid-assignment] + + async with broker: + await asyncio.sleep(0.1) + + error_logs = [c for c in log_calls if c.get("log_level") == logging.ERROR] + assert error_logs, "expected at least one ERROR log" + msg = error_logs[0]["message"] + assert isinstance(msg, str) + assert "Message fetch error" in msg + assert "ConnectionError('redis down')" in msg + + +async def test_claim_and_consume_logs_unhandled_error_with_repr() -> None: + """An unhandled error inside the limiter block is logged with `{raw_id!r}` and `{e!r}`.""" + client = AsyncMock() + broker = TimersBroker(client) + sub = broker.subscriber("topic") + await broker.connect() + + log_calls: list[dict[str, object]] = [] + sub._log = MagicMock(side_effect=lambda **kwargs: log_calls.append(kwargs)) # noqa: SLF001 # ty: ignore[invalid-assignment] + + limiter = anyio.CapacityLimiter(1) + raw_id = b"timer-1" + + with patch( + "faststream_redis_timers.subscriber.usecase.eval_cached", + new=AsyncMock(side_effect=RuntimeError("boom")), + ): + await sub._claim_and_consume(raw_id, 30, limiter, client) # noqa: SLF001 + + error_logs = [c for c in log_calls if c.get("log_level") == logging.ERROR] + assert error_logs + msg = error_logs[0]["message"] + assert isinstance(msg, str) + assert "b'timer-1'" in msg + assert "RuntimeError('boom')" in msg + + +# --- non-UTF-8 timer id recovery (B2) --- + + +async def test_claim_and_consume_drops_non_utf8_id() -> None: + """A non-UTF-8 raw_id is removed from both keys so polls recover instead of looping forever.""" + client = AsyncMock() + pipe = MagicMock() + pipe.__aenter__ = AsyncMock(return_value=pipe) + pipe.__aexit__ = AsyncMock(return_value=None) + pipe.execute = AsyncMock(return_value=[1, 1]) + client.pipeline = MagicMock(return_value=pipe) + + broker = TimersBroker(client) + sub = broker.subscriber("topic") + await broker.connect() + bad_id = b"\xff\xfe-broken" + limiter = anyio.CapacityLimiter(1) + + await sub._claim_and_consume(bad_id, 30, limiter, client) # noqa: SLF001 + + pipe.zrem.assert_called_once_with(sub._config.topic_timeline_key, bad_id) # noqa: SLF001 + pipe.hdel.assert_called_once_with(sub._config.topic_payloads_key, bad_id) # noqa: SLF001 + pipe.execute.assert_awaited_once() + + # --- eval_cached NOSCRIPT fallback (O1) ---