From 215ba0369f1a34e13110b3b8fc2cb84931754918 Mon Sep 17 00:00:00 2001 From: David Svenson Date: Tue, 31 Oct 2023 16:50:00 +0100 Subject: [PATCH] Mark jobs as failed persistently. --- dpq/commands.py | 7 +++---- dpq/migrations/0004_job_failed.py | 18 ++++++++++++++++++ dpq/models.py | 7 ++++--- 3 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 dpq/migrations/0004_job_failed.py diff --git a/dpq/commands.py b/dpq/commands.py index 7223db6..5d67dd4 100644 --- a/dpq/commands.py +++ b/dpq/commands.py @@ -36,20 +36,19 @@ def run_available_tasks(self): """ Runs tasks continuously until there are no more available. """ - # Prevents tasks that failed from blocking others. - failed_tasks = set() while True: self._in_task = True - result = self.queue.run_once(exclude_ids=failed_tasks) + result = self.queue.run_once() job, retval, exc = result or (None, None, None) if exc: if job: + job.failed = True + job.save(update_fields=["failed"]) self.logger.exception('Error in %r: %r.', job, exc, extra={ 'data': { 'job': job.to_json(), }, }) - failed_tasks.add(job.id) else: # This is an exception before a task could even be # retrieved, so it's probably fatal diff --git a/dpq/migrations/0004_job_failed.py b/dpq/migrations/0004_job_failed.py new file mode 100644 index 0000000..6614502 --- /dev/null +++ b/dpq/migrations/0004_job_failed.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2 on 2023-10-24 08:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dpq', '0003_alter_job_args'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='failed', + field=models.BooleanField(default=False), + ), + ] diff --git a/dpq/models.py b/dpq/models.py index 85712db..5451804 100644 --- a/dpq/models.py +++ b/dpq/models.py @@ -18,6 +18,7 @@ class BaseJob(models.Model): ) task = models.CharField(max_length=255) args = JSONField() + failed = models.BooleanField(default=False) class Meta: indexes = [ @@ -29,7 +30,7 @@ def __str__(self): return '%s: %s' % (self.id, self.task) @classmethod - def dequeue(cls, exclude_ids=[]): + def dequeue(cls): """ Claims the first available task and returns it. If there are no tasks available, returns None. @@ -49,7 +50,7 @@ def dequeue(cls, exclude_ids=[]): SELECT id FROM {db_table} WHERE execute_at <= now() - AND NOT id = ANY(%s) + AND failed = false ORDER BY priority DESC, created_at FOR UPDATE SKIP LOCKED LIMIT 1 @@ -58,7 +59,6 @@ def dequeue(cls, exclude_ids=[]): """.format( db_table=connection.ops.quote_name(cls._meta.db_table), ), - [list(exclude_ids)] )) assert len(tasks) <= 1 if tasks: @@ -74,6 +74,7 @@ def to_json(self): 'priority': self.priority, 'task': self.task, 'args': self.args, + 'failed': self.failed, } class Job(BaseJob):