Skip to content

Commit 0283699

Browse files
committed
Add Local VulnerableCod Datasource in VulnTotal and allow live evaluation #1984
Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
1 parent dcb0511 commit 0283699

3 files changed

Lines changed: 265 additions & 0 deletions

File tree

vulntotal/datasources/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from vulntotal.datasources import safetydb
1616
from vulntotal.datasources import snyk
1717
from vulntotal.datasources import vulnerablecode
18+
from vulntotal.datasources import vulnerablecode_local
1819
from vulntotal.validator import DataSource
1920

2021
DATASOURCE_REGISTRY = {
@@ -26,4 +27,5 @@
2627
"osv": osv.OSVDataSource,
2728
"snyk": snyk.SnykDataSource,
2829
"vulnerablecode": vulnerablecode.VulnerableCodeDataSource,
30+
"vulnerablecode_local": vulnerablecode_local.LocalVulnerableCodeDataSource,
2931
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/nexB/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
import os
12+
from urllib.parse import urljoin
13+
14+
import requests
15+
from dotenv import load_dotenv
16+
from packageurl import PackageURL
17+
18+
from vulntotal.datasources.vulnerablecode import VulnerableCodeDataSource
19+
from vulntotal.validator import VendorData
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
def _is_true(val: str | None) -> bool:
25+
return (val is not None) and str(val).strip().lower() in {"1", "true", "yes", "on"}
26+
27+
28+
class LocalVulnerableCodeDataSource(VulnerableCodeDataSource):
29+
live_eval_api_path = "api/v2/live-evaluation/evaluate"
30+
vc_purl_search_api_path = "api/v2/advisories-packages/bulk_search/"
31+
32+
def __init__(self):
33+
super().__init__()
34+
load_dotenv()
35+
36+
host = os.environ.get("VCIO_HOST", "localhost").rstrip("/")
37+
port = os.environ.get("VCIO_PORT", "8000")
38+
39+
if host.startswith("http://") or host.startswith("https://"):
40+
base = host
41+
else:
42+
base = f"http://{host}:{port}"
43+
44+
self.global_instance = f"{base}/"
45+
46+
self._enable_live_eval = _is_true(os.environ.get("ENABLE_LIVE_EVAL", False))
47+
48+
def _trigger_live_evaluation(self, purl: PackageURL) -> bool:
49+
"""Trigger live evaluation for the given purl on the local VCIO instance.
50+
51+
Returns True if the trigger was accepted and False otherwise.
52+
"""
53+
url = urljoin(self.global_instance, self.live_eval_api_path)
54+
try:
55+
response = requests.post(url, json={"purl_string": str(purl)})
56+
except Exception as e:
57+
logger.error(f"Live evaluation trigger failed for {purl}: {e}")
58+
return False
59+
60+
if response.status_code != 202:
61+
logger.error(
62+
f"Live evaluation trigger for {purl} failed with status {response.status_code}: {response.text}"
63+
)
64+
return False
65+
66+
logger.info(f"Live evaluation accepted for {purl} on {url}")
67+
return True
68+
69+
def fetch_post_json(self, payload):
70+
url = urljoin(self.global_instance, self.vc_purl_search_api_path)
71+
try:
72+
response = requests.post(url, json=payload)
73+
except Exception as e:
74+
logger.error(f"Error while fetching {url}: {e}")
75+
return
76+
if response.status_code != 200:
77+
logger.error(f"Error while fetching {url}")
78+
return
79+
return response.json()
80+
81+
def datasource_advisory(self, purl):
82+
if purl.type not in self.supported_ecosystem() or purl.version is None:
83+
return
84+
85+
if self._enable_live_eval:
86+
self._trigger_live_evaluation(purl)
87+
88+
metadata = self.fetch_post_json({"purls": [str(purl)]})
89+
self._raw_dump.append(metadata)
90+
if not metadata:
91+
return
92+
93+
packages = metadata.get("packages") or []
94+
advisories_map = metadata.get("advisories") or {}
95+
if not packages:
96+
return
97+
98+
pkg_entry = next((pkg for pkg in packages if pkg.get("purl") == str(purl)), packages[0])
99+
affected_map = pkg_entry.get("affected_by_vulnerabilities", {}) or {}
100+
101+
for advisory_id, details in affected_map.items():
102+
fixed_versions = []
103+
fixed_purls = details.get("fixed_by_packages") or []
104+
for fp in fixed_purls:
105+
try:
106+
ver = PackageURL.from_string(fp).version
107+
if ver:
108+
fixed_versions.append(ver)
109+
except Exception:
110+
continue
111+
112+
advisory_key = advisory_id.split("/")[-1]
113+
advisory_obj = advisories_map.get(advisory_key, {})
114+
aliases = advisory_obj.get("aliases") or []
115+
116+
yield VendorData(
117+
purl=PackageURL(purl.type, purl.namespace, purl.name),
118+
aliases=aliases,
119+
affected_versions=[purl.version],
120+
fixed_versions=fixed_versions,
121+
)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import os
11+
from typing import List
12+
13+
import pytest
14+
from packageurl import PackageURL
15+
16+
from vulntotal.datasources.vulnerablecode_local import LocalVulnerableCodeDataSource
17+
18+
19+
class FakeResponse:
20+
def __init__(self, status_code=200, json_data=None, text=""):
21+
self.status_code = status_code
22+
self._json = json_data
23+
self.text = text or ("" if json_data is None else str(json_data))
24+
25+
def json(self):
26+
return self._json
27+
28+
29+
def make_v2_advisories_response(
30+
pkg_purl: str, advisory_id: str, aliases: List[str], fixes: List[str]
31+
):
32+
return {
33+
"packages": [
34+
{
35+
"purl": pkg_purl,
36+
"affected_by_vulnerabilities": {
37+
"live_v2_importer_name/"
38+
+ advisory_id: {
39+
"advisory_id": "live_v2_importer_name/" + advisory_id,
40+
"fixed_by_packages": fixes,
41+
"code_fixes": [],
42+
}
43+
},
44+
}
45+
],
46+
"advisories": {
47+
advisory_id: {
48+
"advisory_id": "live_v2_importer_name/" + advisory_id,
49+
"aliases": aliases,
50+
}
51+
},
52+
}
53+
54+
55+
def test_local_vulnerablecode_v2_bulk_search_and_vendor_data(monkeypatch):
56+
monkeypatch.setenv("VCIO_HOST", "localhost")
57+
monkeypatch.setenv("VCIO_PORT", "1234")
58+
monkeypatch.setenv("ENABLE_LIVE_EVAL", "0")
59+
60+
calls = []
61+
62+
def fake_post(url, json=None, **kwargs):
63+
calls.append((url, json))
64+
if url.endswith("/api/v2/advisories-packages/bulk_search/"):
65+
return FakeResponse(
66+
200,
67+
make_v2_advisories_response(
68+
pkg_purl="pkg:pypi/demo@1.2.3",
69+
advisory_id="ADV-123",
70+
aliases=["CVE-2024-0001", "GHSA-foo"],
71+
fixes=["pkg:pypi/demo@1.2.4", "pkg:pypi/demo@1.3.0"],
72+
),
73+
)
74+
75+
return FakeResponse(404, {"detail": "not found"})
76+
77+
monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post)
78+
79+
ds = LocalVulnerableCodeDataSource()
80+
purl = PackageURL.from_string("pkg:pypi/demo@1.2.3")
81+
82+
results = list(ds.datasource_advisory(purl))
83+
84+
assert any(
85+
"/api/v2/advisories-packages/bulk_search/" in url for url, _ in calls
86+
), "v2 advisories bulk_search should be called"
87+
88+
assert not any(
89+
"/api/v2/live-evaluation/evaluate" in url for url, _ in calls
90+
), "live evaluation should not be called when disabled"
91+
92+
assert len(results) == 1
93+
vd = results[0].to_dict()
94+
assert vd["purl"] == "pkg:pypi/demo"
95+
assert vd["aliases"] == ["CVE-2024-0001", "GHSA-foo"]
96+
assert vd["affected_versions"] == ["1.2.3"]
97+
assert sorted(vd["fixed_versions"]) == ["1.2.4", "1.3.0"]
98+
99+
100+
def test_local_vulnerablecode_triggers_live_evaluation_when_enabled(monkeypatch):
101+
monkeypatch.setenv("VCIO_HOST", "localhost")
102+
monkeypatch.setenv("VCIO_PORT", "1234")
103+
monkeypatch.setenv("ENABLE_LIVE_EVAL", "1")
104+
105+
calls = []
106+
107+
def fake_post(url, json=None, **kwargs): # noqa: A002 (shadowing builtins)
108+
calls.append((url, json))
109+
if url.endswith("/api/v2/live-evaluation/evaluate"):
110+
return FakeResponse(202, {"status": "accepted"})
111+
if url.endswith("/api/v2/advisories-packages/bulk_search/"):
112+
return FakeResponse(
113+
200,
114+
make_v2_advisories_response(
115+
pkg_purl="pkg:pypi/demo@1.2.3",
116+
advisory_id="ADV-999",
117+
aliases=["CVE-2025-1111"],
118+
fixes=["pkg:pypi/demo@1.2.5"],
119+
),
120+
)
121+
return FakeResponse(404, {"detail": "not found"})
122+
123+
monkeypatch.setattr("vulntotal.datasources.vulnerablecode_local.requests.post", fake_post)
124+
125+
ds = LocalVulnerableCodeDataSource()
126+
purl = PackageURL.from_string("pkg:pypi/demo@1.2.3")
127+
128+
results = list(ds.datasource_advisory(purl))
129+
130+
urls = [u for u, _ in calls]
131+
assert any(
132+
"/api/v2/live-evaluation/evaluate" in url for url in urls
133+
), "live evaluation endpoint should be called when enabled"
134+
assert any(
135+
"/api/v2/advisories-packages/bulk_search/" in url for url in urls
136+
), "v2 advisories bulk_search should be called"
137+
138+
assert len(results) == 1
139+
vd = results[0].to_dict()
140+
assert vd["aliases"] == ["CVE-2025-1111"]
141+
assert vd["affected_versions"] == ["1.2.3"]
142+
assert vd["fixed_versions"] == ["1.2.5"]

0 commit comments

Comments
 (0)