diff --git a/faststream_redis_timers/broker.py b/faststream_redis_timers/broker.py index 861cab1..66ee9da 100644 --- a/faststream_redis_timers/broker.py +++ b/faststream_redis_timers/broker.py @@ -205,21 +205,32 @@ async def has_pending(self, topic: str, timer_id: str) -> bool: return score is not None async def get_pending_timers(self, topic: str, before: datetime | None = None) -> list[str]: - """Return pending timer IDs on *topic*. If *before* is given, restrict to timers due by then.""" + """ + Return pending timer IDs on *topic*. If *before* is given, restrict to timers due by then. + + Note that timers currently being processed have their score pushed ``lease_ttl`` seconds + into the future, so they appear in the default (``before=None``) result but are excluded + once *before* is set to the current time. + """ client = self.config.broker_config.connection.client score_max: str | float = before.timestamp() if before is not None else "+inf" raw_ids: list[bytes] = await client.zrangebyscore(self._topic_timeline_key(topic), "-inf", score_max) return [r.decode() if isinstance(r, bytes) else r for r in raw_ids] async def cancel_all(self, topic: str) -> int: - """Cancel every pending timer on *topic*. Returns the number removed.""" + """ + Cancel every pending timer on *topic*. Returns the number removed. + + Handlers already executing for a leased timer continue to run to completion; + their final commit is a no-op because the keys are gone. + """ client = self.config.broker_config.connection.client timeline_key = self._topic_timeline_key(topic) payloads_key = self._topic_payloads_key(topic) async with client.pipeline(transaction=True) as pipe: pipe.zcard(timeline_key) - pipe.delete(timeline_key) - pipe.delete(payloads_key) + pipe.unlink(timeline_key) + pipe.unlink(payloads_key) results = await pipe.execute() return int(results[0]) diff --git a/faststream_redis_timers/subscriber/usecase.py b/faststream_redis_timers/subscriber/usecase.py index 4084969..02e9887 100644 --- a/faststream_redis_timers/subscriber/usecase.py +++ b/faststream_redis_timers/subscriber/usecase.py @@ -142,7 +142,7 @@ async def _get_msgs( self._log(log_level=logging.DEBUG, message=f"Fetched {len(timer_ids)} due timers") lease_ttl = self._config.timer_sub.lease_ttl for raw_id in timer_ids: - tg.start_soon(self._claim_and_consume, raw_id, lease_ttl, limiter) + tg.start_soon(self._claim_and_consume, raw_id, lease_ttl, limiter, client) return len(timer_ids) async def _claim_and_consume( @@ -150,6 +150,7 @@ async def _claim_and_consume( raw_id: bytes | str, lease_ttl: int, limiter: anyio.CapacityLimiter, + client: "Redis[bytes]", ) -> None: try: async with limiter: @@ -157,7 +158,7 @@ async def _claim_and_consume( claim_score = now + lease_ttl timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id raw_payload: bytes | None = await eval_cached( - self._client, + client, CLAIM_LUA, CLAIM_SHA, 2,