Skip to content

Commit ed954ec

Browse files
committed
Add tests for pysec importer
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent b91f43f commit ed954ec

File tree

2 files changed

+192
-62
lines changed

2 files changed

+192
-62
lines changed
Lines changed: 51 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,39 @@
1-
import pytest
2-
from unittest.mock import patch, MagicMock
31
from datetime import datetime
2+
from unittest.mock import MagicMock
3+
from unittest.mock import patch
44

5-
from vulnerabilities.importer import AdvisoryData, AffectedPackage, Reference, VulnerabilitySeverity
6-
from vulnerabilities.utils import get_item
5+
import pytest
76
from packageurl import PackageURL
87
from univers.version_constraint import VersionConstraint
98
from univers.version_range import RANGE_CLASS_BY_SCHEMES
109
from univers.versions import SemverVersion
10+
11+
from vulnerabilities.importer import AdvisoryData
12+
from vulnerabilities.importer import AffectedPackage
13+
from vulnerabilities.importer import Reference
14+
from vulnerabilities.importer import VulnerabilitySeverity
1115
from vulnerabilities.pipelines.v2_importers.github_importer import GitHubAPIImporterPipeline
12-
from vulnerabilities.pipelines.v2_importers.github_importer import get_cwes_from_github_advisory, get_purl
16+
from vulnerabilities.pipelines.v2_importers.github_importer import get_cwes_from_github_advisory
17+
from vulnerabilities.pipelines.v2_importers.github_importer import get_purl
18+
from vulnerabilities.utils import get_item
19+
1320

1421
@pytest.fixture
1522
def mock_fetch():
16-
with patch("vulnerabilities.pipelines.v2_importers.github_importer.utils.fetch_github_graphql_query") as mock:
23+
with patch(
24+
"vulnerabilities.pipelines.v2_importers.github_importer.utils.fetch_github_graphql_query"
25+
) as mock:
1726
yield mock
1827

1928

2029
def test_advisories_count(mock_fetch):
2130
# Mock the GraphQL query response for advisory count
22-
mock_fetch.return_value = {
23-
"data": {
24-
"securityVulnerabilities": {
25-
"totalCount": 10
26-
}
27-
}
28-
}
29-
31+
mock_fetch.return_value = {"data": {"securityVulnerabilities": {"totalCount": 10}}}
32+
3033
pipeline = GitHubAPIImporterPipeline()
31-
34+
3235
count = pipeline.advisories_count()
33-
36+
3437
# Assert that the count is correct
3538
assert count == 10
3639

@@ -44,71 +47,66 @@ def test_collect_advisories(mock_fetch):
4447
{
4548
"node": {
4649
"advisory": {
47-
"identifiers": [
48-
{"type": "GHSA", "value": "GHSA-1234-ABCD"}
49-
],
50+
"identifiers": [{"type": "GHSA", "value": "GHSA-1234-ABCD"}],
5051
"summary": "Sample advisory description",
51-
"references": [{"url": "https://github.com/advisories/GHSA-1234-ABCD"}],
52+
"references": [
53+
{"url": "https://github.com/advisories/GHSA-1234-ABCD"}
54+
],
5255
"severity": "HIGH",
53-
"cwes": {
54-
"nodes": [{"cweId": "CWE-123"}]
55-
},
56-
"publishedAt": "2023-01-01T00:00:00Z"
56+
"cwes": {"nodes": [{"cweId": "CWE-123"}]},
57+
"publishedAt": "2023-01-01T00:00:00Z",
5758
},
5859
"firstPatchedVersion": {"identifier": "1.2.3"},
5960
"package": {"name": "example-package"},
60-
"vulnerableVersionRange": ">=1.0.0,<=1.2.0"
61+
"vulnerableVersionRange": ">=1.0.0,<=1.2.0",
6162
}
6263
}
6364
],
64-
"pageInfo": {
65-
"hasNextPage": False,
66-
"endCursor": None
67-
}
65+
"pageInfo": {"hasNextPage": False, "endCursor": None},
6866
}
6967
}
7068
}
7169

7270
# Mock the response from GitHub GraphQL query
7371
mock_fetch.return_value = advisory_data
74-
72+
7573
# Instantiate the pipeline
7674
pipeline = GitHubAPIImporterPipeline()
77-
75+
7876
# Collect advisories
7977
advisories = list(pipeline.collect_advisories())
80-
78+
8179
# Check if advisories were correctly parsed
8280
assert len(advisories) == 1
8381
advisory = advisories[0]
84-
82+
8583
# Validate advisory fields
8684
assert advisory.advisory_id == "GHSA-1234-ABCD"
8785
assert advisory.summary == "Sample advisory description"
8886
assert advisory.url == "https://github.com/advisories/GHSA-1234-ABCD"
8987
assert len(advisory.references_v2) == 1
9088
assert advisory.references_v2[0].reference_id == "GHSA-1234-ABCD"
9189
assert advisory.severities[0].value == "HIGH"
92-
90+
9391
# Validate affected package and version range
9492
affected_package = advisory.affected_packages[0]
9593
assert isinstance(affected_package.package, PackageURL)
9694
assert affected_package.package.name == "example-package"
97-
95+
9896
# Check CWE extraction
9997
assert advisory.weaknesses == [123]
10098

