Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions faststream_redis_timers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,32 @@ async def has_pending(self, topic: str, timer_id: str) -> bool:
return score is not None

async def get_pending_timers(self, topic: str, before: datetime | None = None) -> list[str]:
"""Return pending timer IDs on *topic*. If *before* is given, restrict to timers due by then."""
"""
Return pending timer IDs on *topic*. If *before* is given, restrict to timers due by then.

Note that timers currently being processed have their score pushed ``lease_ttl`` seconds
into the future, so they appear in the default (``before=None``) result but are excluded
once *before* is set to the current time.
"""
client = self.config.broker_config.connection.client
score_max: str | float = before.timestamp() if before is not None else "+inf"
raw_ids: list[bytes] = await client.zrangebyscore(self._topic_timeline_key(topic), "-inf", score_max)
return [r.decode() if isinstance(r, bytes) else r for r in raw_ids]

async def cancel_all(self, topic: str) -> int:
"""Cancel every pending timer on *topic*. Returns the number removed."""
"""
Cancel every pending timer on *topic*. Returns the number removed.

Handlers already executing for a leased timer continue to run to completion;
their final commit is a no-op because the keys are gone.
"""
client = self.config.broker_config.connection.client
timeline_key = self._topic_timeline_key(topic)
payloads_key = self._topic_payloads_key(topic)
async with client.pipeline(transaction=True) as pipe:
pipe.zcard(timeline_key)
pipe.delete(timeline_key)
pipe.delete(payloads_key)
pipe.unlink(timeline_key)
pipe.unlink(payloads_key)
results = await pipe.execute()
return int(results[0])

Expand Down
5 changes: 3 additions & 2 deletions faststream_redis_timers/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,23 @@ async def _get_msgs(
self._log(log_level=logging.DEBUG, message=f"Fetched {len(timer_ids)} due timers")
lease_ttl = self._config.timer_sub.lease_ttl
for raw_id in timer_ids:
tg.start_soon(self._claim_and_consume, raw_id, lease_ttl, limiter)
tg.start_soon(self._claim_and_consume, raw_id, lease_ttl, limiter, client)
return len(timer_ids)

async def _claim_and_consume(
self,
raw_id: bytes | str,
lease_ttl: int,
limiter: anyio.CapacityLimiter,
client: "Redis[bytes]",
) -> None:
try:
async with limiter:
now = time.time()
claim_score = now + lease_ttl
timer_id = raw_id.decode() if isinstance(raw_id, bytes) else raw_id
raw_payload: bytes | None = await eval_cached(
self._client,
client,
CLAIM_LUA,
CLAIM_SHA,
2,
Expand Down
Loading