77# See https://aboutcode.org for more information about nexB OSS projects.
88#
99
10+ import csv
1011import hashlib
1112import json
1213import logging
13- import typing
14+ import xml . etree . ElementTree as ET
1415from contextlib import suppress
1516from functools import cached_property
16- from typing import Optional
17+ from itertools import groupby
18+ from operator import attrgetter
1719from typing import Union
1820
21+ from cvss .exceptions import CVSS2MalformedError
22+ from cvss .exceptions import CVSS3MalformedError
23+ from cvss .exceptions import CVSS4MalformedError
1924from cwe2 .database import Database
25+ from cwe2 .mappings import xml_database_path
26+ from cwe2 .weakness import Weakness as DBWeakness
2027from django .contrib .auth import get_user_model
2128from django .contrib .auth .models import UserManager
2229from django .core import exceptions
4350from univers .version_range import AlpineLinuxVersionRange
4451from univers .versions import Version
4552
46- from aboutcode import hashid
4753from vulnerabilities import utils
54+ from vulnerabilities .severity_systems import EPSS
4855from vulnerabilities .severity_systems import SCORING_SYSTEMS
4956from vulnerabilities .utils import normalize_purl
5057from vulnerabilities .utils import purl_to_dict
5663models .CharField .register_lookup (Trim )
5764
5865# patch univers for missing entry
59- RANGE_CLASS_BY_SCHEMES ["alpine " ] = AlpineLinuxVersionRange
66+ RANGE_CLASS_BY_SCHEMES ["apk " ] = AlpineLinuxVersionRange
6067
6168
6269class BaseQuerySet (models .QuerySet ):
@@ -373,6 +380,127 @@ def get_related_purls(self):
373380 """
374381 return [p .package_url for p in self .packages .distinct ().all ()]
375382
383+ def aggregate_fixed_and_affected_packages (self ):
384+ from vulnerabilities .utils import get_purl_version_class
385+
386+ sorted_fixed_by_packages = self .fixed_by_packages .filter (is_ghost = False ).order_by (
387+ "type" , "namespace" , "name" , "qualifiers" , "subpath"
388+ )
389+
390+ if sorted_fixed_by_packages :
391+ sorted_fixed_by_packages .first ().calculate_version_rank
392+
393+ sorted_affected_packages = self .affected_packages .all ()
394+
395+ if sorted_affected_packages :
396+ sorted_affected_packages .first ().calculate_version_rank
397+
398+ grouped_fixed_by_packages = {
399+ key : list (group )
400+ for key , group in groupby (
401+ sorted_fixed_by_packages ,
402+ key = attrgetter ("type" , "namespace" , "name" , "qualifiers" , "subpath" ),
403+ )
404+ }
405+
406+ all_affected_fixed_by_matches = []
407+
408+ for sorted_affected_package in sorted_affected_packages :
409+ affected_fixed_by_matches = {
410+ "affected_package" : sorted_affected_package ,
411+ "matched_fixed_by_packages" : [],
412+ }
413+
414+ # Build the key to find matching group
415+ key = (
416+ sorted_affected_package .type ,
417+ sorted_affected_package .namespace ,
418+ sorted_affected_package .name ,
419+ sorted_affected_package .qualifiers ,
420+ sorted_affected_package .subpath ,
421+ )
422+
423+ # Get matching group from pre-grouped fixed_by_packages
424+ matching_fixed_packages = grouped_fixed_by_packages .get (key , [])
425+
426+ # Get version classes for comparison
427+ affected_version_class = get_purl_version_class (sorted_affected_package )
428+ affected_version = affected_version_class (sorted_affected_package .version )
429+
430+ # Compare versions and filter valid matches
431+ matched_fixed_by_packages = [
432+ fixed_by_package .purl
433+ for fixed_by_package in matching_fixed_packages
434+ if get_purl_version_class (fixed_by_package )(fixed_by_package .version )
435+ > affected_version
436+ ]
437+
438+ affected_fixed_by_matches ["matched_fixed_by_packages" ] = matched_fixed_by_packages
439+ all_affected_fixed_by_matches .append (affected_fixed_by_matches )
440+ return sorted_fixed_by_packages , sorted_affected_packages , all_affected_fixed_by_matches
441+
442+ def get_severity_vectors_and_values (self ):
443+ """
444+ Collect severity vectors and values, excluding EPSS scoring systems and handling errors gracefully.
445+ """
446+ severity_vectors = []
447+ severity_values = set ()
448+
449+ # Exclude EPSS scoring system
450+ base_severities = self .severities .exclude (scoring_system = EPSS .identifier )
451+
452+ # QuerySet for severities with valid scoring_elements and scoring_system in SCORING_SYSTEMS
453+ valid_scoring_severities = base_severities .filter (
454+ scoring_elements__isnull = False , scoring_system__in = SCORING_SYSTEMS .keys ()
455+ )
456+
457+ for severity in valid_scoring_severities :
458+ try :
459+ vector_values = SCORING_SYSTEMS [severity .scoring_system ].get (
460+ severity .scoring_elements
461+ )
462+ if vector_values :
463+ severity_vectors .append (vector_values )
464+ except (
465+ CVSS2MalformedError ,
466+ CVSS3MalformedError ,
467+ CVSS4MalformedError ,
468+ NotImplementedError ,
469+ ) as e :
470+ logging .error (f"CVSSMalformedError for { severity .scoring_elements } : { e } " )
471+
472+ valid_value_severities = base_severities .filter (value__isnull = False ).exclude (value = "" )
473+
474+ severity_values .update (valid_value_severities .values_list ("value" , flat = True ))
475+
476+ return severity_vectors , severity_values
477+
478+
479+ def get_cwes (self ):
480+ """Yield CWE Weakness objects"""
481+ for cwe_category in self .cwe_files :
482+ cwe_category .seek (0 )
483+ reader = csv .DictReader (cwe_category )
484+ for row in reader :
485+ yield DBWeakness (* list (row .values ())[0 :- 1 ])
486+ tree = ET .parse (xml_database_path )
487+ root = tree .getroot ()
488+ for tag_num in [1 , 2 ]: # Categories , Views
489+ tag = root [tag_num ]
490+ for child in tag :
491+ yield DBWeakness (
492+ * [
493+ child .attrib ["ID" ],
494+ child .attrib .get ("Name" ),
495+ None ,
496+ child .attrib .get ("Status" ),
497+ child [0 ].text ,
498+ ]
499+ )
500+
501+
502+ Database .get_cwes = get_cwes
503+
376504
377505class Weakness (models .Model ):
378506 """
@@ -381,7 +509,15 @@ class Weakness(models.Model):
381509
382510 cwe_id = models .IntegerField (help_text = "CWE id" )
383511 vulnerabilities = models .ManyToManyField (Vulnerability , related_name = "weaknesses" )
384- db = Database ()
512+
513+ cwe_by_id = {}
514+
515+ def get_cwe (self , cwe_id ):
516+ if not self .cwe_by_id :
517+ db = Database ()
518+ for weakness in db .get_cwes ():
519+ self .cwe_by_id [str (weakness .cwe_id )] = weakness
520+ return self .cwe_by_id [cwe_id ]
385521
386522 @property
387523 def cwe (self ):
@@ -393,7 +529,7 @@ def weakness(self):
393529 Return a queryset of Weakness for this vulnerability.
394530 """
395531 try :
396- weakness = self .db . get ( self .cwe_id )
532+ weakness = self .get_cwe ( str ( self .cwe_id ) )
397533 return weakness
398534 except Exception as e :
399535 logger .warning (f"Could not find CWE { self .cwe_id } : { e } " )
0 commit comments