Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 195 additions & 83 deletions vulnerabilities/api_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

from collections import defaultdict
from typing import List
from urllib.parse import urlencode

from django.db.models import Exists
from django.db.models import Max
from django.db.models import OuterRef
from django.db.models import Prefetch
from django_filters import rest_framework as filters
Expand All @@ -21,6 +23,7 @@
from rest_framework.reverse import reverse
from rest_framework.throttling import AnonRateThrottle

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import AdvisoryReference
from vulnerabilities.models import AdvisorySet
from vulnerabilities.models import AdvisorySetMember
Expand Down Expand Up @@ -216,6 +219,25 @@ def get_fixing_vulnerabilities_url(self, obj):

def get_affected_by_vulnerabilities(self, package):
"""Return a dictionary with advisory as keys and their details, including fixed_by_packages."""
advisories = self.context["advisory_map"].get(package.id, [])
impact_map = self.context["impact_map"].get(package.id, {})

if advisories:
result = []

for adv in advisories:
fixed = impact_map.get(adv["avid"])
adv.pop("avid", None)

result.append(
{
**adv,
"fixed_by_packages": fixed,
}
)

return result

advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url)

advisories = []
Expand Down Expand Up @@ -250,56 +272,32 @@ def get_affected_by_vulnerabilities(self, package):
"advisory_id": advisory.advisory_id.split("/")[-1],
"aliases": [alias.alias for alias in advisory.aliases.all()],
"summary": advisory.summary,
"fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()],
"severity": advisory.weighted_severity,
"exploitability": advisory.exploitability,
"risk_score": advisory.risk_score,
"fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()],
}
)

return result

is_grouped = AdvisorySet.objects.filter(package=package, relation_type="affecting").exists()

if is_grouped:
affected_by_advisories_qs = (
AdvisorySet.objects.filter(package=package, relation_type="affecting")
.select_related("primary_advisory")
.prefetch_related(
Prefetch(
"members",
queryset=AdvisorySetMember.objects.filter(is_primary=False).select_related(
"advisory"
),
to_attr="secondary_members",
)
if not advisories:
if package.type in TYPES_WITH_MULTIPLE_IMPORTERS:
advisories_qs = advisories_qs.prefetch_related(
"aliases",
"impacted_packages__affecting_packages",
"impacted_packages__fixed_by_packages",
)
)

affected_groups = [
Group(
aliases=list(adv.aliases.all()),
primary=adv.primary_advisory,
secondaries=[member.advisory for member in adv.secondary_members],
advisories: List[GroupedAdvisory] = merge_and_save_grouped_advisories(
package, advisories_qs, "affecting"
)
for adv in affected_by_advisories_qs
]

advisories: List[GroupedAdvisory] = get_advisories_from_groups(affected_groups)
return self.return_advisories_data(package, advisories_qs, advisories)

if package.type in TYPES_WITH_MULTIPLE_IMPORTERS:
advisories_qs = advisories_qs.prefetch_related(
"aliases",
"impacted_packages__affecting_packages",
"impacted_packages__fixed_by_packages",
)
advisories: List[GroupedAdvisory] = merge_and_save_grouped_advisories(
package, advisories_qs, "affecting"
)
return self.return_advisories_data(package, advisories_qs, advisories)
return self.return_advisories_data(package, advisories_qs, advisories)

def get_fixing_vulnerabilities(self, package):
advisories = self.context["fixing_advisory_map"].get(package.id, [])
if advisories:
return advisories

advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url)

if not package.type in TYPES_WITH_MULTIPLE_IMPORTERS:
Expand All @@ -319,43 +317,14 @@ def get_fixing_vulnerabilities(self, package):
)
return results

advisories = []

is_grouped = AdvisorySet.objects.filter(package=package, relation_type="fixing").exists()

if is_grouped:
fixing_advisories_qs = (
AdvisorySet.objects.filter(package=package, relation_type="fixing")
.select_related("primary_advisory")
.prefetch_related(
Prefetch(
"members",
queryset=AdvisorySetMember.objects.filter(is_primary=False).select_related(
"advisory"
),
to_attr="secondary_members",
)
)
)

fixing_groups = [
Group(
aliases=list(adv.aliases.all()),
primary=adv.primary_advisory,
secondaries=[member.advisory for member in adv.secondary_members],
)
for adv in fixing_advisories_qs
]

