diff --git a/settings/dev.py b/settings/dev.py index 73b0520f..f0e42c03 100644 --- a/settings/dev.py +++ b/settings/dev.py @@ -68,5 +68,9 @@ DOCGEN_MODE = env.bool("DOCGEN_MODE", default=False) TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR +# To be used by test that depend on task processor to run tasks manually by +# calling `run_tasks` +TASK_PROCESSOR_MANUAL_MODE = env.bool("TASK_PROCESSOR_MANUAL_MODE", default=False) + # Avoid models.W042 warnings DEFAULT_AUTO_FIELD = "django.db.models.AutoField" diff --git a/src/task_processor/decorators.py b/src/task_processor/decorators.py index 0749f5eb..49cd8efa 100644 --- a/src/task_processor/decorators.py +++ b/src/task_processor/decorators.py @@ -177,7 +177,7 @@ def register_recurring_task( first_run_time: time | None = None, timeout: timedelta | None = timedelta(minutes=30), ) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]: - if not settings.TASK_PROCESSOR_MODE: + if not settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE: # Do not register recurring tasks if not invoked by task processor return lambda f: f diff --git a/src/task_processor/processor.py b/src/task_processor/processor.py index 501c9934..8d4f6476 100644 --- a/src/task_processor/processor.py +++ b/src/task_processor/processor.py @@ -1,3 +1,4 @@ +import functools import logging import traceback import typing @@ -109,17 +110,50 @@ def run_recurring_tasks(database: str) -> list[RecurringTaskRun]: return [] +def task_metrics( + func: typing.Callable[[T], typing.Tuple[T, AnyTaskRun]], +) -> typing.Callable[[T], typing.Tuple[T, AnyTaskRun]]: + @functools.wraps(func) + def wrapper( + task: T, *args: typing.Any, **kwargs: typing.Any + ) -> typing.Tuple[T, AnyTaskRun]: + # Only collect metrics when TASK_PROCESSOR_MODE is True + if not settings.TASK_PROCESSOR_MODE: + return func(task, *args, **kwargs) + + ctx = ExitStack() + timer = metrics.flagsmith_task_processor_task_duration_seconds.time() + ctx.enter_context(timer) + + task_obj, task_run = func(task, *args, **kwargs) + result = task_run.result + + # Get task info for labels + task_identifier = task.task_identifier + registered_task = get_task(task_identifier) + + labels = { + "task_identifier": task_identifier, + "task_type": registered_task.task_type.value.lower(), + "result": result.lower(), + } # type: ignore[union-attr] + + timer.labels(**labels) # type: ignore[no-untyped-call] + ctx.close() + metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc() + + return task_obj, task_run + + return wrapper + + +@task_metrics def _run_task( task: T, ) -> typing.Tuple[T, AnyTaskRun]: - assert settings.TASK_PROCESSOR_MODE, ( + assert settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE, ( "Attempt to run tasks in a non-task-processor environment" ) - - ctx = ExitStack() - timer = metrics.flagsmith_task_processor_task_duration_seconds.time() - ctx.enter_context(timer) - task_identifier = task.task_identifier registered_task = get_task(task_identifier) @@ -127,7 +161,6 @@ def _run_task( f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}" ) task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined] - result: str try: with ThreadPoolExecutor(max_workers=1) as executor: @@ -135,7 +168,7 @@ def _run_task( timeout = task.timeout.total_seconds() if task.timeout else None future.result(timeout=timeout) # Wait for completion or timeout - task_run.result = result = TaskResult.SUCCESS.value + task_run.result = TaskResult.SUCCESS.value task_run.finished_at = timezone.now() task.mark_success() @@ -148,7 +181,7 @@ def _run_task( task.mark_failure() - task_run.result = result = TaskResult.FAILURE.value + task_run.result = TaskResult.FAILURE.value task_run.error_details = str(traceback.format_exc()) logger.error( @@ -176,15 +209,4 @@ def _run_task( delay_until, ) - labels = { - "task_identifier": task_identifier, - "task_type": registered_task.task_type.value.lower(), - "result": result.lower(), - } - - timer.labels(**labels) # type: ignore[no-untyped-call] - ctx.close() - - metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc() - return task, task_run