Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/task_processor/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ def get_tasks_to_process(self, num_tasks: int) -> "RawQuerySet[Task]":


class RecurringTaskManager(Manager["RecurringTask"]):
def get_tasks_to_process(self) -> "RawQuerySet[RecurringTask]":
return self.raw("SELECT * FROM get_recurringtasks_to_process()")
def get_task_to_process(self) -> "RecurringTask | None":
return next(
iter(self.raw("SELECT * FROM get_recurringtasks_to_process()")),
None,
)
75 changes: 37 additions & 38 deletions src/task_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from importlib.metadata import version

from django.conf import settings
from django.db import close_old_connections
from django.utils import timezone
from opentelemetry import context as otel_context
from opentelemetry import propagate, trace
Expand Down Expand Up @@ -68,48 +69,46 @@ def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]:
return []


def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
def run_recurring_task(database: str) -> RecurringTaskRun | None:
# NOTE: We will probably see a lot of delay in the execution of recurring tasks
# if the tasks take longer then `run_every` to execute. This is not
# a problem for now, but we should be mindful of this limitation
task_manager: RecurringTaskManager = RecurringTask.objects.db_manager(database)
tasks = task_manager.get_tasks_to_process()
if tasks:
logger.debug(f"Running {len(tasks)} recurring task(s)")

task_runs = []

for task in tasks:
if not task.is_task_registered:
# This is necessary to ensure that old instances of the task processor,
# which may still be running during deployment, do not remove tasks added by new instances.
# Reference: https://github.com/Flagsmith/flagsmith/issues/2551
task_age = timezone.now() - task.created_at
if task_age > UNREGISTERED_RECURRING_TASK_GRACE_PERIOD:
task.delete(using=database)
continue

if task.should_execute:
task, task_run = _run_task(task)
assert isinstance(task_run, RecurringTaskRun)
task_runs.append(task_run)
else:
task.unlock()

# update all tasks that were not deleted
to_update = [task for task in tasks if task.id]
RecurringTask.objects.using(database).bulk_update(
to_update,
fields=["is_locked", "locked_at"],
)

if task_runs:
RecurringTaskRun.objects.using(database).bulk_create(task_runs)
logger.debug(f"Finished running {len(task_runs)} recurring task(s)")

return task_runs

return []
task = task_manager.get_task_to_process()
if task is None:
return None

logger.debug(f"Running recurring task '{task.task_identifier}'")

if not task.is_task_registered:
# This is necessary to ensure that old instances of the task processor,
# which may still be running during deployment, do not remove tasks added by new instances.
# Reference: https://github.com/Flagsmith/flagsmith/issues/2551
task_age = timezone.now() - task.created_at
if task_age > UNREGISTERED_RECURRING_TASK_GRACE_PERIOD:
task.delete(using=database)
return None

task_run: RecurringTaskRun | None = None
if task.should_execute:
task, run = _run_task(task)
assert isinstance(run, RecurringTaskRun)
task_run = run
# task.run() may have idled the DB connection past the server's
# session timeout; drop stale connections so the saves below open
# a fresh one. See Sentry FLAGSMITH-API-5EM.
close_old_connections()
else:
task.unlock()

task.save(using=database, update_fields=["is_locked", "locked_at"])

if task_run:
task_run.save(using=database)
logger.debug(f"Finished running recurring task '{task.task_identifier}'")
return task_run

return None


def _run_task(
Expand Down
4 changes: 2 additions & 2 deletions src/task_processor/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.db import close_old_connections
from django.utils import timezone

from task_processor.processor import run_recurring_tasks, run_tasks
from task_processor.processor import run_recurring_task, run_tasks
from task_processor.task_registry import initialise
from task_processor.types import TaskProcessorConfig

Expand Down Expand Up @@ -111,7 +111,7 @@ def run_iteration(self) -> None:

# Recurring tasks are only run on one database
if (database == "default") ^ database_is_separate:
run_recurring_tasks(database)
run_recurring_task(database)
except Exception as exception:
# To prevent task threads from dying if they get an error retrieving the tasks from the
# database this will allow the thread to continue trying to retrieve tasks if it can
Expand Down
Loading