advisories: List[GroupedAdvisory] = get_advisories_from_groups(fixing_groups)
return self.return_fixing_advisories_data(advisories)

if package.type in TYPES_WITH_MULTIPLE_IMPORTERS:
advisories_qs = advisories_qs.prefetch_related(
"aliases",
"impacted_packages__affecting_packages",
"impacted_packages__fixed_by_packages",
)
if not advisories_qs.exists():
return []
advisories: List[GroupedAdvisory] = merge_and_save_grouped_advisories(
package, advisories_qs, "fixing"
)
Expand Down Expand Up @@ -409,11 +378,11 @@ def return_advisories_data(self, package, advisories_qs, advisories):
return result

def get_next_non_vulnerable_version(self, package):
if next_non_vulnerable := package.get_non_vulnerable_versions()[0]:
if next_non_vulnerable := package.next_non_vulnerable_version:
return next_non_vulnerable.version

def get_latest_non_vulnerable_version(self, package):
if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]:
if latest_non_vulnerable := package.latest_non_vulnerable_version:
return latest_non_vulnerable.version


Expand Down Expand Up @@ -464,13 +433,11 @@ def create(self, request, *args, **kwargs):
query = (
PackageV2.objects.filter(plain_package_url__in=plain_purls)
.values_list("plain_package_url", flat=True)
.distinct()
.order_by("plain_package_url")
)
else:
query = (
PackageV2.objects.filter(package_url__in=purls)
.distinct()
.order_by("package_url")
.values_list("package_url", flat=True)
)
Expand All @@ -479,20 +446,26 @@ def create(self, request, *args, **kwargs):
return self.get_paginated_response(page)

if ignore_qualifiers_subpath:
query = (
PackageV2.objects.filter(plain_package_url__in=plain_purls)
.order_by("plain_package_url")
.distinct("plain_package_url")
query = PackageV2.objects.filter(plain_package_url__in=plain_purls).order_by(
"plain_package_url"
)
else:
query = (
PackageV2.objects.filter(package_url__in=purls)
.order_by("package_url")
.distinct("package_url")
)
query = PackageV2.objects.filter(package_url__in=purls).order_by("package_url")

page = self.paginate_queryset(query)
serializer = self.get_serializer(page, many=True, context={"request": request})
affected_advisory_map = get_affected_advisories_bulk(page)
fixing_advisory_map = get_fixing_advisories_bulk(page)
impact_map = get_impacts_bulk(page)
serializer = self.get_serializer(
page,
many=True,
context={
"request": request,
"advisory_map": affected_advisory_map,
"impact_map": impact_map,
"fixing_advisory_map": fixing_advisory_map,
},
)
return self.get_paginated_response(serializer.data)


Expand Down Expand Up @@ -592,3 +565,142 @@ class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet):
class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet):
    """Variant of the advisory viewset scoped to advisories *affecting* a package.

    Overrides the relation used to match a package purl (traversing
    ``impacted_packages__affecting_packages``) and the serializer used to
    render each advisory.
    """

    relation = "impacted_packages__affecting_packages__package_url"
    serializer_class = AffectedByAdvisoryV3Serializer


