diff --git a/vulnerabilities/improvers/__init__.py b/vulnerabilities/improvers/__init__.py index af8de5dbd..1be791241 100644 --- a/vulnerabilities/improvers/__init__.py +++ b/vulnerabilities/improvers/__init__.py @@ -19,6 +19,7 @@ from vulnerabilities.pipelines import flag_ghost_packages from vulnerabilities.pipelines import populate_vulnerability_summary_pipeline from vulnerabilities.pipelines import remove_duplicate_advisories +from vulnerabilities.pipelines.v2_improvers import compute_advisory_todo as compute_advisory_todo_v2 from vulnerabilities.pipelines.v2_improvers import compute_package_risk as compute_package_risk_v2 from vulnerabilities.pipelines.v2_improvers import ( computer_package_version_rank as compute_version_rank_v2, @@ -65,6 +66,7 @@ enhance_with_metasploit_v2.MetasploitImproverPipeline, compute_package_risk_v2.ComputePackageRiskPipeline, compute_version_rank_v2.ComputeVersionRankPipeline, + compute_advisory_todo_v2.ComputeToDo, compute_advisory_todo.ComputeToDo, ] ) diff --git a/vulnerabilities/migrations/0101_advisorytodov2_todorelatedadvisoryv2_and_more.py b/vulnerabilities/migrations/0101_advisorytodov2_todorelatedadvisoryv2_and_more.py new file mode 100644 index 000000000..2dd997eb2 --- /dev/null +++ b/vulnerabilities/migrations/0101_advisorytodov2_todorelatedadvisoryv2_and_more.py @@ -0,0 +1,137 @@ +# Generated by Django 4.2.22 on 2025-07-24 12:05 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0100_remove_advisoryv2_affecting_packages_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="AdvisoryToDoV2", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ( + "related_advisories_id", + models.CharField( + help_text="SHA1 digest of the unique_content_id field of the applicable advisories.", + max_length=40, + ), + ), + ( + "issue_type", + models.CharField( + choices=[ + ("MISSING_AFFECTED_PACKAGE", "Advisory is missing affected package"), + ("MISSING_FIXED_BY_PACKAGE", "Advisory is missing fixed-by package"), + ( + "MISSING_AFFECTED_AND_FIXED_BY_PACKAGES", + "Advisory is missing both affected and fixed-by packages", + ), + ("MISSING_SUMMARY", "Advisory is missing summary"), + ( + "CONFLICTING_FIXED_BY_PACKAGES", + "Advisories have conflicting fixed-by packages", + ), + ( + "CONFLICTING_AFFECTED_PACKAGES", + "Advisories have conflicting affected packages", + ), + ( + "CONFLICTING_AFFECTED_AND_FIXED_BY_PACKAGES", + "Advisories have conflicting affected and fixed-by packages", + ), + ( + "CONFLICTING_SEVERITY_SCORES", + "Advisories have conflicting severity scores", + ), + ], + db_index=True, + help_text="Select the issue that needs to be addressed from the available options.", + max_length=50, + ), + ), + ( + "issue_detail", + models.TextField(blank=True, help_text="Additional details about the issue."), + ), + ( + "created_at", + models.DateTimeField( + auto_now_add=True, + help_text="Timestamp indicating when this TODO was created.", + ), + ), + ( + "is_resolved", + models.BooleanField( + db_index=True, default=False, help_text="This TODO is resolved or not." + ), + ), + ( + "resolved_at", + models.DateTimeField( + blank=True, + help_text="Timestamp indicating when this TODO was resolved.", + null=True, + ), + ), + ( + "resolution_detail", + models.TextField( + blank=True, help_text="Additional detail on how this TODO was resolved." + ), + ), + ], + ), + migrations.CreateModel( + name="ToDoRelatedAdvisoryV2", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ( + "advisory", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="vulnerabilities.advisoryv2" + ), + ), + ( + "todo", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="vulnerabilities.advisorytodov2", + ), + ), + ], + options={ + "unique_together": {("todo", "advisory")}, + }, + ), + migrations.AddField( + model_name="advisorytodov2", + name="advisories", + field=models.ManyToManyField( + help_text="Advisory/ies where this TODO is applicable.", + related_name="advisory_todos", + through="vulnerabilities.ToDoRelatedAdvisoryV2", + to="vulnerabilities.advisoryv2", + ), + ), + migrations.AlterUniqueTogether( + name="advisorytodov2", + unique_together={("related_advisories_id", "issue_type")}, + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index d51b2fce6..60c20d5b9 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2493,6 +2493,62 @@ class Meta: unique_together = ("related_advisories_id", "issue_type") +class AdvisoryToDoV2(models.Model): + """Track the TODOs for advisory/ies that need to be addressed.""" + + # Since we can not make advisories field (M2M field) unique + # (see https://code.djangoproject.com/ticket/702), we use related_advisories_id + # to avoid creating duplicate issue for same set of advisories, + related_advisories_id = models.CharField( + max_length=40, + help_text="SHA1 digest of the unique_content_id field of the applicable advisories.", + ) + + advisories = models.ManyToManyField( + "AdvisoryV2", + through="ToDoRelatedAdvisoryV2", + related_name="advisory_todos", + help_text="Advisory/ies where this TODO is applicable.", + ) + + issue_type = models.CharField( + max_length=50, + choices=ISSUE_TYPE_CHOICES, + db_index=True, + help_text="Select the issue that needs to be addressed from the available options.", + ) + + issue_detail = models.TextField( + blank=True, + help_text="Additional details about the issue.", + ) + + created_at = models.DateTimeField( + auto_now_add=True, + help_text="Timestamp indicating when this TODO was created.", + ) + + is_resolved = models.BooleanField( + default=False, + db_index=True, + help_text="This TODO is resolved or not.", + ) + + resolved_at = models.DateTimeField( + null=True, + blank=True, + help_text="Timestamp indicating when this TODO was resolved.", + ) + + resolution_detail = models.TextField( + blank=True, + help_text="Additional detail on how this TODO was resolved.", + ) + + class Meta: + unique_together = ("related_advisories_id", "issue_type") + + class AdvisorySeverity(models.Model): url = models.URLField( max_length=1024, @@ -2933,6 +2989,21 @@ class Meta: unique_together = ("todo", "advisory") +class ToDoRelatedAdvisoryV2(models.Model): + todo = models.ForeignKey( + AdvisoryToDoV2, + on_delete=models.CASCADE, + ) + + advisory = models.ForeignKey( + AdvisoryV2, + on_delete=models.CASCADE, + ) + + class Meta: + unique_together = ("todo", "advisory") + + class PackageQuerySetV2(BaseQuerySet, PackageURLQuerySet): def search(self, query: str = None): """ diff --git a/vulnerabilities/pipelines/v2_improvers/compute_advisory_todo.py b/vulnerabilities/pipelines/v2_improvers/compute_advisory_todo.py new file mode 100644 index 000000000..981f10e92 --- /dev/null +++ b/vulnerabilities/pipelines/v2_improvers/compute_advisory_todo.py @@ -0,0 +1,353 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + + +import json + +from aboutcode.pipeline import LoopProgress +from django.utils import timezone + +from vulnerabilities.models import AdvisoryAlias +from vulnerabilities.models import AdvisoryToDoV2 +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import ToDoRelatedAdvisoryV2 +from vulnerabilities.pipelines import VulnerableCodePipeline +from vulnerabilities.pipes.advisory import advisories_checksum + + +class ComputeToDo(VulnerableCodePipeline): + """Compute ToDos for Advisory.""" + + pipeline_id = "compute_advisory_todo_v2" + + @classmethod + def steps(cls): + return ( + cls.compute_individual_advisory_todo, + cls.detect_conflicting_advisories, + ) + + def compute_individual_advisory_todo(self): + """Create ToDos for missing summary, affected and fixed packages.""" + + advisories = AdvisoryV2.objects.all().prefetch_related( + "impacted_packages", + ) + advisories_count = advisories.count() + advisory_relation_to_create = {} + todo_to_create = [] + new_todos_count = 0 + batch_size = 5000 + + self.log( + f"Checking missing summary, affected and fixed packages in {advisories_count} Advisories" + ) + progress = LoopProgress( + total_iterations=advisories_count, + logger=self.log, + progress_step=1, + ) + for advisory in progress.iter(advisories.iterator(chunk_size=5000)): + advisory_todo_id = advisories_checksum(advisories=advisory) + check_missing_summary( + advisory=advisory, + todo_id=advisory_todo_id, + todo_to_create=todo_to_create, + advisory_relation_to_create=advisory_relation_to_create, + ) + + check_missing_affected_and_fixed_by_packages( + advisory=advisory, + todo_id=advisory_todo_id, + todo_to_create=todo_to_create, + advisory_relation_to_create=advisory_relation_to_create, + ) + + if len(todo_to_create) > batch_size: + new_todos_count += bulk_create_with_m2m( + todos=todo_to_create, + advisories=advisory_relation_to_create, + logger=self.log, + ) + advisory_relation_to_create.clear() + todo_to_create.clear() + + new_todos_count += bulk_create_with_m2m( + todos=todo_to_create, + advisories=advisory_relation_to_create, + logger=self.log, + ) + + self.log( + f"Successfully created {new_todos_count} ToDos for missing summary, affected and fixed packages" + ) + + def detect_conflicting_advisories(self): + """ + Create ToDos for advisories with conflicting opinions on fixed and affected + package versions for a vulnerability. + """ + aliases = AdvisoryAlias.objects.filter(alias__istartswith="cve") + aliases_count = aliases.count() + advisory_relation_to_create = {} + todo_to_create = [] + new_todos_count = 0 + batch_size = 5000 + + self.log(f"Cross validating advisory affected and fixed package for {aliases_count} CVEs") + + progress = LoopProgress( + total_iterations=aliases_count, + logger=self.log, + progress_step=1, + ) + for alias in progress.iter(aliases.iterator(chunk_size=2000)): + advisories = ( + alias.advisories.exclude( + advisory_todos__issue_type="MISSING_AFFECTED_AND_FIXED_BY_PACKAGES" + ) + .distinct() + .prefetch_related( + "impacted_packages", + ) + ) + + check_conflicting_affected_and_fixed_by_packages_for_alias( + advisories=advisories, + cve=alias, + todo_to_create=todo_to_create, + advisory_relation_to_create=advisory_relation_to_create, + ) + + if len(todo_to_create) > batch_size: + new_todos_count += bulk_create_with_m2m( + todos=todo_to_create, + advisories=advisory_relation_to_create, + logger=self.log, + ) + advisory_relation_to_create.clear() + todo_to_create.clear() + + new_todos_count += bulk_create_with_m2m( + todos=todo_to_create, + advisories=advisory_relation_to_create, + logger=self.log, + ) + + self.log( + f"Successfully created {new_todos_count} ToDos for conflicting affected and fixed packages" + ) + + +def check_missing_summary( + advisory: AdvisoryV2, + todo_id, + todo_to_create, + advisory_relation_to_create, +): + if not advisory.summary: + todo = AdvisoryToDoV2( + related_advisories_id=todo_id, + issue_type="MISSING_SUMMARY", + ) + advisory_relation_to_create[todo_id] = [advisory] + todo_to_create.append(todo) + + +def check_missing_affected_and_fixed_by_packages( + advisory: AdvisoryV2, + todo_id, + todo_to_create, + advisory_relation_to_create, +): + """ + Check for missing affected or fixed-by packages in the advisory + and create appropriate AdvisoryToDo. + + - If both affected and fixed packages are missing add `MISSING_AFFECTED_AND_FIXED_BY_PACKAGES`. + - If only the affected package is missing add `MISSING_AFFECTED_PACKAGE`. + - If only the fixed package is missing add `MISSING_FIXED_BY_PACKAGE`. + """ + has_affected_package = False + has_fixed_package = False + + for impacted in advisory.impacted_packages.all() or []: + if not impacted: + continue + + if has_affected_package and has_fixed_package: + break + if not has_affected_package and impacted.affecting_vers: + has_affected_package = True + if not has_fixed_package and impacted.fixed_vers: + has_fixed_package = True + + if has_affected_package and has_fixed_package: + return + + if not has_affected_package and not has_fixed_package: + issue_type = "MISSING_AFFECTED_AND_FIXED_BY_PACKAGES" + elif not has_affected_package: + issue_type = "MISSING_AFFECTED_PACKAGE" + elif not has_fixed_package: + issue_type = "MISSING_FIXED_BY_PACKAGE" + + if issue_type: + todo = AdvisoryToDoV2( + related_advisories_id=todo_id, + issue_type=issue_type, + ) + todo_to_create.append(todo) + advisory_relation_to_create[todo_id] = [advisory] + + +def check_conflicting_affected_and_fixed_by_packages_for_alias( + advisories, + cve, + todo_to_create, + advisory_relation_to_create, +): + """ + Add appropriate AdvisoryToDo for conflicting affected/fixed packages. + + Compute the comparison matrix for the given set of advisories. Iterate through each advisory + and compute and store fixed versionsrange and affected versionrange for each advisory, + keyed by purl. + + Use the matrix to determine conflicts in affected/fixed versions for each purl. If for any purl + there is more than one set of fixed versionrange or more than one set of affected versionrange, + it means the advisories have conflicting opinions on the fixed or affected packages. + + Example of comparison matrix: + { + "pkg:npm/foo/bar": { + "affected": { + Advisory1: frozenset(VersionRange1, VersionRange2), + Advisory2: frozenset(...), + }, + "fixed": { + Advisory1: frozenset(VersionRange1, VersionRange2), + Advisory2: frozenset(...), + }, + }, + "pkg:pypi/foobar": { + "affected": { + Advisory1: frozenset(...), + Advisory2: frozenset(...), + }, + "fixed": { + Advisory1: frozenset(...), + Advisory2: frozenset(...), + }, + }, + ... + } + """ + matrix = {} + for advisory in advisories: + advisory_id = advisory.unique_content_id + for impacted in advisory.impacted_packages.all() or []: + affected_purl = impacted.base_purl + + initialize_sub_matrix( + matrix=matrix, + affected_purl=affected_purl, + advisory=advisory, + ) + + if fixed_version_range := impacted.fixed_vers: + matrix[affected_purl]["fixed"][advisory_id].add(fixed_version_range) + + if affecting_version_range := impacted.affecting_vers: + matrix[affected_purl]["affected"][advisory_id].add(affecting_version_range) + + has_conflicting_affected_packages = False + has_conflicting_fixed_package = False + messages = [] + for purl, board in matrix.items(): + fixed = board.get("fixed", {}).values() + impacted = board.get("affected", {}).values() + + unique_set_of_affected_vers = {frozenset(vers) for vers in impacted} + unique_set_of_fixed_vers = {frozenset(vers) for vers in fixed} + + if len(unique_set_of_affected_vers) > 1: + has_conflicting_affected_packages = True + messages.append( + f"{cve}: {purl} with conflicting affected versions {unique_set_of_affected_vers}" + ) + if len(unique_set_of_fixed_vers) > 1: + has_conflicting_fixed_package = True + messages.append( + f"{cve}: {purl} with conflicting fixed version {unique_set_of_fixed_vers}" + ) + + if not has_conflicting_affected_packages and not has_conflicting_fixed_package: + return + + issue_type = "CONFLICTING_AFFECTED_AND_FIXED_BY_PACKAGES" + if not has_conflicting_fixed_package: + issue_type = "CONFLICTING_AFFECTED_PACKAGES" + elif not has_conflicting_affected_packages: + issue_type = "CONFLICTING_FIXED_BY_PACKAGES" + + issue_detail = { + "Conflict summary": messages, + "Conflict matrix": matrix, + } + + todo_id = advisories_checksum(advisories) + todo = AdvisoryToDoV2( + related_advisories_id=todo_id, + issue_type=issue_type, + issue_detail=json.dumps(issue_detail, default=list), + ) + todo_to_create.append(todo) + advisory_relation_to_create[todo_id] = list(advisories) + + +def initialize_sub_matrix(matrix, affected_purl, advisory): + advisory_id = advisory.unique_content_id + if affected_purl not in matrix: + matrix[affected_purl] = { + "affected": {advisory_id: set()}, + "fixed": {advisory_id: set()}, + } + else: + if advisory not in matrix[affected_purl]["affected"]: + matrix[affected_purl]["affected"][advisory_id] = set() + if advisory not in matrix[affected_purl]["fixed"]: + matrix[affected_purl]["fixed"][advisory_id] = set() + + +def bulk_create_with_m2m(todos, advisories, logger): + """Bulk create ToDos and also bulk create M2M ToDo Advisory relationships.""" + if not todos: + return 0 + + start_time = timezone.now() + try: + AdvisoryToDoV2.objects.bulk_create(objs=todos, ignore_conflicts=True) + except Exception as e: + logger(f"Error creating AdvisoryToDo: {e}") + + new_todos = AdvisoryToDoV2.objects.filter(created_at__gte=start_time) + + relations = [ + ToDoRelatedAdvisoryV2(todo=todo, advisory=advisory) + for todo in new_todos + for advisory in advisories[todo.related_advisories_id] + ] + + try: + ToDoRelatedAdvisoryV2.objects.bulk_create(relations) + except Exception as e: + logger(f"Error creating Advisory ToDo relations: {e}") + + return new_todos.count() diff --git a/vulnerabilities/pipes/advisory.py b/vulnerabilities/pipes/advisory.py index 3487b74db..412e94359 100644 --- a/vulnerabilities/pipes/advisory.py +++ b/vulnerabilities/pipes/advisory.py @@ -327,7 +327,7 @@ def import_advisory( def advisories_checksum(advisories: Union[Advisory, List[Advisory]]) -> str: - if isinstance(advisories, Advisory): + if isinstance(advisories, Advisory) or isinstance(advisories, AdvisoryV2): advisories = [advisories] contents = sorted([advisory.unique_content_id for advisory in advisories]) diff --git a/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py b/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py new file mode 100644 index 000000000..b0e7d06df --- /dev/null +++ b/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py @@ -0,0 +1,209 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from datetime import datetime + +from django.test import TestCase +from packageurl import PackageURL + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.models import AdvisoryAlias +from vulnerabilities.models import AdvisoryToDoV2 +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import ImpactedPackage +from vulnerabilities.pipelines.v2_improvers.compute_advisory_todo import ComputeToDo + + +class TestComputeToDo(TestCase): + def setUp(self): + self.advisory_data1 = AdvisoryData( + summary="Test summary", + affected_packages=[ + AffectedPackageV2( + package=PackageURL(type="npm", name="package1"), + affected_version_range="vers:npm/>=1.0.0|<2.0.0", + fixed_version_range="vers:npm/2.0.0", + ) + ], + references_v2=[ReferenceV2(url="https://example.com/vuln1")], + url="https://test.url/", + ) + + self.advisory_data2 = AdvisoryData( + summary="Test summary", + affected_packages=[ + AffectedPackageV2( + package=PackageURL(type="npm", name="package1"), + affected_version_range="vers:npm/>=1.0.0|<2.0.0", + ) + ], + references_v2=[ReferenceV2(url="https://example.com/vuln1")], + url="https://test.url/", + ) + + self.advisory_data3 = AdvisoryData( + summary="Test summary", + affected_packages=[ + AffectedPackageV2( + package=PackageURL(type="npm", name="package1"), + fixed_version_range="vers:npm/2.0.0", + ) + ], + references_v2=[ReferenceV2(url="https://example.com/vuln1")], + url="https://test.url/", + ) + + self.advisory_data4 = AdvisoryData( + summary="Test summary", + affected_packages=[ + AffectedPackageV2( + package=PackageURL(type="npm", name="package1"), + affected_version_range="vers:npm/>=1.0.0|<=2.0.0", + fixed_version_range="vers:npm/2.0.1", + ) + ], + references_v2=[ReferenceV2(url="https://example.com/vuln1")], + url="https://test.url/", + ) + + def test_advisory_todo_missing_summary(self): + date = datetime.now() + adv = AdvisoryV2.objects.create( + unique_content_id="test_id", + url=self.advisory_data1.url, + summary="", + date_imported=date, + date_collected=date, + advisory_id="test_id", + avid="test_pipeline/test_id", + datasource_id="test_pipeline", + ) + for pkg in self.advisory_data1.affected_packages: + ImpactedPackage.objects.create( + advisory=adv, + base_purl=pkg.package, + affecting_vers=pkg.affected_version_range, + fixed_vers=pkg.fixed_version_range, + ) + pipeline = ComputeToDo() + pipeline.execute() + + todo = AdvisoryToDoV2.objects.first() + self.assertEqual(1, AdvisoryToDoV2.objects.count()) + self.assertEqual("MISSING_SUMMARY", todo.issue_type) + self.assertEqual(1, todo.advisories.count()) + + def test_advisory_todo_missing_fixed(self): + date = datetime.now() + adv = AdvisoryV2.objects.create( + unique_content_id="test_id", + url=self.advisory_data2.url, + summary=self.advisory_data2.summary, + date_imported=date, + date_collected=date, + advisory_id="test_id", + avid="test_pipeline/test_id", + datasource_id="test_pipeline", + ) + for pkg in self.advisory_data2.affected_packages: + ImpactedPackage.objects.create( + advisory=adv, + base_purl=pkg.package, + affecting_vers=pkg.affected_version_range, + fixed_vers=pkg.fixed_version_range or "", + ) + pipeline = ComputeToDo() + pipeline.execute() + + todo = AdvisoryToDoV2.objects.first() + self.assertEqual(1, AdvisoryToDoV2.objects.count()) + self.assertEqual("MISSING_FIXED_BY_PACKAGE", todo.issue_type) + self.assertEqual(1, todo.advisories.count()) + + def test_advisory_todo_missing_affected(self): + date = datetime.now() + adv = AdvisoryV2.objects.create( + unique_content_id="test_id", + url=self.advisory_data3.url, + summary=self.advisory_data3.summary, + date_imported=date, + date_collected=date, + advisory_id="test_id", + avid="test_pipeline/test_id", + datasource_id="test_pipeline", + ) + for pkg in self.advisory_data3.affected_packages: + ImpactedPackage.objects.create( + advisory=adv, + base_purl=pkg.package, + affecting_vers=pkg.affected_version_range or "", + fixed_vers=pkg.fixed_version_range, + ) + pipeline = ComputeToDo() + pipeline.execute() + + todo = AdvisoryToDoV2.objects.first() + self.assertEqual(1, AdvisoryToDoV2.objects.count()) + self.assertEqual("MISSING_AFFECTED_PACKAGE", todo.issue_type) + self.assertEqual(1, todo.advisories.count()) + + def test_advisory_todo_conflicting_fixed_affected(self): + alias = AdvisoryAlias.objects.create(alias="CVE-0000-0000") + date = datetime.now() + adv1 = AdvisoryV2.objects.create( + unique_content_id="test_id1", + url=self.advisory_data1.url, + summary=self.advisory_data1.summary, + date_imported=date, + date_collected=date, + advisory_id="test_id", + avid="test_pipeline/test_id_2", + datasource_id="test_pipeline", + ) + for pkg in self.advisory_data1.affected_packages: + ImpactedPackage.objects.create( + advisory=adv1, + base_purl=pkg.package, + affecting_vers=pkg.affected_version_range or "", + fixed_vers=pkg.fixed_version_range or "", + ) + adv1.aliases.add(alias) + adv2 = AdvisoryV2.objects.create( + unique_content_id="test_id2", + url=self.advisory_data4.url, + summary=self.advisory_data4.summary, + date_imported=date, + date_collected=date, + advisory_id="test_id", + avid="test_pipeline/test_id_2", + datasource_id="test_pipeline", + ) + for pkg in self.advisory_data4.affected_packages: + ImpactedPackage.objects.create( + advisory=adv2, + base_purl=pkg.package, + affecting_vers=pkg.affected_version_range or "", + fixed_vers=pkg.fixed_version_range or "", + ) + adv2.aliases.add(alias) + + self.assertEqual(0, AdvisoryToDoV2.objects.count()) + pipeline = ComputeToDo() + pipeline.execute() + + todo = AdvisoryToDoV2.objects.first() + self.assertEqual(1, AdvisoryToDoV2.objects.count()) + self.assertEqual("CONFLICTING_AFFECTED_AND_FIXED_BY_PACKAGES", todo.issue_type) + self.assertIn( + "CVE-0000-0000: pkg:npm/package1 with conflicting fixed version", todo.issue_detail + ) + self.assertEqual(2, todo.advisories.count()) + self.assertEqual(todo, adv2.advisory_todos.first())