10199

102100
def test_get_purl(mock_fetch):
103101
# Test for package URL generation
104102
result = get_purl("cargo", "example/package-name")
105-
103+
106104
# Validate that the correct PackageURL is generated
107105
assert isinstance(result, PackageURL)
108106
assert result.type == "cargo"
109107
assert result.namespace == None
110108
assert result.name == "example/package-name"
111-
109+
112110

113111
def test_process_response(mock_fetch):
114112
# Mock advisory data as input for the process_response function
@@ -119,67 +117,58 @@ def test_process_response(mock_fetch):
119117
{
120118
"node": {
121119
"advisory": {
122-
"identifiers": [
123-
{"type": "GHSA", "value": "GHSA-5678-EFGH"}
124-
],
120+
"identifiers": [{"type": "GHSA", "value": "GHSA-5678-EFGH"}],
125121
"summary": "Another advisory",
126-
"references": [{"url": "https://github.com/advisories/GHSA-5678-EFGH"}],
122+
"references": [
123+
{"url": "https://github.com/advisories/GHSA-5678-EFGH"}
124+
],
127125
"severity": "MEDIUM",
128-
"cwes": {
129-
"nodes": [{"cweId": "CWE-200"}]
130-
},
131-
"publishedAt": "2023-02-01T00:00:00Z"
126+
"cwes": {"nodes": [{"cweId": "CWE-200"}]},
127+
"publishedAt": "2023-02-01T00:00:00Z",
132128
},
133129
"firstPatchedVersion": {"identifier": "2.0.0"},
134130
"package": {"name": "another-package"},
135-
"vulnerableVersionRange": ">=2.0.0,<=3.0.0"
131+
"vulnerableVersionRange": ">=2.0.0,<=3.0.0",
136132
}
137133
}
138134
],
139-
"pageInfo": {
140-
"hasNextPage": False,
141-
"endCursor": None
142-
}
135+
"pageInfo": {"hasNextPage": False, "endCursor": None},
143136
}
144137
}
145138
}
146-
139+
147140
# Mock the response from GitHub GraphQL query
148141
mock_fetch.return_value = advisory_data
149-
142+
150143
# Process the mock response
151144
result = list(GitHubAPIImporterPipeline().collect_advisories())
152-
145+
153146
# Check the results
154147
assert len(result) == 1
155148
advisory = result[0]
156-
149+
157150
# Validate the advisory data
158151
assert advisory.advisory_id == "GHSA-5678-EFGH"
159152
assert advisory.summary == "Another advisory"
160153
assert advisory.url == "https://github.com/advisories/GHSA-5678-EFGH"
161-
154+
162155
# Check CWE extraction
163156
assert advisory.weaknesses == [200]
164157

165158

166159
def test_get_cwes_from_github_advisory(mock_fetch):
167160
# Mock CWEs extraction from GitHub advisory
168-
advisory_data = {
169-
"cwes": {
170-
"nodes": [{"cweId": "CWE-522"}]
171-
}
172-
}
161+
advisory_data = {"cwes": {"nodes": [{"cweId": "CWE-522"}]}}
173162

174163
cwes = get_cwes_from_github_advisory(advisory_data)
175-
164+
176165
# Validate the CWE ID extraction
177166
assert cwes == [522]
178167

179168

