From aff50b22be4f41a9497702a0731ba0cea8059164 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 14:14:42 +0100 Subject: [PATCH 01/12] feat: Support task backoff --- settings/dev.py | 1 + src/task_processor/exceptions.py | 19 +++++++++++++++++++ src/task_processor/models.py | 15 ++++----------- src/task_processor/processor.py | 20 +++++++++++++++++++- src/task_processor/task_registry.py | 19 +++++++++++++++---- 5 files changed, 58 insertions(+), 16 deletions(-) diff --git a/settings/dev.py b/settings/dev.py index 086a0eb3..a02fd58c 100644 --- a/settings/dev.py +++ b/settings/dev.py @@ -58,6 +58,7 @@ ENABLE_CLEAN_UP_OLD_TASKS = True ENABLE_TASK_PROCESSOR_HEALTH_CHECK = True RECURRING_TASK_RUN_RETENTION_DAYS = 15 +TASK_BACKOFF_DEFAULT_SECONDS = 5 TASK_DELETE_BATCH_SIZE = 2000 TASK_DELETE_INCLUDE_FAILED_TASKS = False TASK_DELETE_RETENTION_DAYS = 15 diff --git a/src/task_processor/exceptions.py b/src/task_processor/exceptions.py index 7f697a6e..8d53706d 100644 --- a/src/task_processor/exceptions.py +++ b/src/task_processor/exceptions.py @@ -1,3 +1,6 @@ +from datetime import datetime + + class TaskProcessingError(Exception): pass @@ -6,5 +9,21 @@ class InvalidArgumentsError(TaskProcessingError): pass +class TaskBackoffError(TaskProcessingError): + """ + Raise this exception inside a task to indicate that it should be retried after a delay. + This is typically used when a task fails due to a temporary issue, such as + a network error or a service being unavailable. + """ + + def __init__( + self, + delay_until: datetime | None = None, + ) -> None: + super().__init__() + delay_until = delay_until + self.delay_until = delay_until + + class TaskQueueFullError(Exception): pass diff --git a/src/task_processor/models.py b/src/task_processor/models.py index 116b7a6e..58383b5d 100644 --- a/src/task_processor/models.py +++ b/src/task_processor/models.py @@ -7,9 +7,9 @@ from django.db import models from django.utils import timezone -from task_processor.exceptions import TaskProcessingError, TaskQueueFullError +from task_processor.exceptions import TaskQueueFullError from task_processor.managers import RecurringTaskManager, TaskManager -from task_processor.task_registry import registered_tasks +from task_processor.task_registry import get_task, registered_tasks from task_processor.types import TaskCallable _django_json_encoder_default = DjangoJSONEncoder().default @@ -75,15 +75,8 @@ def run(self) -> None: @property def callable(self) -> TaskCallable[typing.Any]: - try: - task = registered_tasks[self.task_identifier] - return task.task_function - except KeyError as e: - raise TaskProcessingError( - "No task registered with identifier '%s'. Ensure your task is " - "decorated with @register_task_handler.", - self.task_identifier, - ) from e + task = get_task(self.task_identifier) + return task.task_handler.unwrapped class Task(AbstractBaseTask): diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index d6aa414b..24eeb9df 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -5,10 +5,12 @@ from contextlib import ExitStack from datetime import timedelta +from dateutil.relativedelta import relativedelta from django.conf import settings from django.utils import timezone from task_processor import metrics +from task_processor.exceptions import TaskBackoffError from task_processor.managers import RecurringTaskManager, TaskManager from task_processor.models import ( AbstractBaseTask, @@ -120,6 +122,7 @@ def _run_task( ctx.enter_context(timer) task_identifier = task.task_identifier + registered_task = get_task(task_identifier) logger.debug( f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}" @@ -157,9 +160,24 @@ def _run_task( exc_info=True, ) + if isinstance(e, TaskBackoffError): + delay_until = e.delay_until or timezone.now() + relativedelta( + seconds=settings.TASK_BACKOFF_DEFAULT_SECONDS, + ) + registered_task.task_handler.delay( + delay_until=delay_until, + args=task.args, + kwargs=task.kwargs, + ) + logger.info( + "Backoff requested. Task '%s' set to retry at %s", + task_identifier, + delay_until, + ) + labels = { "task_identifier": task_identifier, - "task_type": get_task(task_identifier).task_type.value.lower(), + "task_type": registered_task.task_type.value.lower(), "result": result.lower(), } diff --git a/src/task_processor/task_registry.py b/src/task_processor/task_registry.py index c64a0898..abd721f9 100644 --- a/src/task_processor/task_registry.py +++ b/src/task_processor/task_registry.py @@ -3,8 +3,12 @@ import typing from dataclasses import dataclass +from task_processor.exceptions import TaskProcessingError from task_processor.types import TaskCallable +if typing.TYPE_CHECKING: + from task_processor.decorators import TaskHandler # noqa: F401 + logger = logging.getLogger(__name__) @@ -16,7 +20,7 @@ class TaskType(enum.Enum): @dataclass class RegisteredTask: task_identifier: str - task_function: TaskCallable[typing.Any] + task_handler: "TaskHandler" task_type: TaskType = TaskType.STANDARD task_kwargs: dict[str, typing.Any] | None = None @@ -43,18 +47,25 @@ def initialise() -> None: def get_task(task_identifier: str) -> RegisteredTask: global registered_tasks - return registered_tasks[task_identifier] + try: + return registered_tasks[task_identifier] + except KeyError: + raise TaskProcessingError( + "No task registered with identifier '%s'. Ensure your task is " + "decorated with @register_task_handler.", + task_identifier, + ) def register_task( task_identifier: str, - callable_: TaskCallable[typing.Any], + task_handler: "TaskHandler", ) -> None: global registered_tasks registered_task = RegisteredTask( task_identifier=task_identifier, - task_function=callable_, + task_handler=task_handler, ) registered_tasks[task_identifier] = registered_task From 0d75fd6bf5c914e8f674281e22703f2d44b38cdd Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 14:17:41 +0100 Subject: [PATCH 02/12] cleanup --- src/task_processor/exceptions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/task_processor/exceptions.py b/src/task_processor/exceptions.py index 8d53706d..b09bc688 100644 --- a/src/task_processor/exceptions.py +++ b/src/task_processor/exceptions.py @@ -21,7 +21,6 @@ def __init__( delay_until: datetime | None = None, ) -> None: super().__init__() - delay_until = delay_until self.delay_until = delay_until From 1fe6a317b0f406ebdc66588be9020ff55a4bb41e Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 14:41:34 +0100 Subject: [PATCH 03/12] fixes --- poetry.lock | 17 ++++++++++++++--- pyproject.toml | 1 + src/task_processor/decorators.py | 2 +- src/task_processor/task_registry.py | 10 ++++++---- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index 835f190d..4a5b092a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "annotated-types" @@ -900,7 +900,6 @@ files = [ {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1"}, {file = "psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567"}, - {file = "psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-macosx_12_0_x86_64.whl", hash = "sha256:eb09aa7f9cecb45027683bb55aebaaf45a0df8bf6de68801a6afdc7947bb09d4"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b73d6d7f0ccdad7bc43e6d34273f70d587ef62f824d7261c4ae9b8b1b6af90e8"}, {file = "psycopg2_binary-2.9.10-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ce5ab4bf46a211a8e924d307c1b1fcda82368586a19d0a24f8ae166f5c784864"}, @@ -1540,6 +1539,18 @@ files = [ dev = ["build", "hatch"] doc = ["sphinx"] +[[package]] +name = "types-python-dateutil" +version = "2.9.0.20250516" +description = "Typing stubs for python-dateutil" +optional = false +python-versions = ">=3.9" +groups = ["dev"] +files = [ + {file = "types_python_dateutil-2.9.0.20250516-py3-none-any.whl", hash = "sha256:2b2b3f57f9c6a61fba26a9c0ffb9ea5681c9b83e69cd897c6b5f668d9c0cab93"}, + {file = "types_python_dateutil-2.9.0.20250516.tar.gz", hash = "sha256:13e80d6c9c47df23ad773d54b2826bd52dbbb41be87c3f339381c1700ad21ee5"}, +] + [[package]] name = "types-pyyaml" version = "6.0.12.20241230" @@ -1679,4 +1690,4 @@ test-tools = ["pyfakefs"] [metadata] lock-version = "2.1" python-versions = ">=3.11,<4.0" -content-hash = "18369d05529bfce4aad0393b73baa2f6d32394acbd3f3965d2a4e4f96da98078" +content-hash = "38e85051fccc3bf8d25b47f20b680f2d81c388381198df7d2e8a153b9896f9f7" diff --git a/pyproject.toml b/pyproject.toml index 9fa33091..5846849c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,7 @@ pytest-mock = "^3.14.0" ruff = "==0.11.9" setuptools = "^78.1.1" types-simplejson = "^3.20.0.20250326" +types-python-dateutil = "^2.9.0.20250516" [build-system] requires = ["poetry-core"] diff --git a/src/task_processor/decorators.py b/src/task_processor/decorators.py index 0749f5eb..522579b4 100644 --- a/src/task_processor/decorators.py +++ b/src/task_processor/decorators.py @@ -49,7 +49,7 @@ def __init__( f, task_name, ) - task_registry.register_task(task_identifier, f) + task_registry.register_task(task_identifier, self) def __call__( self, diff --git a/src/task_processor/task_registry.py b/src/task_processor/task_registry.py index abd721f9..bf823410 100644 --- a/src/task_processor/task_registry.py +++ b/src/task_processor/task_registry.py @@ -20,8 +20,9 @@ class TaskType(enum.Enum): @dataclass class RegisteredTask: task_identifier: str - task_handler: "TaskHandler" + task_callable: TaskCallable[typing.Any] task_type: TaskType = TaskType.STANDARD + task_handler: "TaskHandler[typing.Any] | None" = None task_kwargs: dict[str, typing.Any] | None = None @@ -59,20 +60,21 @@ def get_task(task_identifier: str) -> RegisteredTask: def register_task( task_identifier: str, - task_handler: "TaskHandler", + task_handler: "TaskHandler[typing.Any]", ) -> None: global registered_tasks registered_task = RegisteredTask( task_identifier=task_identifier, task_handler=task_handler, + task_callable=task_handler.unwrapped, ) registered_tasks[task_identifier] = registered_task def register_recurring_task( task_identifier: str, - callable_: TaskCallable[typing.Any], + task_callable: TaskCallable[typing.Any], **task_kwargs: typing.Any, ) -> None: global registered_tasks @@ -81,7 +83,7 @@ def register_recurring_task( registered_task = RegisteredTask( task_identifier=task_identifier, - task_function=callable_, + task_callable=task_callable, task_type=TaskType.RECURRING, task_kwargs=task_kwargs, ) From c176c83c898d298993704edaa1a6855b748fba51 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 14:46:23 +0100 Subject: [PATCH 04/12] more fixes --- src/task_processor/models.py | 10 ++++------ src/task_processor/processor.py | 3 +++ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/task_processor/models.py b/src/task_processor/models.py index 58383b5d..63af6729 100644 --- a/src/task_processor/models.py +++ b/src/task_processor/models.py @@ -36,13 +36,11 @@ class Meta: abstract = True @property - def args(self) -> typing.List[typing.Any]: + def args(self) -> tuple[typing.Any, ...]: if self.serialized_args: args = self.deserialize_data(self.serialized_args) - if typing.TYPE_CHECKING: - assert isinstance(args, list) - return args - return [] + return tuple(args) + return () @property def kwargs(self) -> typing.Dict[str, typing.Any]: @@ -76,7 +74,7 @@ def run(self) -> None: @property def callable(self) -> TaskCallable[typing.Any]: task = get_task(self.task_identifier) - return task.task_handler.unwrapped + return task.task_callable class Task(AbstractBaseTask): diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 24eeb9df..1a1da579 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -164,6 +164,9 @@ def _run_task( delay_until = e.delay_until or timezone.now() + relativedelta( seconds=settings.TASK_BACKOFF_DEFAULT_SECONDS, ) + assert registered_task.task_handler, ( + "Trying to back off a recurring task (currently not supported)" + ) registered_task.task_handler.delay( delay_until=delay_until, args=task.args, From f056bbd0a652574c2e732992234cd573b9cc44bb Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 15:09:46 +0100 Subject: [PATCH 05/12] test fixes --- .../task_processor/test_unit_task_processor_decorators.py | 4 ++-- .../unit/task_processor/test_unit_task_processor_processor.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/task_processor/test_unit_task_processor_decorators.py b/tests/unit/task_processor/test_unit_task_processor_decorators.py index 9091aa35..7b528da2 100644 --- a/tests/unit/task_processor/test_unit_task_processor_decorators.py +++ b/tests/unit/task_processor/test_unit_task_processor_decorators.py @@ -14,7 +14,7 @@ register_recurring_task, register_task_handler, ) -from task_processor.exceptions import InvalidArgumentsError +from task_processor.exceptions import InvalidArgumentsError, TaskProcessingError from task_processor.models import RecurringTask, Task, TaskPriority from task_processor.task_registry import get_task, initialise from task_processor.task_run_method import TaskRunMethod @@ -143,7 +143,7 @@ def some_function(first_arg: str, second_arg: str) -> None: # Then assert not RecurringTask.objects.filter(task_identifier=task_identifier).exists() - with pytest.raises(KeyError): + with pytest.raises(TaskProcessingError): assert get_task(task_identifier) diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 11a0ac4e..f14328b0 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -500,7 +500,7 @@ def test_run_task_runs_task_and_creates_task_run_object_when_failure( ), ( logging.DEBUG, - f"Running task {task.task_identifier} id={task.id} args=['{msg}'] kwargs={{}}", + f"Running task {task.task_identifier} id={task.id} args=('{msg}',) kwargs={{}}", ), ( logging.ERROR, From 671c9a5a8e6b9faff4fdea803de6f0403f806eb1 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 15:13:53 +0100 Subject: [PATCH 06/12] add coverage --- .../task_processor/test_unit_task_processor_models.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/unit/task_processor/test_unit_task_processor_models.py b/tests/unit/task_processor/test_unit_task_processor_models.py index 311b9bcd..0adc96b1 100644 --- a/tests/unit/task_processor/test_unit_task_processor_models.py +++ b/tests/unit/task_processor/test_unit_task_processor_models.py @@ -40,6 +40,17 @@ def test_task_run(mocker: MockerFixture) -> None: mock.assert_called_once_with(*args, **kwargs) +def test_task_args__no_data__return_expected() -> None: + # Given + task = Task( + task_identifier="test_task", + scheduled_for=timezone.now(), + ) + + # When & Then + assert task.args == () + + @pytest.mark.parametrize( "input, expected_output", ( From 53c83e22d90c3f661871071c71b46cb3680f40e4 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 15:34:28 +0100 Subject: [PATCH 07/12] add tests --- src/task_processor/processor.py | 2 +- .../test_unit_task_processor_processor.py | 78 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 1a1da579..a3e062f5 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -165,7 +165,7 @@ def _run_task( seconds=settings.TASK_BACKOFF_DEFAULT_SECONDS, ) assert registered_task.task_handler, ( - "Trying to back off a recurring task (currently not supported)" + "Attempt to back off a recurring task (currently not supported)" ) registered_task.task_handler.delay( delay_until=delay_until, diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index f14328b0..0c513c72 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -1,7 +1,7 @@ import logging import time import uuid -from datetime import timedelta +from datetime import datetime, timedelta from threading import Thread import pytest @@ -11,12 +11,13 @@ from pytest_django.fixtures import SettingsWrapper from pytest_mock import MockerFixture -from common.test_tools import AssertMetricFixture +from common.test_tools.types import AssertMetricFixture from task_processor.decorators import ( TaskHandler, register_recurring_task, register_task_handler, ) +from task_processor.exceptions import TaskBackoffError from task_processor.models import ( RecurringTask, RecurringTaskRun, @@ -636,6 +637,79 @@ def test_run_task_runs_tasks_in_correct_priority( assert task_runs_3[0].task == task_2 +@pytest.mark.parametrize( + "exception, expected_scheduled_for", + [ + (TaskBackoffError(), datetime.fromisoformat("2023-12-08T06:05:52+00:00")), + ( + TaskBackoffError( + delay_until=datetime.fromisoformat("2023-12-08T06:15:52+00:00") + ), + datetime.fromisoformat("2023-12-08T06:15:52+00:00"), + ), + ], +) +@pytest.mark.freeze_time("2023-12-08T06:05:47+00:00") +@pytest.mark.multi_database +@pytest.mark.task_processor_mode +def test_run_task__backoff__calls_expected( + exception: TaskBackoffError, + expected_scheduled_for: datetime, + current_database: str, + caplog: pytest.LogCaptureFixture, +) -> None: + # Given + @register_task_handler() + def backoff_task() -> None: + raise exception + + task = Task.create( + backoff_task.task_identifier, + scheduled_for=timezone.now(), + args=(), + priority=TaskPriority.HIGH, + ) + task.save(using=current_database) + + caplog.set_level(logging.INFO) + expected_log_message = f"Backoff requested. Task '{backoff_task.task_identifier}' set to retry at {expected_scheduled_for}" + + # When + run_tasks(current_database) + + # Then + assert [ + record.message for record in caplog.records if record.levelno == logging.INFO + ] == [expected_log_message] + assert Task.objects.using(current_database).count() == 2 + assert ( + Task.objects.using(current_database).latest("created_at").scheduled_for + == expected_scheduled_for + ) + + +@pytest.mark.multi_database +@pytest.mark.task_processor_mode +def test_run_task__backoff__recurring__raises_expected( + current_database: str, +) -> None: + # Given + @register_recurring_task(run_every=timedelta(seconds=1)) + def backoff_task() -> None: + raise TaskBackoffError() + + initialise() + + # When & Then + with pytest.raises(AssertionError) as exc_info: + run_recurring_tasks(current_database) + + assert ( + str(exc_info.value) + == "Attempt to back off a recurring task (currently not supported)" + ) + + @pytest.mark.multi_database def test_run_tasks__fails_if_not_in_task_processor_mode( current_database: str, From f652e4840533ce959b7ac8f7172bad31435484f9 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 15:39:04 +0100 Subject: [PATCH 08/12] better setting name, ensure that setting is used --- settings/dev.py | 2 +- src/task_processor/processor.py | 2 +- .../task_processor/test_unit_task_processor_processor.py | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/settings/dev.py b/settings/dev.py index a02fd58c..73b0520f 100644 --- a/settings/dev.py +++ b/settings/dev.py @@ -58,7 +58,7 @@ ENABLE_CLEAN_UP_OLD_TASKS = True ENABLE_TASK_PROCESSOR_HEALTH_CHECK = True RECURRING_TASK_RUN_RETENTION_DAYS = 15 -TASK_BACKOFF_DEFAULT_SECONDS = 5 +TASK_BACKOFF_DEFAULT_DELAY_SECONDS = 5 TASK_DELETE_BATCH_SIZE = 2000 TASK_DELETE_INCLUDE_FAILED_TASKS = False TASK_DELETE_RETENTION_DAYS = 15 diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index a3e062f5..e64da004 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -162,7 +162,7 @@ def _run_task( if isinstance(e, TaskBackoffError): delay_until = e.delay_until or timezone.now() + relativedelta( - seconds=settings.TASK_BACKOFF_DEFAULT_SECONDS, + seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, ) assert registered_task.task_handler, ( "Attempt to back off a recurring task (currently not supported)" diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 0c513c72..f3bfb6f1 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -640,7 +640,7 @@ def test_run_task_runs_tasks_in_correct_priority( @pytest.mark.parametrize( "exception, expected_scheduled_for", [ - (TaskBackoffError(), datetime.fromisoformat("2023-12-08T06:05:52+00:00")), + (TaskBackoffError(), datetime.fromisoformat("2023-12-08T06:05:57+00:00")), ( TaskBackoffError( delay_until=datetime.fromisoformat("2023-12-08T06:15:52+00:00") @@ -656,9 +656,12 @@ def test_run_task__backoff__calls_expected( exception: TaskBackoffError, expected_scheduled_for: datetime, current_database: str, + settings: SettingsWrapper, caplog: pytest.LogCaptureFixture, ) -> None: # Given + settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS = 10 + @register_task_handler() def backoff_task() -> None: raise exception From d650160b4fac792edaecd67e4a64254121c75af5 Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 16:04:45 +0100 Subject: [PATCH 09/12] switch to timedelta --- src/task_processor/processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index e64da004..4e6b0f45 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -5,7 +5,6 @@ from contextlib import ExitStack from datetime import timedelta -from dateutil.relativedelta import relativedelta from django.conf import settings from django.utils import timezone @@ -161,7 +160,7 @@ def _run_task( ) if isinstance(e, TaskBackoffError): - delay_until = e.delay_until or timezone.now() + relativedelta( + delay_until = e.delay_until or timezone.now() + timedelta( seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, ) assert registered_task.task_handler, ( From b289839f3b88e2317695bd96a693d8c0220c5c1a Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 16:39:36 +0100 Subject: [PATCH 10/12] decouple mypy and runtime asserts --- src/task_processor/processor.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 4e6b0f45..e0b9b4b7 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -19,7 +19,7 @@ TaskResult, TaskRun, ) -from task_processor.task_registry import get_task +from task_processor.task_registry import TaskType, get_task T = typing.TypeVar("T", bound=AbstractBaseTask) AnyTaskRun = TaskRun | RecurringTaskRun @@ -160,6 +160,12 @@ def _run_task( ) if isinstance(e, TaskBackoffError): + assert registered_task.task_type == TaskType.STANDARD, ( + "Attempt to back off a recurring task (currently not supported)" + ) + if typing.TYPE_CHECKING: + assert isinstance(task, Task) + assert registered_task.task_handler delay_until = e.delay_until or timezone.now() + timedelta( seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, ) From 9de8df5e05e2ffffbb9ea7663409827962fbae6d Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 17:43:49 +0100 Subject: [PATCH 11/12] exclude backed off tasks from retries, revert registry changes --- src/task_processor/decorators.py | 2 +- src/task_processor/models.py | 2 +- src/task_processor/processor.py | 12 ++---------- src/task_processor/task_registry.py | 15 +++++---------- .../test_unit_task_processor_processor.py | 7 ++----- 5 files changed, 11 insertions(+), 27 deletions(-) diff --git a/src/task_processor/decorators.py b/src/task_processor/decorators.py index 522579b4..0749f5eb 100644 --- a/src/task_processor/decorators.py +++ b/src/task_processor/decorators.py @@ -49,7 +49,7 @@ def __init__( f, task_name, ) - task_registry.register_task(task_identifier, self) + task_registry.register_task(task_identifier, f) def __call__( self, diff --git a/src/task_processor/models.py b/src/task_processor/models.py index 63af6729..e7782fe1 100644 --- a/src/task_processor/models.py +++ b/src/task_processor/models.py @@ -74,7 +74,7 @@ def run(self) -> None: @property def callable(self) -> TaskCallable[typing.Any]: task = get_task(self.task_identifier) - return task.task_callable + return task.task_function class Task(AbstractBaseTask): diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index e0b9b4b7..3387054b 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -51,7 +51,7 @@ def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]: if executed_tasks: Task.objects.using(database).bulk_update( executed_tasks, - fields=["completed", "num_failures", "is_locked"], + fields=["completed", "num_failures", "is_locked", "scheduled_for"], ) if task_runs: @@ -165,18 +165,10 @@ def _run_task( ) if typing.TYPE_CHECKING: assert isinstance(task, Task) - assert registered_task.task_handler delay_until = e.delay_until or timezone.now() + timedelta( seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, ) - assert registered_task.task_handler, ( - "Attempt to back off a recurring task (currently not supported)" - ) - registered_task.task_handler.delay( - delay_until=delay_until, - args=task.args, - kwargs=task.kwargs, - ) + task.scheduled_for = delay_until logger.info( "Backoff requested. Task '%s' set to retry at %s", task_identifier, diff --git a/src/task_processor/task_registry.py b/src/task_processor/task_registry.py index bf823410..eb1d3c45 100644 --- a/src/task_processor/task_registry.py +++ b/src/task_processor/task_registry.py @@ -6,9 +6,6 @@ from task_processor.exceptions import TaskProcessingError from task_processor.types import TaskCallable -if typing.TYPE_CHECKING: - from task_processor.decorators import TaskHandler # noqa: F401 - logger = logging.getLogger(__name__) @@ -20,9 +17,8 @@ class TaskType(enum.Enum): @dataclass class RegisteredTask: task_identifier: str - task_callable: TaskCallable[typing.Any] + task_function: TaskCallable[typing.Any] task_type: TaskType = TaskType.STANDARD - task_handler: "TaskHandler[typing.Any] | None" = None task_kwargs: dict[str, typing.Any] | None = None @@ -60,21 +56,20 @@ def get_task(task_identifier: str) -> RegisteredTask: def register_task( task_identifier: str, - task_handler: "TaskHandler[typing.Any]", + callable_: TaskCallable[typing.Any], ) -> None: global registered_tasks registered_task = RegisteredTask( task_identifier=task_identifier, - task_handler=task_handler, - task_callable=task_handler.unwrapped, + task_function=callable_, ) registered_tasks[task_identifier] = registered_task def register_recurring_task( task_identifier: str, - task_callable: TaskCallable[typing.Any], + callable_: TaskCallable[typing.Any], **task_kwargs: typing.Any, ) -> None: global registered_tasks @@ -83,7 +78,7 @@ def register_recurring_task( registered_task = RegisteredTask( task_identifier=task_identifier, - task_callable=task_callable, + task_function=callable_, task_type=TaskType.RECURRING, task_kwargs=task_kwargs, ) diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index f3bfb6f1..2fb982bf 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -684,11 +684,8 @@ def backoff_task() -> None: assert [ record.message for record in caplog.records if record.levelno == logging.INFO ] == [expected_log_message] - assert Task.objects.using(current_database).count() == 2 - assert ( - Task.objects.using(current_database).latest("created_at").scheduled_for - == expected_scheduled_for - ) + task.refresh_from_db(using=current_database) + assert task.scheduled_for == expected_scheduled_for @pytest.mark.multi_database From af18bca467b8af164e99822927f11d25a5f238eb Mon Sep 17 00:00:00 2001 From: Kim Gustyr Date: Wed, 28 May 2025 18:05:52 +0100 Subject: [PATCH 12/12] noop if max attempts reached --- src/task_processor/processor.py | 19 ++++++----- .../test_unit_task_processor_processor.py | 34 ++++++++++++++++++- 2 files changed, 43 insertions(+), 10 deletions(-) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 3387054b..501c9934 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -165,15 +165,16 @@ def _run_task( ) if typing.TYPE_CHECKING: assert isinstance(task, Task) - delay_until = e.delay_until or timezone.now() + timedelta( - seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, - ) - task.scheduled_for = delay_until - logger.info( - "Backoff requested. Task '%s' set to retry at %s", - task_identifier, - delay_until, - ) + if task.num_failures <= 3: + delay_until = e.delay_until or timezone.now() + timedelta( + seconds=settings.TASK_BACKOFF_DEFAULT_DELAY_SECONDS, + ) + task.scheduled_for = delay_until + logger.info( + "Backoff requested. Task '%s' set to retry at %s", + task_identifier, + delay_until, + ) labels = { "task_identifier": task_identifier, diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 2fb982bf..52b05f39 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -652,7 +652,7 @@ def test_run_task_runs_tasks_in_correct_priority( @pytest.mark.freeze_time("2023-12-08T06:05:47+00:00") @pytest.mark.multi_database @pytest.mark.task_processor_mode -def test_run_task__backoff__calls_expected( +def test_run_task__backoff__persists_expected( exception: TaskBackoffError, expected_scheduled_for: datetime, current_database: str, @@ -710,6 +710,38 @@ def backoff_task() -> None: ) +@pytest.mark.multi_database +@pytest.mark.task_processor_mode +def test_run_task__backoff__max_num_failures__noop( + current_database: str, + caplog: pytest.LogCaptureFixture, +) -> None: + # Given + @register_task_handler() + def backoff_task() -> None: + raise TaskBackoffError() + + expected_scheduled_for = timezone.now() + task = Task.create( + backoff_task.task_identifier, + scheduled_for=expected_scheduled_for, + args=(), + priority=TaskPriority.HIGH, + ) + task.num_failures = 4 + task.save(using=current_database) + + caplog.set_level(logging.INFO) + + # When + run_tasks(current_database) + + # Then + task.refresh_from_db(using=current_database) + assert task.scheduled_for == expected_scheduled_for + assert not [record for record in caplog.records if record.levelno == logging.INFO] + + @pytest.mark.multi_database def test_run_tasks__fails_if_not_in_task_processor_mode( current_database: str,