def get_affected_advisories_bulk(packages):
    """Return ``{package_id: [advisory summary dicts]}`` for "affecting" sets.

    Loads every "affecting" AdvisorySet for the given packages in a single
    query — primary advisory joined, aliases prefetched, and the member
    advisories' maximum severity/exploitability annotated in the database —
    so the serializer can answer without per-package queries.
    """
    ids = [pkg.id for pkg in packages]

    queryset = (
        AdvisorySet.objects.filter(package_id__in=ids, relation_type="affecting")
        .select_related("primary_advisory")
        .prefetch_related(
            Prefetch("aliases", queryset=AdvisoryAlias.objects.only("alias"))
        )
        .annotate(
            max_severity=Max("members__advisory__weighted_severity"),
            max_exploitability=Max("members__advisory__exploitability"),
        )
        .only(
            "id",
            "package_id",
            "primary_advisory__avid",
            "primary_advisory__summary",
            "primary_advisory__advisory_id",
        )
    )

    by_package = defaultdict(list)
    for advisory_set in queryset:
        # Cache the alias strings so the per-package loop below never
        # re-touches the prefetched queryset.
        advisory_set._aliases_cache = [a.alias for a in advisory_set.aliases.all()]
        by_package[advisory_set.package_id].append(advisory_set)

    result = {}
    for pkg in packages:
        entries = []
        for advisory_set in by_package.get(pkg.id, []):
            primary = advisory_set.primary_advisory
            severity = advisory_set.max_severity or 0.0
            exploitability = advisory_set.max_exploitability or 0.0

            # Risk score is the capped product of the two maxima; None when
            # either component is missing/zero.
            if severity and exploitability:
                risk = round(min(exploitability * severity, 10.0), 1)
            else:
                risk = None

            identifier = primary.advisory_id.split("/")[-1]

            # NOTE(review): this bulk path emits a "weighted_severity" key while
            # the non-bulk fallback emits "severity" — confirm the divergence is
            # intentional for API consumers.
            entries.append(
                {
                    "avid": primary.avid,
                    "advisory_id": identifier,
                    # Drop the alias that merely duplicates the display id.
                    "aliases": [
                        alias
                        for alias in advisory_set._aliases_cache
                        if alias != identifier
                    ],
                    "weighted_severity": round(severity, 1) if severity else None,
                    "exploitability": exploitability or None,
                    "risk_score": risk,
                    "summary": primary.summary,
                }
            )

        result[pkg.id] = entries

    return result


def get_impacts_bulk(packages):
    """Return ``{package_id: {advisory avid: [fixed-by purls]}}`` for *packages*.

    Fetches every ImpactedPackageAffecting row for the given packages in a
    single query (advisory joined, fixed-by packages prefetched) so the
    serializer can resolve fixed-by purls without per-package queries.
    """
    package_ids = [p.id for p in packages]

    impacts = (
        ImpactedPackageAffecting.objects.filter(package_id__in=package_ids)
        .select_related("impacted_package__advisory")
        .prefetch_related(
            Prefetch(
                "impacted_package__fixed_by_packages",
                queryset=PackageV2.objects.only("package_url"),
            )
        )
        .only(
            "package_id",
            "impacted_package_id",
            "impacted_package__advisory_id",
            "impacted_package__advisory__avid",
        )
    )

    impact_map = defaultdict(dict)
    # Several impacts can share one impacted_package; compute its fixed-by
    # purl list once and reuse the same list object.
    fixed_cache = {}

    for impact in impacts:
        ip = impact.impacted_package
        avid = ip.advisory.avid

        if ip.id not in fixed_cache:
            # Deduplicate while preserving queryset order: building the list
            # from a set (the previous approach) made the purl ordering
            # nondeterministic across runs/processes.
            fixed_cache[ip.id] = list(
                dict.fromkeys(pkg.purl for pkg in ip.fixed_by_packages.all())
            )

        impact_map[impact.package_id][avid] = fixed_cache[ip.id]

    return impact_map


def get_fixing_advisories_bulk(packages):
    """Return ``{package_id: [{"advisory_id": ...}, ...]}`` for *packages*.

    Bulk-loads the "fixing" AdvisorySet rows for the given packages so the
    serializer can answer get_fixing_vulnerabilities without per-package
    queries.
    """
    package_ids = [p.id for p in packages]

    advisory_sets = list(
        AdvisorySet.objects.filter(
            package_id__in=package_ids,
            relation_type="fixing",
        )
        # Join the primary advisory in the same query. Without this, reading
        # adv.primary_advisory below issues one extra query per row (N+1),
        # and it matches how get_affected_advisories_bulk loads its sets.
        .select_related("primary_advisory")
        .only(
            "id",
            "package_id",
            "primary_advisory__advisory_id",
        )
    )

    package_map = defaultdict(list)

    for adv in advisory_sets:
        # NOTE(review): assumes primary_advisory is always set on "fixing"
        # sets — confirm, or a null FK raises AttributeError here.
        package_map[adv.package_id].append(adv.primary_advisory.advisory_id)

    return {
        package.id: [
            # Keep only the trailing segment of the namespaced advisory id.
            {"advisory_id": adv_id.split("/")[-1]}
            for adv_id in package_map.get(package.id, [])
        ]
        for package in packages
    }
Loading