diff --git a/src/anchore_security_cli/identifiers/aliases.py b/src/anchore_security_cli/identifiers/aliases.py index c71d1e1..51a7c5b 100644 --- a/src/anchore_security_cli/identifiers/aliases.py +++ b/src/anchore_security_cli/identifiers/aliases.py @@ -62,6 +62,7 @@ def parse_identifier_from_url(url: str) -> str | None: class Aliases: cve: list[str] = field(default_factory=list) gcve: list[str] = field(default_factory=list) + cnvd: list[str] = field(default_factory=list) github: list[str] = field(default_factory=list) chainguard: list[str] = field(default_factory=list) bitnami: list[str] = field(default_factory=list) @@ -113,6 +114,7 @@ def normalize(cls, alias: str) -> str: def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C901, PLR0912, PLR0915 cve = set() gcve = set() + cnvd = set() github = set() chainguard = set() bitnami = set() @@ -163,6 +165,8 @@ def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C9 cve_id = gcve_to_cve(a) if cve_id: cve.add(cve_id) + elif a.startswith("CNVD-"): + cnvd.add(a) elif a.startswith("GHSA-"): github.add(a) elif a.startswith("CGA-"): @@ -234,6 +238,7 @@ def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C9 return Aliases( cve=list(cve), gcve=list(gcve), + cnvd=list(cnvd), github=list(github), chainguard=list(chainguard), bitnami=list(bitnami), diff --git a/src/anchore_security_cli/identifiers/providers/__init__.py b/src/anchore_security_cli/identifiers/providers/__init__.py index c5a78a1..66d0192 100644 --- a/src/anchore_security_cli/identifiers/providers/__init__.py +++ b/src/anchore_security_cli/identifiers/providers/__init__.py @@ -6,6 +6,7 @@ from anchore_security_cli.identifiers.providers.bellsoft import BellSoft from anchore_security_cli.identifiers.providers.bitnami import Bitnami from anchore_security_cli.identifiers.providers.chainguard import Chainguard +from anchore_security_cli.identifiers.providers.cnvd import CNVD from anchore_security_cli.identifiers.providers.cpan import CPAN from anchore_security_cli.identifiers.providers.cve5 import CVE5 from anchore_security_cli.identifiers.providers.debian import Debian @@ -37,6 +38,7 @@ class Providers: cve5: CVE5 github: GitHub gcve: GCVE + cnvd: CNVD chainguard: Chainguard bitnami: Bitnami psf: PSF @@ -109,6 +111,7 @@ def fetch_all() -> Providers: cve5 = executor.submit(CVE5) github = executor.submit(GitHub) gcve = executor.submit(GCVE) + cnvd = executor.submit(CNVD) openssf_malicious_packages = executor.submit(OpenSSFMaliciousPackages) ubuntu = executor.submit(Ubuntu) chainguard = executor.submit(Chainguard) @@ -140,6 +143,7 @@ def fetch_all() -> Providers: cve5=cve5.result(), github=github.result(), gcve=gcve.result(), + cnvd=cnvd.result(), chainguard=chainguard.result(), bitnami=bitnami.result(), psf=psf.result(), diff --git a/src/anchore_security_cli/identifiers/providers/cnvd.py b/src/anchore_security_cli/identifiers/providers/cnvd.py new file mode 100644 index 0000000..86a5134 --- /dev/null +++ b/src/anchore_security_cli/identifiers/providers/cnvd.py @@ -0,0 +1,69 @@ +import logging + +import orjson +import requests + +from anchore_security_cli.identifiers.aliases import Aliases +from anchore_security_cli.identifiers.providers.provider import Provider, ProviderRecord + + +class CNVD(Provider): + def __init__(self): + super().__init__( + name="China National Vulnerability Database", + ) + + def _normalise_identifier(self, identifier: str) -> str: + components = identifier.split("-", 1) + if len(components) < 2: + return identifier + + prefix = components[0].upper() + return f"{prefix}-{components[1]}" + + def _fetch(self) -> list[ProviderRecord]: + records = [] + r = requests.get( + url="https://vulnerability.circl.lu/dumps/cnvd.ndjson", + timeout=30, + stream=True, + ) + r.raise_for_status() + + for record in r.iter_lines(): + cnvd = orjson.loads(record) + + cnvd_id = cnvd.get("number") + if not cnvd_id: + continue + + cnvd_id = self._normalise_identifier(cnvd_id) + if not cnvd_id.startswith("CNVD-"): + logging.warning(f"Skipping CNVD record with unexpected id: {cnvd_id!r}") + continue + + aliases = [cnvd_id] + + cves = cnvd.get("cves", {}).get("cve") + if cves: + # This might be a single entry or a list, so handle both + if isinstance(cves, dict): + cves = [cves] + + for c in cves: + cve_id = c.get("cveNumber") + if cve_id: + aliases.append(self._normalise_identifier(cve_id)) + + published = cnvd.get("openTime") + logging.trace(f"processing CNVD record for {cnvd_id}") + + records.append( + ProviderRecord( + id=cnvd_id, + published=self._parse_date(published), + aliases=Aliases.from_list(aliases, provider=self.name), + ), + ) + + return records