From aeaee54f7ce569f9e70eea136f199cf596fd3d4d Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Tue, 28 Apr 2026 16:57:29 +0530 Subject: [PATCH 1/3] feat: persist RecurringTaskRun before _run_task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Save the RecurringTaskRun row before dispatching to `_run_task` instead of after. The pre-saved row leaves a recoverable artefact if the worker process is killed mid-task — a row with `started_at` set, `finished_at` and `result` null — which a later run can reconcile when the SQL reaper unlocks the abandoned task. `_run_task` gains an optional `task_run` parameter; if supplied, it mutates that row instead of creating a new in-memory one. The post-run save in `run_recurring_tasks` becomes an UPDATE on the persisted row (`update_fields=["finished_at", "result", "error_details"]`). --- src/task_processor/processor.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 7081c186..d8cc1554 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -91,9 +91,14 @@ def run_recurring_task(database: str) -> RecurringTaskRun | None: task_run: RecurringTaskRun | None = None if task.should_execute: - task, run = _run_task(task) - assert isinstance(run, RecurringTaskRun) - task_run = run + # Persist the task run before execution so that, if the worker is + # killed mid-task, we still have a row we can later mark as timed + # out when the task is unlocked by the timeout-based reaper in + # `get_recurringtasks_to_process`. + task_run = RecurringTaskRun(started_at=timezone.now(), task=task) + task_run.save(using=database) + task, run = _run_task(task, task_run=task_run) + assert run is task_run # task.run() may have idled the DB connection past the server's # session timeout; drop stale connections so the saves below open # a fresh one. See Sentry FLAGSMITH-API-5EM. @@ -104,7 +109,10 @@ def run_recurring_task(database: str) -> RecurringTaskRun | None: task.save(using=database, update_fields=["is_locked", "locked_at"]) if task_run: - task_run.save(using=database) + task_run.save( + using=database, + update_fields=["finished_at", "result", "error_details"], + ) logger.debug(f"Finished running recurring task '{task.task_identifier}'") return task_run @@ -113,6 +121,7 @@ def run_recurring_task(database: str) -> RecurringTaskRun | None: def _run_task( task: T, + task_run: AnyTaskRun | None = None, ) -> typing.Tuple[T, AnyTaskRun]: assert settings.TASK_PROCESSOR_MODE, ( "Attempt to run tasks in a non-task-processor environment" @@ -128,7 +137,8 @@ def _run_task( logger.debug( f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}" ) - task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined] + if task_run is None: + task_run = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined] result: str executor = None From 07dff4c4020e095b342b3285ff600c28ff2a599e Mon Sep 17 00:00:00 2001 From: Gagan Trivedi Date: Wed, 29 Apr 2026 08:31:10 +0530 Subject: [PATCH 2/3] feat: reconcile abandoned RecurringTaskRun rows on pickup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a worker dies mid-task (SIGKILL, OOM, host crash), the RecurringTaskRun row persisted before `_run_task` is left with `result IS NULL`. The SQL reaper eventually unlocks the task because its `locked_at` exceeded the timeout grace window. On the next pickup, mark every such abandoned row as `FAILURE` with a distinguishing `error_details` message. The reconciliation lives on `RecurringTask.reconcile_abandoned_run` alongside `should_execute`/`unlock`, called once from `run_recurring_task` immediately after the task is fetched. There is at most one orphan row per task — `FOR UPDATE SKIP LOCKED` and the `is_locked` gate in `get_recurringtasks_to_process` ensure two workers can't both leave orphans for the same task — so the method uses `.first()` and falls through cheaply when no orphan exists. Marking abandoned runs as `FAILURE` lets `should_execute`'s existing failure-count branch count them toward backoff without any additional logic there. --- src/task_processor/exceptions.py | 10 +++ src/task_processor/models.py | 26 +++++++- src/task_processor/processor.py | 7 +- .../test_unit_task_processor_models.py | 64 ++++++++++++++++++- .../test_unit_task_processor_processor.py | 45 +++++++++++-- 5 files changed, 142 insertions(+), 10 deletions(-) diff --git a/src/task_processor/exceptions.py b/src/task_processor/exceptions.py index b09bc688..59bc243b 100644 --- a/src/task_processor/exceptions.py +++ b/src/task_processor/exceptions.py @@ -26,3 +26,13 @@ def __init__( class TaskQueueFullError(Exception): pass + + +class TaskAbandonedError(TaskProcessingError): + """ + Marker error for recurring task runs whose worker died before + recording the result (process killed, OOM, host evicted, DB + connection lost during the post-execution save). Never raised — + used as the prefix in `error_details` so monitoring and log scrapers + can match on a single authoritative class name. + """ diff --git a/src/task_processor/models.py b/src/task_processor/models.py index 47851f92..5f4db0da 100644 --- a/src/task_processor/models.py +++ b/src/task_processor/models.py @@ -1,3 +1,4 @@ +import logging import typing import uuid from datetime import datetime, timedelta @@ -7,11 +8,13 @@ from django.db import models from django.utils import timezone -from task_processor.exceptions import TaskQueueFullError +from task_processor.exceptions import TaskAbandonedError, TaskQueueFullError from task_processor.managers import RecurringTaskManager, TaskManager from task_processor.task_registry import get_task, registered_tasks from task_processor.types import TaskCallable, TraceContext +logger = logging.getLogger(__name__) + _django_json_encoder_default = DjangoJSONEncoder().default @@ -172,6 +175,27 @@ def unlock(self) -> None: self.is_locked = False self.locked_at = None + def reconcile_abandoned_run(self) -> None: + # if for some reason the worker died before before writing the task run result + # we mark that run as explict failure here + abandoned_run = self.task_runs.filter(result__isnull=True).first() + if abandoned_run is None: + return + abandoned_run.finished_at = timezone.now() + abandoned_run.result = TaskResult.FAILURE.value + abandoned_run.error_details = ( + f"{TaskAbandonedError.__name__}: " + "no result was written before the SQL reaper unlocked the task" + ) + abandoned_run.save( + update_fields=["finished_at", "result", "error_details"], + ) + logger.error( + "Recurring task '%s' was abandoned: %s", + self.task_identifier, + abandoned_run.error_details, + ) + @property def should_execute(self) -> bool: now = timezone.now() diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index d8cc1554..114b71d0 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -80,6 +80,8 @@ def run_recurring_task(database: str) -> RecurringTaskRun | None: logger.debug(f"Running recurring task '{task.task_identifier}'") + task.reconcile_abandoned_run() + if not task.is_task_registered: # This is necessary to ensure that old instances of the task processor, # which may still be running during deployment, do not remove tasks added by new instances. @@ -109,10 +111,7 @@ def run_recurring_task(database: str) -> RecurringTaskRun | None: task.save(using=database, update_fields=["is_locked", "locked_at"]) if task_run: - task_run.save( - using=database, - update_fields=["finished_at", "result", "error_details"], - ) + task_run.save(using=database) logger.debug(f"Finished running recurring task '{task.task_identifier}'") return task_run diff --git a/tests/unit/task_processor/test_unit_task_processor_models.py b/tests/unit/task_processor/test_unit_task_processor_models.py index 6da31acc..1772f959 100644 --- a/tests/unit/task_processor/test_unit_task_processor_models.py +++ b/tests/unit/task_processor/test_unit_task_processor_models.py @@ -7,7 +7,7 @@ from pytest_mock import MockerFixture from task_processor.decorators import register_task_handler -from task_processor.models import RecurringTask, Task +from task_processor.models import RecurringTask, RecurringTaskRun, Task, TaskResult from task_processor.task_registry import initialise now = timezone.now() @@ -144,3 +144,65 @@ def test_task_create__trace_context__persists_expected( # Then task.refresh_from_db() assert task.trace_context == trace_context + + +@pytest.mark.django_db +def test_recurring_task_reconcile_abandoned_run__no_abandoned_run__noop() -> None: + # Given - a task with one completed run and no abandoned rows + task = RecurringTask.objects.create( + task_identifier="test_recurring_task", + run_every=timedelta(seconds=1), + ) + finished_at = timezone.now() + finished_run = RecurringTaskRun.objects.create( + task=task, + started_at=finished_at - timedelta(seconds=1), + finished_at=finished_at, + result=TaskResult.SUCCESS.value, + ) + + # When + task.reconcile_abandoned_run() + + # Then - the finished run is untouched + finished_run.refresh_from_db() + assert finished_run.result == TaskResult.SUCCESS.value + assert finished_run.finished_at == finished_at + assert finished_run.error_details is None + + +@pytest.mark.django_db +def test_recurring_task_reconcile_abandoned_run__finished_run_present__only_abandoned_touched() -> ( + None +): + # Given - a task with both a completed run and an abandoned run + task = RecurringTask.objects.create( + task_identifier="test_recurring_task", + run_every=timedelta(seconds=1), + ) + finished_started_at = timezone.now() - timedelta(hours=2) + finished_at = timezone.now() - timedelta(hours=1) + finished_run = RecurringTaskRun.objects.create( + task=task, + started_at=finished_started_at, + finished_at=finished_at, + result=TaskResult.SUCCESS.value, + ) + abandoned_run = RecurringTaskRun.objects.create( + task=task, + started_at=timezone.now() - timedelta(minutes=30), + ) + + # When + task.reconcile_abandoned_run() + + # Then - only the abandoned row is marked FAILURE + abandoned_run.refresh_from_db() + assert abandoned_run.result == TaskResult.FAILURE.value + assert abandoned_run.finished_at is not None + assert abandoned_run.error_details + + finished_run.refresh_from_db() + assert finished_run.result == TaskResult.SUCCESS.value + assert finished_run.finished_at == finished_at + assert finished_run.error_details is None diff --git a/tests/unit/task_processor/test_unit_task_processor_processor.py b/tests/unit/task_processor/test_unit_task_processor_processor.py index 0f011ea6..953fccd9 100644 --- a/tests/unit/task_processor/test_unit_task_processor_processor.py +++ b/tests/unit/task_processor/test_unit_task_processor_processor.py @@ -22,7 +22,7 @@ register_recurring_task, register_task_handler, ) -from task_processor.exceptions import TaskBackoffError +from task_processor.exceptions import TaskAbandonedError, TaskBackoffError from task_processor.models import ( RecurringTask, RecurringTaskRun, @@ -288,6 +288,43 @@ def _dummy_recurring_task() -> None: assert task.locked_at is None +@pytest.mark.multi_database(transaction=True) +@pytest.mark.task_processor_mode +def test_run_recurring_task__abandoned_run__reconciled_as_failure( + current_database: str, +) -> None: + # Given - a recurring task with a stale lock and a pre-saved + # RecurringTaskRun row that a previous worker left behind when it + # died mid-task (result/finished_at still null). + @register_recurring_task(run_every=timedelta(seconds=1)) + def _dummy_recurring_task() -> None: + pass + + initialise() + + task = RecurringTask.objects.using(current_database).get( + task_identifier="test_unit_task_processor_processor._dummy_recurring_task", + ) + abandoned_run = RecurringTaskRun.objects.using(current_database).create( + task=task, + started_at=timezone.now() - timedelta(hours=1), + ) + task.is_locked = True + task.locked_at = timezone.now() - timedelta(hours=1) + task.save(using=current_database) + + # When + run_recurring_task(current_database) + + # Then - the abandoned row is marked as FAILURE with a distinguishing + # error message + abandoned_run.refresh_from_db(using=current_database) + assert abandoned_run.result == TaskResult.FAILURE.value + assert abandoned_run.finished_at is not None + assert abandoned_run.error_details is not None + assert TaskAbandonedError.__name__ in abandoned_run.error_details + + @pytest.mark.multi_database(transaction=True) @pytest.mark.task_processor_mode def test_run_recurring_task__multiple_runs__executes_expected_times( @@ -344,15 +381,15 @@ def test_run_recurring_task__multiple_tasks__loops_over_all( settings: SettingsWrapper, ) -> None: # Given, Three recurring tasks - @register_recurring_task(run_every=timedelta(milliseconds=200)) + @register_recurring_task(run_every=timedelta(hours=1)) def _dummy_recurring_task_1() -> None: pass - @register_recurring_task(run_every=timedelta(milliseconds=200)) + @register_recurring_task(run_every=timedelta(hours=1)) def _dummy_recurring_task_2() -> None: pass - @register_recurring_task(run_every=timedelta(milliseconds=200)) + @register_recurring_task(run_every=timedelta(hours=1)) def _dummy_recurring_task_3() -> None: pass From fa0c6e0bada9cb05fc236f2100bb05179f215eba Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 30 Apr 2026 08:53:27 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/task_processor/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task_processor/models.py b/src/task_processor/models.py index 5f4db0da..de9069ba 100644 --- a/src/task_processor/models.py +++ b/src/task_processor/models.py @@ -177,7 +177,7 @@ def unlock(self) -> None: def reconcile_abandoned_run(self) -> None: # if for some reason the worker died before before writing the task run result - # we mark that run as explict failure here + # we mark that run as explict failure here abandoned_run = self.task_runs.filter(result__isnull=True).first() if abandoned_run is None: return