Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions dpq/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,19 @@ def run_available_tasks(self):
"""
Runs tasks continuously until there are no more available.
"""
# Prevents tasks that failed from blocking others.
failed_tasks = set()
while True:
self._in_task = True
result = self.queue.run_once(exclude_ids=failed_tasks)
result = self.queue.run_once()
job, retval, exc = result or (None, None, None)
if exc:
if job:
job.failed = True
job.save(update_fields=["failed"])
self.logger.exception('Error in %r: %r.', job, exc, extra={
'data': {
'job': job.to_json(),
},
})
failed_tasks.add(job.id)
else:
# This is an exception before a task could even be
# retrieved, so it's probably fatal
Expand Down
18 changes: 18 additions & 0 deletions dpq/migrations/0004_job_failed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.2 on 2023-10-24 08:53

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dpq', '0003_alter_job_args'),
]

operations = [
migrations.AddField(
model_name='job',
name='failed',
field=models.BooleanField(default=False),
),
]
7 changes: 4 additions & 3 deletions dpq/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class BaseJob(models.Model):
)
task = models.CharField(max_length=255)
args = JSONField()
failed = models.BooleanField(default=False)

class Meta:
indexes = [
Expand All @@ -29,7 +30,7 @@ def __str__(self):
return '%s: %s' % (self.id, self.task)

@classmethod
def dequeue(cls, exclude_ids=[]):
def dequeue(cls):
"""
Claims the first available task and returns it. If there are no
tasks available, returns None.
Expand All @@ -49,7 +50,7 @@ def dequeue(cls, exclude_ids=[]):
SELECT id
FROM {db_table}
WHERE execute_at <= now()
AND NOT id = ANY(%s)
AND failed = false
ORDER BY priority DESC, created_at
FOR UPDATE SKIP LOCKED
LIMIT 1
Expand All @@ -58,7 +59,6 @@ def dequeue(cls, exclude_ids=[]):
""".format(
db_table=connection.ops.quote_name(cls._meta.db_table),
),
[list(exclude_ids)]
))
assert len(tasks) <= 1
if tasks:
Expand All @@ -74,6 +74,7 @@ def to_json(self):
'priority': self.priority,
'task': self.task,
'args': self.args,
'failed': self.failed,
}

class Job(BaseJob):
Expand Down