From 8817c84253c668bbedd88a732bd3aa835efb0265 Mon Sep 17 00:00:00 2001 From: Anselme_REVUZ Date: Mon, 11 May 2026 19:32:17 +0200 Subject: [PATCH] add_progress --- taskiq_postgresql/result_backend.py | 82 +++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/taskiq_postgresql/result_backend.py b/taskiq_postgresql/result_backend.py index aaa5e27..2d84016 100644 --- a/taskiq_postgresql/result_backend.py +++ b/taskiq_postgresql/result_backend.py @@ -4,6 +4,8 @@ from taskiq import AsyncResultBackend, TaskiqResult from taskiq.abc.serializer import TaskiqSerializer +from taskiq.compat import model_dump, model_validate +from taskiq.depends.progress_tracker import TaskProgress from taskiq.serializers.pickle import PickleSerializer from taskiq_postgresql.abc.driver import QueryDriver @@ -13,6 +15,8 @@ _ReturnType = TypeVar("_ReturnType") +PROGRESS_KEY_SUFFIX = "__progress" + @dataclass class Table: @@ -204,3 +208,81 @@ async def delete_by_date( to_date (datetime | date | None): Date to which to delete results. """ await self.driver.delete_by_date(from_date, to_date) + + + async def set_progress( + self, + task_id: Any, + progress: TaskProgress[_ReturnType], + ) -> None: + """ + Store task progress. + + Args: + task_id: ID of the task. + progress: Progress payload. + """ + await self.driver.insert_or_update( + [ + self.columns.primary_key, + self.columns.result, + ], + [ + f"{task_id}{PROGRESS_KEY_SUFFIX}", + self.serializer.dumpb(model_dump(progress)), + ], + [ + self.columns.primary_key, + ], + [ + self.columns.result, + ], + ) + + async def get_progress( + self, + task_id: Any, + ) -> TaskProgress[_ReturnType] | None: + """ + Retrieve task progress. + + Args: + task_id: ID of the task. + + Returns: + TaskProgress instance or None. + """ + data = await self.driver.select( + [ + self.columns.result, + ], + [ + self.columns.primary_key, + ], + [f"{task_id}{PROGRESS_KEY_SUFFIX}"], + ) + + if not data: + return None + + progress_bytes = data[0]["result"] + + if progress_bytes is None: + return None + + return model_validate( + TaskProgress[_ReturnType], + self.serializer.loadb(progress_bytes), + ) + + async def delete_progress(self, task_id: Any) -> None: + """ + Delete stored progress for a task. + + Args: + task_id: ID of the task. + """ + await self.driver.delete( + self.columns.primary_key, + f"{task_id}{PROGRESS_KEY_SUFFIX}", + )