Skip to content

Commit 71605e7

Browse files
authored
Merge branch 'main' into codex/issue-1295-package-release-date
2 parents 5426539 + 1bc406d commit 71605e7

17 files changed

+450
-134
lines changed

CHANGELOG.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
Release notes
22
=============
33

4+
Version v38.4.0
5+
---------------------
6+
7+
- fix: run pipeline scheduling jobs in respective queues (https://github.com/aboutcode-org/vulnerablecode/pull/2263)
8+
- feat: show queue load factors on the pipeline dashboard (https://github.com/aboutcode-org/vulnerablecode/pull/2264)
9+
410
Version v38.3.0
511
---------------------
612

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = vulnerablecode
3-
version = 38.3.0
3+
version = 38.4.0
44
license = Apache-2.0 AND CC-BY-SA-4.0
55

66
# description must be on ONE line https://github.com/pypa/setuptools/issues/1390
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Generated by Django 5.2.11 on 2026-04-13 19:05
2+
3+
from django.db import migrations
4+
from django.db import models
5+
from django.db.models import F
6+
7+
8+
class Migration(migrations.Migration):
    """
    Add the ``is_latest`` flag to ``AdvisoryV2`` and backfill it.

    The schema operations add the indexed boolean (defaulting to False),
    index ``advisory_id`` and ``datasource_id``, and add a partial unique
    constraint allowing at most one ``is_latest=True`` row per ``avid``.
    The final ``RunPython`` step then flags one existing row per ``avid``.
    """

    def add_is_latest_on_existing_advisory(apps, schema_editor):
        """
        Set ``is_latest=True`` on one existing advisory per ``avid``.

        For each ``avid`` the chosen row is the first one when ordering by
        ``date_collected`` descending (NULLs last) and then by ``id``
        descending. Field-based ``distinct("avid")`` is only supported by
        Django on PostgreSQL.
        """
        Advisory = apps.get_model("vulnerabilities", "AdvisoryV2")

        # Plain string: there is nothing to interpolate, so no f-prefix.
        print("\nUpdating is_latest on existing V2 Advisory.")
        latest_qs = Advisory.objects.order_by(
            "avid",
            F("date_collected").desc(nulls_last=True),
            "-id",
        ).distinct("avid")

        Advisory.objects.filter(id__in=latest_qs.values("id")).update(is_latest=True)

    dependencies = [
        ("vulnerabilities", "0120_impactedpackage_last_range_unfurl_at_and_more"),
    ]

    operations = [
        migrations.AddField(
            model_name="advisoryv2",
            name="is_latest",
            field=models.BooleanField(
                db_index=True,
                default=False,
                help_text="Indicates whether this is the latest version of the advisory identified by its AVID.",
            ),
        ),
        migrations.AlterField(
            model_name="advisoryv2",
            name="advisory_id",
            field=models.CharField(
                db_index=True,
                help_text="An advisory is a unique vulnerability identifier in some database, such as PYSEC-2020-2233",
                max_length=500,
            ),
        ),
        migrations.AlterField(
            model_name="advisoryv2",
            name="datasource_id",
            field=models.CharField(
                db_index=True,
                help_text="Unique ID for the datasource used for this advisory .e.g.: nginx_importer_v2",
                max_length=200,
            ),
        ),
        # Every row still has is_latest=False (the field default) at this
        # point, so the partial unique constraint cannot be violated when it
        # is created; the backfill below sets one row per avid.
        migrations.AddConstraint(
            model_name="advisoryv2",
            constraint=models.UniqueConstraint(
                condition=models.Q(("is_latest", True)),
                fields=("avid",),
                name="unique_latest_per_avid",
            ),
        ),
        # Reversing is a no-op for the data step.
        migrations.RunPython(
            code=add_is_latest_on_existing_advisory,
            reverse_code=migrations.RunPython.noop,
        ),
    ]

vulnerabilities/models.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2358,7 +2358,11 @@ def save(self, *args, **kwargs):
23582358
if not self.pk:
23592359
self.schedule_work_id = self.create_new_job(execute_now=True)
23602360
elif self.pk and (existing := PipelineSchedule.objects.get(pk=self.pk)):
2361-
if existing.is_active != self.is_active or existing.run_interval != self.run_interval:
2361+
if (
2362+
existing.is_active != self.is_active
2363+
or existing.run_interval != self.run_interval
2364+
or existing.run_priority != self.run_priority
2365+
):
23622366
self.schedule_work_id = self.create_new_job()
23632367
self.full_clean()
23642368
return super().save(*args, **kwargs)
@@ -2390,6 +2394,11 @@ def all_runs(self):
23902394
def latest_run(self):
    """
    Return the first related pipeline run, or None when there is none.

    ``first()`` already yields None for an empty result set, so no separate
    ``exists()`` query is issued beforehand.
    """
    return self.pipelineruns.first()
23922396

2397+
@property
def latest_successful_run(self):
    """
    Return the first finished run with exit code 0, or None if there is none.

    "First" follows the ordering of ``self.pipelineruns``.
    NOTE(review): presumably that ordering is newest-first -- confirm on the
    run model. ``first()`` returns None on an empty result, so the extra
    ``exists()`` query is not needed.
    """
    return self.pipelineruns.filter(run_end_date__isnull=False, run_exitcode=0).first()
2401+
23932402
@property
23942403
def earliest_run(self):
23952404
return self.pipelineruns.earliest("run_start_date") if self.pipelineruns.exists() else None
@@ -2874,21 +2883,10 @@ def to_dict(self):
28742883

28752884
class AdvisoryV2QuerySet(BaseQuerySet):
28762885
def latest_for_avid(self, avid: str):
    """
    Return the advisory with the given ``avid`` that is flagged ``is_latest``.

    The lookup is delegated to ``self.get``. On a Django queryset that call
    raises ``DoesNotExist`` when no row matches, rather than returning None.
    """
    lookup = {"avid": avid, "is_latest": True}
    return self.get(**lookup)
28852887

28862888
def latest_per_avid(self):
    """Narrow this queryset to the advisories flagged ``is_latest``."""
    only_latest = {"is_latest": True}
    return self.filter(**only_latest)
28922890

28932891
def latest_for_avids(self, avids):
28942892
return self.filter(avid__in=avids).latest_per_avid()
@@ -3005,6 +3003,7 @@ class AdvisoryV2(models.Model):
30053003
max_length=200,
30063004
blank=False,
30073005
null=False,
3006+
db_index=True,
30083007
help_text="Unique ID for the datasource used for this advisory ." "e.g.: nginx_importer_v2",
30093008
)
30103009

@@ -3014,6 +3013,7 @@ class AdvisoryV2(models.Model):
30143013
blank=False,
30153014
null=False,
30163015
unique=False,
3016+
db_index=True,
30173017
help_text="An advisory is a unique vulnerability identifier in some database, "
30183018
"such as PYSEC-2020-2233",
30193019
)
@@ -3088,6 +3088,14 @@ class AdvisoryV2(models.Model):
30883088
help_text="UTC Date on which the advisory was collected",
30893089
)
30903090

