Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ install_requires =
#vulntotal
python-dotenv
texttable
extractcode[full]==31.0.0


[options.extras_require]
Expand Down
20 changes: 20 additions & 0 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ def from_url(cls, url):
reference_id = get_reference_id(url)
if "GHSA-" in reference_id.upper():
return cls(reference_id=reference_id, url=url)
if reference_id.startswith(("RHSA-", "RHEA-", "RHBA-")):
return cls(reference_id=reference_id, url=url)
if is_cve(reference_id):
return cls(url=url, reference_id=reference_id.upper())
return cls(url=url)
Expand Down Expand Up @@ -458,6 +460,24 @@ def clean_summary(self, summary):
return summary

def to_dict(self):
is_adv_v2 = (
self.advisory_id
or self.severities
or self.references_v2
or (self.affected_packages and isinstance(self.affected_packages[0], AffectedPackageV2))
)
if is_adv_v2:
return {
"advisory_id": self.advisory_id,
"aliases": self.aliases,
"summary": self.summary,
"affected_packages": [pkg.to_dict() for pkg in self.affected_packages],
"references_v2": [ref.to_dict() for ref in self.references_v2],
"severities": [sev.to_dict() for sev in self.severities],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"url": self.url if self.url else "",
}
return {
"aliases": self.aliases,
"summary": self.summary,
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
from vulnerabilities.pipelines.v2_importers import pysec_importer as pysec_importer_v2
from vulnerabilities.pipelines.v2_importers import redhat_importer as redhat_importer_v2
from vulnerabilities.pipelines.v2_importers import vulnrichment_importer as vulnrichment_importer_v2
from vulnerabilities.pipelines.v2_importers import xen_importer as xen_importer_v2
from vulnerabilities.utils import create_registry
Expand All @@ -79,6 +80,7 @@
postgresql_importer_v2.PostgreSQLImporterPipeline,
mozilla_importer_v2.MozillaImporterPipeline,
github_osv_importer_v2.GithubOSVImporterPipeline,
redhat_importer_v2.RedHatImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
1 change: 1 addition & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2890,6 +2890,7 @@ def to_advisory_data(self) -> "AdvisoryData":
from vulnerabilities.importer import AdvisoryData

return AdvisoryData(
advisory_id=self.advisory_id,
aliases=[item.alias for item in self.aliases.all()],
summary=self.summary,
affected_packages=[
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/pipelines/v2_importers/archlinux_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
from typing import Iterable
from typing import Mapping

Expand Down Expand Up @@ -97,4 +98,5 @@ def parse_advisory(self, record) -> AdvisoryData:
affected_packages=affected_packages,
weaknesses=[],
url=f"https://security.archlinux.org/{avg_name}.json",
original_advisory_text=json.dumps(record),
)
195 changes: 195 additions & 0 deletions vulnerabilities/pipelines/v2_importers/redhat_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
import shutil
import tempfile
from io import DEFAULT_BUFFER_SIZE
from pathlib import Path
from typing import Iterable
from urllib.parse import urljoin

import dateparser
import requests
from extractcode import ExtractError
from packageurl import PackageURL
from univers.version_range import RpmVersionRange
from univers.version_range import VersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipes import extractcode_utils
from vulnerabilities.severity_systems import REDHAT_AGGREGATE
from vulnerabilities.utils import load_json
from vulntotal import vulntotal_utils


class RedHatImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """Import RedHat Advisories (RHSA, RHEA and RHBA)

    Ingest CSAF advisories published by RedHat, including Red Hat Security Advisory (RHSA),
    Red Hat Enhancement Advisory (RHEA), and Red Hat Bug Fix Advisory (RHBA).
    """

    pipeline_id = "redhat_importer_v2"
    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://access.redhat.com/security/data/"
    url = "https://security.access.redhat.com/data/csaf/v2/advisories/"

    @classmethod
    def steps(cls):
        return (
            cls.fetch,
            cls.collect_and_store_advisories,
            cls.clean_download,
        )

    def fetch(self):
        """Download and extract the latest RedHat CSAF advisory archive.

        Sets ``self.location`` (and ``self.cleanup_location``) to a fresh
        temporary directory holding the extracted JSON advisories.
        Raises ``ExtractError`` if the archive cannot be fully extracted.
        """
        # archive_latest.txt contains the filename of the most recent archive.
        archive_latest_url = urljoin(self.url, "archive_latest.txt")
        response = requests.get(archive_latest_url)
        response.raise_for_status()
        self.latest_archive_name = response.text.strip()

        self.location = self.cleanup_location = Path(tempfile.mkdtemp())
        archive_path = self.location / self.latest_archive_name
        archive_url = urljoin(self.url, self.latest_archive_name)

        # Stream the (large) archive to disk in buffered chunks.
        response = requests.get(archive_url, stream=True)
        response.raise_for_status()

        with open(archive_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=DEFAULT_BUFFER_SIZE):
                f.write(chunk)

        if errors := extractcode_utils.extract_archive(
            source=archive_path,
            destination=self.location,
        ):
            self.log(
                f"Error while extracting archive {archive_path}: {errors}",
                level=logging.ERROR,
            )
            raise ExtractError(errors)

    def advisories_count(self) -> int:
        """Return the number of advisory JSON files found under ``self.location``."""
        return sum(1 for _ in self.location.rglob("*.json"))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Yield one AdvisoryData per extracted CSAF JSON file.

        Skips files that ``parse_advisory`` rejects (it returns None for
        unsupported CSAF versions).
        """
        for record in self.location.rglob("*.json"):
            advisory = self.parse_advisory(record)
            if advisory is not None:
                yield advisory

    def parse_advisory(self, record):
        """Parse a single CSAF 2.0 JSON file at ``record`` into AdvisoryData.

        Return None (and log an error) when the document declares an
        unsupported CSAF version.
        """
        advisory = load_json(record)
        document = advisory.get("document", {})
        if (csaf_version := document.get("csaf_version")) and csaf_version != "2.0":
            self.log(f"Unsupported CSAF version: {csaf_version}.", level=logging.ERROR)
            return

        severities = []
        references = []
        affected_packages = []
        notes = document.get("notes", [])
        # Advisory URL mirrors the on-disk layout: <year-dir>/<file>.json
        adv_sub_path = f"{record.parent.name}/{record.name}"
        url = urljoin(self.url, adv_sub_path)
        advisory_id = get_item(document, "tracking", "id")
        release_date = get_item(document, "tracking", "initial_release_date")

        # Concatenate all notes except boilerplate legal disclaimers.
        summary = "\n\n".join(
            note.get("text", "") for note in notes if note.get("category") != "legal_disclaimer"
        )
        aliases = [vul["cve"] for vul in advisory.get("vulnerabilities", [])]

        for ref in document.get("references", []):
            ref_url = ref.get("url")
            if not ref_url:
                # Malformed reference entry with no URL: nothing to record.
                continue
            if ref_url.startswith("https://bugzilla.redhat.com/"):
                references.append(
                    ReferenceV2(
                        reference_id=ref.get("summary"),
                        reference_type="bug",
                        url=ref_url,
                    )
                )
                continue
            references.append(ReferenceV2.from_url(url=ref_url))

        if aggregate_severity := document.get("aggregate_severity"):
            severities.append(
                VulnerabilitySeverity(
                    system=REDHAT_AGGREGATE,
                    value=aggregate_severity["text"],
                    url=url,
                )
            )

        impacts = get_item(advisory, "product_tree", "branches", 0, "branches", default=[])
        for impact in impacts:
            if impact["category"] == "product_family":
                continue
            for branch in impact.get("branches", []):
                if purl := get_item(
                    branch,
                    "product",
                    "product_identification_helper",
                    "purl",
                    default=None,
                ):
                    # Only RPM packages are modeled; other ecosystems are skipped.
                    if not purl.startswith("pkg:rpm/"):
                        continue
                    package_purl = PackageURL.from_string(purl=purl)
                    fixed_version = package_purl.version
                    if not fixed_version:
                        continue

                    # The CSAF purl carries the *fixed* version; everything
                    # below it is considered affected.
                    fixed_version_range = RpmVersionRange.from_versions([fixed_version])
                    affected_version_range = VersionRange.from_string(f"vers:rpm/<{fixed_version}")
                    purl_dict = package_purl.to_dict()
                    del purl_dict["version"]
                    base_purl = PackageURL(**purl_dict)

                    affected_packages.append(
                        AffectedPackageV2(
                            package=base_purl,
                            affected_version_range=affected_version_range,
                            fixed_version_range=fixed_version_range,
                        )
                    )

        return AdvisoryData(
            advisory_id=advisory_id,
            aliases=aliases,
            summary=summary,
            references_v2=references,
            affected_packages=affected_packages,
            severities=severities,
            weaknesses=[],
            date_published=dateparser.parse(release_date) if release_date else None,
            url=url,
            original_advisory_text=json.dumps(advisory),
        )

    def clean_download(self):
        """Remove the temporary download/extraction directory, if any."""
        # hasattr guard: fetch may have failed before creating the directory.
        if hasattr(self, "cleanup_location") and self.cleanup_location.exists():
            self.log(f"Removing downloaded archive: {self.latest_archive_name}")
            shutil.rmtree(self.cleanup_location)

    def on_failure(self):
        self.clean_download()


def get_item(entity, *attributes, default=None):
    """Return the value nested at ``*attributes`` inside ``entity``.

    Thin wrapper around ``vulntotal_utils.get_item`` that returns
    ``default`` instead of raising when any key/index along the path
    is missing or the intermediate value is not subscriptable.
    """
    try:
        return vulntotal_utils.get_item(entity, *attributes)
    except (KeyError, IndexError, TypeError):
        return default
20 changes: 20 additions & 0 deletions vulnerabilities/pipes/extractcode_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from extractcode import api


def extract_archive(source, destination):
    """Extract an archive at ``source`` to the ``destination`` directory.

    Return a mapping of {source path: errors} for every extraction event
    that finished with errors; an empty mapping means full success.
    """
    errors = {}
    for event in api.extract_archive(source, destination):
        # Only completed events carry a final error status.
        if event.done and event.errors:
            errors[str(event.source)] = event.errors

    return errors
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import os
from pathlib import Path
from unittest.mock import Mock
from unittest.mock import patch

from django.test import TestCase

from vulnerabilities.models import AdvisoryV2
from vulnerabilities.models import PackageV2
from vulnerabilities.pipelines.v2_importers.redhat_importer import RedHatImporterPipeline
from vulnerabilities.tests import util_tests

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "redhat" / "csaf_2_0"


class TestRedHatImporterPipeline(TestCase):
    """Tests for RedHatImporterPipeline (class was previously misnamed
    TestArchLinuxImporterPipeline, a copy-paste from the Arch Linux test)."""

    @patch("vulnerabilities.pipelines.v2_importers.redhat_importer.RedHatImporterPipeline.fetch")
    def test_redhat_advisories_v2(self, mock_fetch):
        # Mocked steps need a __name__ for the pipeline's step logging.
        mock_fetch.__name__ = "fetch"
        pipeline = RedHatImporterPipeline()
        # Point the pipeline at the checked-in CSAF fixtures instead of downloading.
        pipeline.location = TEST_DATA
        pipeline.execute()
        self.assertEqual(6, AdvisoryV2.objects.count())
        self.assertEqual(93, PackageV2.objects.count())
        expected_file = TEST_DATA.parent / "redhat_advisoryv2-expected.json"
        result = [adv.to_advisory_data().to_dict() for adv in AdvisoryV2.objects.all()]
        util_tests.check_results_against_json(result, expected_file)
Loading