diff --git a/pyproject.toml b/pyproject.toml index 396fb5a..13e2c55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,3 +127,9 @@ allow-magic-value-types = ["int", "str", "float"] [tool.ruff.lint.flake8-bugbear] extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"] + +[tool.pytest.ini_options] +filterwarnings = [ + # about deprecated RedisScheduleSource usage - delete after removing RedisScheduleSource + 'ignore:RedisScheduleSource is deprecated:DeprecationWarning', +] diff --git a/taskiq_redis/list_schedule_source.py b/taskiq_redis/list_schedule_source.py index a056d09..7d8efe8 100644 --- a/taskiq_redis/list_schedule_source.py +++ b/taskiq_redis/list_schedule_source.py @@ -130,7 +130,7 @@ async def _get_previous_time_schedules(self) -> list[bytes]: if key_time and key_time <= minute_before: time_keys.append(key.decode()) for key in time_keys: - schedules.extend(await redis.lrange(key, 0, -1)) + schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[misc] return schedules @@ -146,10 +146,10 @@ async def delete_schedule(self, schedule_id: str) -> None: ) # We need to remove the schedule from the cron or time list. if schedule.cron is not None: - await redis.lrem(self._get_cron_key(), 0, schedule_id) + await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore[misc] elif schedule.time is not None: time_key = self._get_time_key(schedule.time) - await redis.lrem(time_key, 0, schedule_id) + await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc] async def add_schedule(self, schedule: "ScheduledTask") -> None: """Add a schedule to the source.""" @@ -163,9 +163,9 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None: # This is an optimization, so we can get all the schedules # for the current time much faster. if schedule.cron is not None: - await redis.rpush(self._get_cron_key(), schedule.schedule_id) + await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore[misc] elif schedule.time is not None: - await redis.rpush( + await redis.rpush( # type: ignore[misc] self._get_time_key(schedule.time), schedule.schedule_id, ) @@ -195,11 +195,11 @@ async def get_schedules(self) -> List["ScheduledTask"]: self._is_first_run = False async with Redis(connection_pool=self._connection_pool) as redis: buffer = [] - crons = await redis.lrange(self._get_cron_key(), 0, -1) + crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc] logger.debug("Got %d cron schedules", len(crons)) if crons: buffer.extend(crons) - timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) + timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc] logger.debug("Got %d timed schedules", len(timed)) if timed: buffer.extend(timed) diff --git a/taskiq_redis/redis_backend.py b/taskiq_redis/redis_backend.py index 5f0b3f4..9d0394f 100644 --- a/taskiq_redis/redis_backend.py +++ b/taskiq_redis/redis_backend.py @@ -4,7 +4,6 @@ TYPE_CHECKING, Any, AsyncIterator, - Dict, List, Optional, Tuple, @@ -121,17 +120,15 @@ async def set_result( :param task_id: ID of the task. :param result: TaskiqResult instance. """ - redis_set_params: Dict[str, Union[str, int, bytes]] = { - "name": self._task_name(task_id), - "value": self.serializer.dumpb(model_dump(result)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - + name = self._task_name(task_id) + value = self.serializer.dumpb(model_dump(result)) async with Redis(connection_pool=self.redis_pool) as redis: - await redis.set(**redis_set_params) + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def is_result_ready(self, task_id: str) -> bool: """ @@ -195,17 +192,15 @@ async def set_progress( :param task_id: ID of the task. :param result: task's TaskProgress instance. """ - redis_set_params: Dict[str, Union[str, int, bytes]] = { - "name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX, - "value": self.serializer.dumpb(model_dump(progress)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - + name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX + value = self.serializer.dumpb(model_dump(progress)) async with Redis(connection_pool=self.redis_pool) as redis: - await redis.set(**redis_set_params) + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def get_progress( self, @@ -296,24 +291,23 @@ async def set_result( result: TaskiqResult[_ReturnType], ) -> None: """ - Sets task result in redis. + Sets task result in redis cluster. Dumps TaskiqResult instance into the bytes and writes - it to redis. + it to redis cluster. :param task_id: ID of the task. :param result: TaskiqResult instance. """ - redis_set_params: Dict[str, Union[str, bytes, int]] = { - "name": self._task_name(task_id), - "value": self.serializer.dumpb(model_dump(result)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - - await self.redis.set(**redis_set_params) # type: ignore + name = self._task_name(task_id) + value = self.serializer.dumpb(model_dump(result)) + async with self.redis as redis: + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def is_result_ready(self, task_id: str) -> bool: """ @@ -367,24 +361,23 @@ async def set_progress( progress: TaskProgress[_ReturnType], ) -> None: """ - Sets task progress in redis. + Sets task progress in redis cluster. Dumps TaskProgress instance into the bytes and writes - it to redis with a standard suffix on the task_id as the key + it to redis cluster with a standard suffix on the task_id as the key :param task_id: ID of the task. :param result: task's TaskProgress instance. """ - redis_set_params: Dict[str, Union[str, int, bytes]] = { - "name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX, - "value": self.serializer.dumpb(model_dump(progress)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - - await self.redis.set(**redis_set_params) # type: ignore + name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX + value = self.serializer.dumpb(model_dump(progress)) + async with self.redis as redis: + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def get_progress( self, @@ -490,17 +483,15 @@ async def set_result( :param task_id: ID of the task. :param result: TaskiqResult instance. """ - redis_set_params: Dict[str, Union[str, bytes, int]] = { - "name": self._task_name(task_id), - "value": self.serializer.dumpb(model_dump(result)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - + name = self._task_name(task_id) + value = self.serializer.dumpb(model_dump(result)) async with self._acquire_master_conn() as redis: - await redis.set(**redis_set_params) # type: ignore + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def is_result_ready(self, task_id: str) -> bool: """ @@ -559,22 +550,20 @@ async def set_progress( Sets task progress in redis. Dumps TaskProgress instance into the bytes and writes - it to redis with a standard suffix on the task_id as the key + it to redis via sentinel with a standard suffix on the task_id as the key :param task_id: ID of the task. :param result: task's TaskProgress instance. """ - redis_set_params: Dict[str, Union[str, int, bytes]] = { - "name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX, - "value": self.serializer.dumpb(model_dump(progress)), - } - if self.result_ex_time: - redis_set_params["ex"] = self.result_ex_time - elif self.result_px_time: - redis_set_params["px"] = self.result_px_time - + name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX + value = self.serializer.dumpb(model_dump(progress)) async with self._acquire_master_conn() as redis: - await redis.set(**redis_set_params) # type: ignore + if self.result_ex_time: + await redis.set(name=name, value=value, ex=self.result_ex_time) + elif self.result_px_time: + await redis.set(name=name, value=value, px=self.result_px_time) + else: + await redis.set(name=name, value=value) async def get_progress( self, diff --git a/taskiq_redis/redis_broker.py b/taskiq_redis/redis_broker.py index 663c8e4..db6f4ca 100644 --- a/taskiq_redis/redis_broker.py +++ b/taskiq_redis/redis_broker.py @@ -10,6 +10,7 @@ Dict, Optional, TypeVar, + Union, ) from redis.asyncio import BlockingConnectionPool, Connection, Redis, ResponseError @@ -122,7 +123,7 @@ async def kick(self, message: BrokerMessage) -> None: """ queue_name = message.labels.get("queue_name") or self.queue_name async with Redis(connection_pool=self.connection_pool) as redis_conn: - await redis_conn.lpush(queue_name, message.message) + await redis_conn.lpush(queue_name, message.message) # type: ignore async def listen(self) -> AsyncGenerator[bytes, None]: """ @@ -137,7 +138,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]: while True: try: async with Redis(connection_pool=self.connection_pool) as redis_conn: - yield (await redis_conn.brpop(self.queue_name))[ + yield (await redis_conn.brpop(self.queue_name))[ # type: ignore redis_brpop_data_position ] except ConnectionError as exc: @@ -170,7 +171,7 @@ def __init__( idle_timeout: int = 600000, # 10 minutes unacknowledged_batch_size: int = 100, xread_count: Optional[int] = 100, - additional_streams: Optional[Dict[str, str]] = None, + additional_streams: Optional[Dict[str, Union[str, int]]] = None, **connection_kwargs: Any, ) -> None: """ @@ -281,7 +282,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: self.consumer_name, { self.queue_name: ">", - **self.additional_streams, + **self.additional_streams, # type: ignore[dict-item] }, block=self.block, noack=False,