3091+
is_latest = models.BooleanField(
3092+
default=False,
3093+
blank=False,
3094+
null=False,
3095+
db_index=True,
3096+
help_text="Indicates whether this is the latest version of the advisory identified by its AVID.",
3097+
)
3098+
30913099
original_advisory_text = models.TextField(
30923100
blank=True,
30933101
null=True,
@@ -3140,6 +3148,11 @@ class AdvisoryV2(models.Model):
31403148
class Meta:
31413149
unique_together = ["datasource_id", "advisory_id", "unique_content_id"]
31423150
ordering = ["datasource_id", "advisory_id", "date_published", "unique_content_id"]
3151+
constraints = [
3152+
models.UniqueConstraint(
3153+
fields=["avid"], condition=Q(is_latest=True), name="unique_latest_per_avid"
3154+
)
3155+
]
31433156
indexes = [
31443157
models.Index(
31453158
fields=["avid", "-date_collected", "-id"],

vulnerabilities/pipes/advisory.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,13 @@ def insert_advisory_v2(
334334
if not created:
335335
return advisory_obj
336336

337+
AdvisoryV2.objects.filter(
338+
avid=f"{pipeline_id}/{advisory.advisory_id}",
339+
is_latest=True,
340+
).update(is_latest=False)
341+
advisory_obj.is_latest = True
342+
advisory_obj.save()
343+
337344
aliases = get_or_create_advisory_aliases(aliases=advisory.aliases)
338345
references = get_or_create_advisory_references(references=advisory.references)
339346
severities = get_or_create_advisory_severities(severities=advisory.severities)

vulnerabilities/schedules.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def schedule_execution(pipeline_schedule, execute_now=False):
2424
Takes a `PipelineSchedule` object as input and schedules a
2525
recurring job using `rq_scheduler` to execute the pipeline.
2626
"""
27+
queue_name = pipeline_schedule.get_run_priority_display()
2728
first_execution = datetime.datetime.now(tz=datetime.timezone.utc)
2829
if not execute_now:
2930
first_execution = pipeline_schedule.next_run_date
@@ -36,6 +37,7 @@ def schedule_execution(pipeline_schedule, execute_now=False):
3637
args=[pipeline_schedule.pipeline_id],
3738
interval=interval_in_seconds,
3839
repeat=None,
40+
queue_name=queue_name,
3941
)
4042
return job._id
4143

vulnerabilities/tasks.py

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,23 @@
99

1010

1111
import logging
12+
from collections import Counter
13+
from contextlib import suppress
1214
from io import StringIO
1315
from traceback import format_exc as traceback_format_exc
1416

1517
import django_rq
18+
from redis.exceptions import ConnectionError
19+
from rq import Worker
1620

1721
from vulnerabilities import models
1822
from vulnerabilities.importer import Importer
1923
from vulnerabilities.improver import Improver
24+
from vulnerablecode.settings import RQ_QUEUES
2025

2126
logger = logging.getLogger(__name__)
2227

23-
default_queue = django_rq.get_queue("default")
24-
high_queue = django_rq.get_queue("high")
25-
26-
queues = {
27-
"default": django_rq.get_queue("default"),
28-
"high": django_rq.get_queue("high"),
29-
}
28+
# One RQ queue handle per queue name configured in RQ_QUEUES.
queues = {name: django_rq.get_queue(name) for name in RQ_QUEUES}
3029

3130

3231
def execute_pipeline(pipeline_id, run_id):
@@ -151,3 +150,61 @@ def dequeue_job(job_id):
151150
for queue in queues.values():
152151
if job_id in queue.jobs:
153152
queue.remove(job_id)
153+
154+
155+
def compute_queue_load_factor():
    """
    Compute worker load per queue.

    Load factor is the ratio of the total compute required to run all active pipelines
    in a queue to the available worker capacity for that queue over a 24-hour period.
    A value greater than 1 indicates that the number of workers is insufficient to
    run all pipelines within the schedule.

    Also compute the additional workers needed to balance each queue.

    Return a dict keyed by queue name, sorted by name in descending order.
    Each value is a dict with:

    - ``"load_factor"``: the load per worker as a float, or the string
      ``"no_worker"`` when no worker is counted for the queue.
    - ``"additional_worker"``: how many more workers are needed so that the
      load factor does not exceed 1 (0 when there are already enough).

    Queues whose active pipelines have no compute to account for (no
    successful run yet, or zero total runtime) are left out of the result.
    """
    # Local import keeps this block self-contained.
    import math

    field = models.PipelineSchedule._meta.get_field("run_priority")
    label_to_value = {label: value for value, label in field.choices}
    total_compute_seconds_per_queue = {}
    worker_per_queue = {}
    load_per_queue = {}
    seconds_in_24_hr = 86400

    # If Redis cannot be reached the worker counts stay empty, so every queue
    # with pending compute is reported below as having no worker.
    with suppress(ConnectionError):
        redis_conn = django_rq.get_connection()
        worker_counter = Counter()
        for worker in Worker.all(connection=redis_conn):
            names = worker.queue_names()
            if names:
                # A worker is attributed to the first queue it listens on only.
                worker_counter[names[0]] += 1
        worker_per_queue = dict(worker_counter)

    for queue in RQ_QUEUES:
        compute_seconds = 0
        schedules = models.PipelineSchedule.objects.filter(
            is_active=True, run_priority=label_to_value[queue]
        )
        for schedule in schedules:
            # Read the property once: each access queries the database.
            successful_run = schedule.latest_successful_run
            if not successful_run:
                continue
            # NOTE(review): assumes run_interval is in hours and runtime in
            # seconds, giving compute seconds per 24 hours -- confirm on the models.
            compute_seconds += successful_run.runtime / (schedule.run_interval / 24)

        total_compute_seconds_per_queue[queue] = compute_seconds
        worker_per_queue.setdefault(queue, 0)

    for queue_name, worker_count in worker_per_queue.items():
        total_compute = total_compute_seconds_per_queue.get(queue_name, 0)
        if total_compute == 0:
            continue

        unit_load_on_queue = total_compute / seconds_in_24_hr

        # Round up: any fractional load still needs a whole worker. round()
        # would report 0 workers for a load of 0.3, and would also hide the
        # extra worker needed for loads such as 1.4.
        num_of_worker_for_balanced_queue = math.ceil(unit_load_on_queue)
        additional_worker_needed = max(num_of_worker_for_balanced_queue - worker_count, 0)

        if worker_count > 0:
            net_load_on_queue = unit_load_on_queue / worker_count
        else:
            net_load_on_queue = "no_worker"

        load_per_queue[queue_name] = {
            "load_factor": net_load_on_queue,
            "additional_worker": additional_worker_needed,
        }

    return {name: load_per_queue[name] for name in sorted(load_per_queue, reverse=True)}

vulnerabilities/templates/pipeline_dashboard.html

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
{% extends "base.html" %}
22

3+
{% load utils %}
4+
35
{% block title %}
46
Pipeline Dashboard
57
{% endblock %}
@@ -22,6 +24,18 @@
2224
.column {
2325
word-break: break-word;
2426
}
27+
28+
.has-text-orange {
29+
color: #ff8c42 !important;
30+
}
31+
32+
.has-tooltip-orange::before {
33+
background-color: #ff8c42 !important;
34+
}
35+
36+
.has-tooltip-orange::after {
37+
border-top-color: #ff8c42 !important;
38+
}
2539
</style>
2640
{% endblock %}
2741

@@ -48,11 +62,63 @@ <h1>Pipeline Dashboard</h1>
4862
</form>
4963

5064
<div class="box">
51-
<div class="column has-text-right">
52-
<p class="has-text-weight-semibold">
53-
{{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }},
54-
{{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }}
55-
</p>
65+
<div class="columns is-multiline is-vcentered mb-0">
66+
<div class="column is-half has-text-left">
67+
{% if load_per_queue %}
68+
<p class="ml-3">
69+
<span class="has-text-weight-bold has-text-black is-size-6 has-tooltip-arrow has-tooltip-multiline"
70+
data-tooltip="Load factor is the ratio of the total compute required to run all active pipelines
71+
in a queue to the available worker capacity for that queue over a 24-hour period.
72+
A value greater than 1 indicates that the number of workers is insufficient to
73+
run all pipelines within the schedule.">
74+
Load Factor:
75+
</span>
76+
{% for queue_name, values in load_per_queue.items %}
77+
78+
<span class="has-text-weight-bold is-size-6 has-tooltip-arrow has-tooltip-multiline"
79+
data-tooltip="{{ queue_name|capfirst }} priority pipeline queue.">
80+
{{ queue_name| capfirst }}
81+
</span>
82+
{% with load_factor=values|get_item:"load_factor" additional=values|get_item:"additional_worker" %}
83+
{% if load_factor == "no_worker" %}
84+
<span class="has-text-weight-bold is-size-6 has-text-danger has-tooltip-arrow has-tooltip-multiline has-tooltip-danger"
85+
data-tooltip="All workers in the {{ queue_name }} queue are down. Please run {{ additional }}
86+
worker{{ additional|pluralize }} for the {{ queue_name }} queue.">
87+
<span class="icon"><i class="fa fa-exclamation-triangle"></i></span>
88+
</span>
89+
{% elif load_factor < 1 %}
90+
<span class="has-text-weight-bold is-size-6 has-text-success has-tooltip-arrow has-tooltip-multiline has-tooltip-success"
91+
data-tooltip="{{ queue_name|capfirst }} queue perfectly balanced.">
92+
{{ load_factor|floatformat:2 }}
93+
<span class="icon"><i class="fa fa-check-circle"></i></span>
94+
</span>
95+
{% elif load_factor < 1.6 %}
96+
<span class="has-text-weight-bold is-size-6 has-text-orange has-tooltip-arrow has-tooltip-multiline has-tooltip-orange"
97+
data-tooltip="Consider adding {{ additional }} additional worker{{ additional|pluralize }} to the {{ queue_name }} queue.">
98+
{{ load_factor|floatformat:2 }}
99+
<span class="icon"><i class="fa fa-info-circle"></i></span>
100+
</span>
101+
{% else %}
102+
<span class="has-text-weight-bold is-size-6 has-text-danger has-tooltip-arrow has-tooltip-multiline has-tooltip-danger"
103+
data-tooltip="Consider adding {{ additional }} additional worker{{ additional|pluralize }} to the {{ queue_name }} queue.">
104+
{{ load_factor|floatformat:2 }}
105+
<span class="icon"><i class="fa fa-exclamation-circle"></i></span>
106+
</span>
107+
{% endif %}
108+
{% endwith %}
109+
110+
{% if not forloop.last %} &bull; {% endif %}
111+
112+
{% endfor %}
113+
</p>
114+
{% endif %}
115+
</div>
116+
<div class="column is-half has-text-right">
117+
<p class="has-text-grey-dark has-text-weight-semibold mr-3">
118+
{{ active_pipeline_count|default:0 }} active pipeline{{ active_pipeline_count|default:0|pluralize }},
119+
{{ disabled_pipeline_count|default:0 }} disabled pipeline{{ disabled_pipeline_count|default:0|pluralize }}
120+
</p>
121+
</div>
56122
</div>
57123
<table class="table is-striped is-hoverable is-fullwidth">
58124
<thead>

0 commit comments

Comments
 (0)