180169
def test_invalid_package_type_in_get_purl(mock_fetch):
181170
# Test for invalid package type
182171
result = get_purl("invalidpkg", "example/package-name")
183-
172+
184173
# Assert that None is returned for an invalid package type
185174
assert result is None
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import json
2+
from io import BytesIO
3+
from unittest.mock import MagicMock
4+
from unittest.mock import patch
5+
from zipfile import BadZipFile
6+
from zipfile import ZipFile
7+
8+
import pytest
9+
10+
from vulnerabilities.importer import AdvisoryData
11+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
12+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
13+
from vulnerabilities.pipelines.v2_importers.pysec_importer import (
14+
PyPIImporterPipeline, # Path to the PyPI Importer
15+
)
16+
17+
18+
@pytest.fixture
19+
def mock_zip_data():
20+
# Create mock zip data for testing
21+
zip_buffer = BytesIO()
22+
with ZipFile(zip_buffer, mode="w") as zip_file:
23+
# Create a sample advisory file inside the zip
24+
advisory_data = {
25+
"advisory_id": "PYSEC-1234",
26+
"summary": "Sample PyPI advisory",
27+
"references": [{"url": "https://pypi.org/advisory/PYSEC-1234"}],
28+
"package": {"name": "example-package"},
29+
"affected_versions": ">=1.0.0,<=2.0.0",
30+
}
31+
# Save the sample advisory as a JSON file
32+
with zip_file.open("PYSEC-1234.json", "w") as f:
33+
f.write(json.dumps(advisory_data).encode("utf-8"))
34+
zip_buffer.seek(0)
35+
return zip_buffer
36+
37+
38+
@pytest.fixture
39+
def mock_requests_get():
40+
with patch("requests.get") as mock:
41+
yield mock
42+
43+
44+
def test_fetch_zip(mock_requests_get, mock_zip_data):
45+
# Mock the `requests.get` to return the mock zip data
46+
mock_requests_get.return_value.content = mock_zip_data.read()
47+
48+
pipeline = PyPIImporterPipeline()
49+
50+
# Call the `fetch_zip` method
51+
pipeline.fetch_zip()
52+
53+
# Reset the position of mock_zip_data to 0 before comparing
54+
mock_zip_data.seek(0)
55+
56+
# Verify that the zip file content is correctly assigned
57+
assert pipeline.advisory_zip == mock_zip_data.read()
58+
59+
60+
def test_advisories_count(mock_requests_get, mock_zip_data):
61+
# Mock the `requests.get` to return the mock zip data
62+
mock_requests_get.return_value.content = mock_zip_data.read()
63+
64+
pipeline = PyPIImporterPipeline()
65+
66+
# Fetch the zip data
67+
pipeline.fetch_zip()
68+
69+
# Test advisories count
70+
count = pipeline.advisories_count()
71+
72+
# Verify that it correctly counts the number of advisory files starting with 'PYSEC-'
73+
assert count == 1
74+
75+
76+
def test_collect_advisories(mock_requests_get, mock_zip_data):
77+
# Mock the `requests.get` to return the mock zip data
78+
mock_requests_get.return_value.content = mock_zip_data.read()
79+
80+
pipeline = PyPIImporterPipeline()
81+
82+
# Fetch the zip data
83+
pipeline.fetch_zip()
84+
85+
# Mock the `parse_advisory_data_v2` function to return a dummy AdvisoryData
86+
with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:
87+
mock_parse.return_value = AdvisoryData(
88+
advisory_id="PYSEC-1234",
89+
summary="Sample PyPI advisory",
90+
references_v2=[{"url": "https://pypi.org/advisory/PYSEC-1234"}],
91+
affected_packages=[],
92+
weaknesses=[],
93+
url="https://pypi.org/advisory/PYSEC-1234",
94+
)
95+
96+
# Call the `collect_advisories` method
97+
advisories = list(pipeline.collect_advisories())
98+
99+
# Ensure we have 1 advisory
100+
assert len(advisories) == 1
101+
102+
# Verify advisory data
103+
advisory = advisories[0]
104+
assert advisory.advisory_id == "PYSEC-1234"
105+
assert advisory.summary == "Sample PyPI advisory"
106+
assert advisory.url == "https://pypi.org/advisory/PYSEC-1234"
107+
108+
109+
def test_collect_advisories_invalid_file(mock_requests_get, mock_zip_data):
110+
# Create a mock zip with an invalid file name
111+
zip_buffer = BytesIO()
112+
with ZipFile(zip_buffer, mode="w") as zip_file:
113+
zip_file.writestr("INVALID_FILE.txt", "Invalid content")
114+
115+
zip_buffer.seek(0)
116+
mock_requests_get.return_value.content = zip_buffer.read()
117+
118+
pipeline = PyPIImporterPipeline()
119+
120+
# Fetch the zip data
121+
pipeline.fetch_zip()
122+
123+
# Mock the `parse_advisory_data_v2` function
124+
with patch("vulnerabilities.importers.osv.parse_advisory_data_v2") as mock_parse:
125+
mock_parse.return_value = AdvisoryData(
126+
advisory_id="PYSEC-1234",
127+
summary="Sample PyPI advisory",
128+
references_v2=[{"url": "https://pypi.org/advisory/PYSEC-1234"}],
129+
affected_packages=[],
130+
weaknesses=[],
131+
url="https://pypi.org/advisory/PYSEC-1234",
132+
)
133+
134+
# Call the `collect_advisories` method and check the logging for invalid file
135+
with patch(
136+
"vulnerabilities.pipelines.VulnerableCodeBaseImporterPipelineV2.log"
137+
) as mock_log:
138+
advisories = list(pipeline.collect_advisories())
139+
140+
# Ensure no advisories were yielded due to the invalid file
141+
assert len(advisories) == 0

0 commit comments

Comments
 (0)