Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/anchore_security_cli/identifiers/aliases.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def parse_identifier_from_url(url: str) -> str | None:
class Aliases:
cve: list[str] = field(default_factory=list)
gcve: list[str] = field(default_factory=list)
cnvd: list[str] = field(default_factory=list)
github: list[str] = field(default_factory=list)
chainguard: list[str] = field(default_factory=list)
bitnami: list[str] = field(default_factory=list)
Expand Down Expand Up @@ -113,6 +114,7 @@ def normalize(cls, alias: str) -> str:
def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C901, PLR0912, PLR0915
cve = set()
gcve = set()
cnvd = set()
github = set()
chainguard = set()
bitnami = set()
Expand Down Expand Up @@ -163,6 +165,8 @@ def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C9
cve_id = gcve_to_cve(a)
if cve_id:
cve.add(cve_id)
elif a.startswith("CNVD-"):
cnvd.add(a)
elif a.startswith("GHSA-"):
github.add(a)
elif a.startswith("CGA-"):
Expand Down Expand Up @@ -234,6 +238,7 @@ def from_list(cls, aliases: list[str], provider: str | None = None): # noqa: C9
return Aliases(
cve=list(cve),
gcve=list(gcve),
cnvd=list(cnvd),
github=list(github),
chainguard=list(chainguard),
bitnami=list(bitnami),
Expand Down
4 changes: 4 additions & 0 deletions src/anchore_security_cli/identifiers/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from anchore_security_cli.identifiers.providers.bellsoft import BellSoft
from anchore_security_cli.identifiers.providers.bitnami import Bitnami
from anchore_security_cli.identifiers.providers.chainguard import Chainguard
from anchore_security_cli.identifiers.providers.cnvd import CNVD
from anchore_security_cli.identifiers.providers.cpan import CPAN
from anchore_security_cli.identifiers.providers.cve5 import CVE5
from anchore_security_cli.identifiers.providers.debian import Debian
Expand Down Expand Up @@ -37,6 +38,7 @@ class Providers:
cve5: CVE5
github: GitHub
gcve: GCVE
cnvd: CNVD
chainguard: Chainguard
bitnami: Bitnami
psf: PSF
Expand Down Expand Up @@ -109,6 +111,7 @@ def fetch_all() -> Providers:
cve5 = executor.submit(CVE5)
github = executor.submit(GitHub)
gcve = executor.submit(GCVE)
cnvd = executor.submit(CNVD)
openssf_malicious_packages = executor.submit(OpenSSFMaliciousPackages)
ubuntu = executor.submit(Ubuntu)
chainguard = executor.submit(Chainguard)
Expand Down Expand Up @@ -140,6 +143,7 @@ def fetch_all() -> Providers:
cve5=cve5.result(),
github=github.result(),
gcve=gcve.result(),
cnvd=cnvd.result(),
chainguard=chainguard.result(),
bitnami=bitnami.result(),
psf=psf.result(),
Expand Down
69 changes: 69 additions & 0 deletions src/anchore_security_cli/identifiers/providers/cnvd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging

import orjson
import requests

from anchore_security_cli.identifiers.aliases import Aliases
from anchore_security_cli.identifiers.providers.provider import Provider, ProviderRecord


class CNVD(Provider):
def __init__(self):
super().__init__(
name="China National Vulnerability Database",
)

def _normalise_identifier(self, identifier: str) -> str:
components = identifier.split("-", 1)
if len(components) < 2:
return identifier

prefix = components[0].upper()
return f"{prefix}-{components[1]}"

def _fetch(self) -> list[ProviderRecord]:
records = []
r = requests.get(
url="https://vulnerability.circl.lu/dumps/cnvd.ndjson",
timeout=30,
stream=True,
)
r.raise_for_status()

for record in r.iter_lines():
cnvd = orjson.loads(record)

cnvd_id = cnvd.get("number")
if not cnvd_id:
continue

cnvd_id = self._normalise_identifier(cnvd_id)
if not cnvd_id.startswith("CNVD-"):
logging.warning(f"Skipping CNVD record with unexpected id: {cnvd_id!r}")
continue

aliases = [cnvd_id]

cves = cnvd.get("cves", {}).get("cve")
if cves:
# This might be a single entry or a list, so handle both
if isinstance(cves, dict):
cves = [cves]

for c in cves:
cve_id = c.get("cveNumber")
if cve_id:
aliases.append(self._normalise_identifier(cve_id))

published = cnvd.get("openTime")
logging.trace(f"processing CNVD record for {cnvd_id}")

records.append(
ProviderRecord(
id=cnvd_id,
published=self._parse_date(published),
aliases=Aliases.from_list(aliases, provider=self.name),
),
)

return records