Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions settings/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,9 @@
DOCGEN_MODE = env.bool("DOCGEN_MODE", default=False)
TASK_RUN_METHOD = TaskRunMethod.TASK_PROCESSOR

# To be used by test that depend on task processor to run tasks manually by
# calling `run_tasks`
TASK_PROCESSOR_MANUAL_MODE = env.bool("TASK_PROCESSOR_MANUAL_MODE", default=False)

# Avoid models.W042 warnings
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
2 changes: 1 addition & 1 deletion src/task_processor/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def register_recurring_task(
first_run_time: time | None = None,
timeout: timedelta | None = timedelta(minutes=30),
) -> typing.Callable[[TaskCallable[TaskParameters]], TaskCallable[TaskParameters]]:
if not settings.TASK_PROCESSOR_MODE:
if not settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE:
# Do not register recurring tasks if not invoked by task processor
return lambda f: f

Expand Down
62 changes: 42 additions & 20 deletions src/task_processor/processor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import logging
import traceback
import typing
Expand Down Expand Up @@ -109,33 +110,65 @@ def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
return []


def task_metrics(
func: typing.Callable[[T], typing.Tuple[T, AnyTaskRun]],
) -> typing.Callable[[T], typing.Tuple[T, AnyTaskRun]]:
@functools.wraps(func)
def wrapper(
task: T, *args: typing.Any, **kwargs: typing.Any
) -> typing.Tuple[T, AnyTaskRun]:
# Only collect metrics when TASK_PROCESSOR_MODE is True
if not settings.TASK_PROCESSOR_MODE:
return func(task, *args, **kwargs)

ctx = ExitStack()
timer = metrics.flagsmith_task_processor_task_duration_seconds.time()
ctx.enter_context(timer)

task_obj, task_run = func(task, *args, **kwargs)
result = task_run.result

# Get task info for labels
task_identifier = task.task_identifier
registered_task = get_task(task_identifier)

labels = {
"task_identifier": task_identifier,
"task_type": registered_task.task_type.value.lower(),
"result": result.lower(),
} # type: ignore[union-attr]

timer.labels(**labels) # type: ignore[no-untyped-call]
ctx.close()
metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc()

return task_obj, task_run

return wrapper


@task_metrics
def _run_task(
task: T,
) -> typing.Tuple[T, AnyTaskRun]:
assert settings.TASK_PROCESSOR_MODE, (
assert settings.TASK_PROCESSOR_MODE or settings.TASK_PROCESSOR_MANUAL_MODE, (
"Attempt to run tasks in a non-task-processor environment"
)

ctx = ExitStack()
timer = metrics.flagsmith_task_processor_task_duration_seconds.time()
ctx.enter_context(timer)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to do this because this is evaluated at import time as a result flagsmith_task_processor_task_duration_seconds is not part of the metrics

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe reload_metrics should solve this? See usage

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That still feels like a hack to me. I want this to be a first-class feature so that anyone updating or changing the task processor code is aware of this use case as well


task_identifier = task.task_identifier
registered_task = get_task(task_identifier)

logger.debug(
f"Running task {task_identifier} id={task.pk} args={task.args} kwargs={task.kwargs}"
)
task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined]
result: str

try:
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(task.run)
timeout = task.timeout.total_seconds() if task.timeout else None
future.result(timeout=timeout) # Wait for completion or timeout

task_run.result = result = TaskResult.SUCCESS.value
task_run.result = TaskResult.SUCCESS.value
task_run.finished_at = timezone.now()
task.mark_success()

Expand All @@ -148,7 +181,7 @@ def _run_task(

task.mark_failure()

task_run.result = result = TaskResult.FAILURE.value
task_run.result = TaskResult.FAILURE.value
task_run.error_details = str(traceback.format_exc())

logger.error(
Expand Down Expand Up @@ -176,15 +209,4 @@ def _run_task(
delay_until,
)

labels = {
"task_identifier": task_identifier,
"task_type": registered_task.task_type.value.lower(),
"result": result.lower(),
}

timer.labels(**labels) # type: ignore[no-untyped-call]
ctx.close()

metrics.flagsmith_task_processor_finished_tasks_total.labels(**labels).inc()

return task, task_run
Loading