|
8 | 8 | # |
9 | 9 |
|
10 | 10 | import logging |
11 | | -from datetime import timezone |
12 | | -from typing import Iterable |
13 | | -from urllib.parse import urljoin |
14 | 11 |
|
15 | | -import defusedxml.ElementTree as DET |
16 | | -import requests |
17 | | -from dateutil import parser as dateparser |
| 12 | +import re |
| 13 | +from bs4 import BeautifulSoup |
18 | 14 | from packageurl import PackageURL |
19 | | -from univers.version_range import OpensslVersionRange |
20 | | -from univers.versions import OpensslVersion |
| 15 | +from datetime import timezone |
| 16 | +from dateutil import parser as dateparser |
| 17 | +from vulnerabilities.utils import fetch_response |
| 18 | +from vulnerabilities.utils import get_item |
21 | 19 |
|
22 | 20 | from vulnerabilities.importer import AdvisoryData |
23 | 21 | from vulnerabilities.importer import AffectedPackage |
24 | 22 | from vulnerabilities.importer import Importer |
25 | 23 | from vulnerabilities.importer import Reference |
26 | 24 | from vulnerabilities.importer import VulnerabilitySeverity |
27 | 25 | from vulnerabilities.severity_systems import SCORING_SYSTEMS |
| 26 | +from univers.version_range import OpensslVersionRange |
28 | 27 |
|
29 | 28 | logger = logging.getLogger(__name__) |
30 | 29 |
|
31 | | - |
32 | 30 | class OpensslImporter(Importer): |
33 | | - spdx_license_expression = "Apache-2.0" |
34 | | - license_url = "https://github.com/openssl/openssl/blob/master/LICENSE.txt" |
35 | | - url = "https://www.openssl.org/news/vulnerabilities.xml" |
36 | | - importer_name = "OpenSSL Importer" |
37 | | - |
38 | | - def fetch(self): |
39 | | - response = requests.get(url=self.url) |
40 | | - if not response.status_code == 200: |
41 | | - logger.error(f"Error while fetching {self.url}: {response.status_code}") |
42 | | - return |
43 | | - return response.content |
44 | | - |
45 | | - def advisory_data(self) -> Iterable[AdvisoryData]: |
46 | | - xml_response = self.fetch() |
47 | | - return parse_vulnerabilities(xml_response) |
48 | | - |
49 | | - |
50 | | -def parse_vulnerabilities(xml_response) -> Iterable[AdvisoryData]: |
51 | | - root = DET.fromstring(xml_response) |
52 | | - for xml_issue in root: |
53 | | - if xml_issue.tag == "issue": |
54 | | - advisory = to_advisory_data(xml_issue) |
55 | | - if advisory: |
56 | | - yield advisory |
57 | | - |
58 | | - |
59 | | -def to_advisory_data(xml_issue) -> AdvisoryData: |
60 | | - """ |
61 | | - Return AdvisoryData from given xml_issue |
62 | | - """ |
63 | | - |
64 | | - purl = PackageURL(type="openssl", name="openssl") |
65 | | - cve = advisory_url = severity = summary = None |
66 | | - safe_pkg_versions = {} |
67 | | - vuln_pkg_versions_by_base_version = {} |
68 | | - aliases = [] |
69 | | - references = [] |
70 | | - affected_packages = [] |
71 | | - date_published = xml_issue.attrib["public"].strip() |
72 | | - |
73 | | - for info in xml_issue: |
74 | | - if info.tag == "impact": |
75 | | - severity = VulnerabilitySeverity( |
76 | | - system=SCORING_SYSTEMS["generic_textual"], value=info.attrib["severity"] |
77 | | - ) |
78 | | - |
79 | | - elif info.tag == "advisory": |
80 | | - advisory_url = info.attrib["url"] |
81 | | - if not advisory_url.startswith("https://web.archive.org"): |
82 | | - advisory_url = urljoin("https://www.openssl.org", advisory_url) |
83 | | - |
84 | | - elif info.tag == "cve": |
85 | | - cve = info.attrib.get("name") |
86 | | - # use made up alias to compensate for case when advisory doesn't have CVE-ID |
87 | | - madeup_alias = f"VC-OPENSSL-{date_published}" |
88 | | - if cve: |
89 | | - cve = f"CVE-{cve}" |
90 | | - madeup_alias = f"{madeup_alias}-{cve}" |
91 | | - aliases.append(cve) |
92 | | - references.append( |
93 | | - Reference(reference_id=cve, url=f"https://nvd.nist.gov/vuln/detail/{cve}") |
94 | | - ) |
95 | | - aliases.append(madeup_alias) |
96 | | - |
97 | | - elif info.tag == "affects": |
98 | | - affected_base = info.attrib["base"] |
99 | | - affected_version = info.attrib["version"] |
100 | | - if affected_base.startswith("fips"): |
101 | | - logger.error( |
102 | | - f"{affected_base!r} is a OpenSSL-FIPS Object Module and isn't supported by OpensslImporter. Use a different importer." |
103 | | - ) |
104 | | - return |
105 | | - if affected_base in vuln_pkg_versions_by_base_version: |
106 | | - vuln_pkg_versions_by_base_version[affected_base].append(affected_version) |
107 | | - else: |
108 | | - vuln_pkg_versions_by_base_version[affected_base] = [affected_version] |
109 | | - |
110 | | - elif info.tag == "fixed": |
111 | | - fixed_base = info.attrib["base"] |
112 | | - fixed_version = info.attrib["version"] |
113 | | - safe_pkg_versions[fixed_base] = fixed_version |
114 | | - for commit in info: |
115 | | - commit_hash = commit.attrib["hash"] |
116 | | - references.append( |
117 | | - Reference( |
118 | | - url=urljoin("https://github.com/openssl/openssl/commit/", commit_hash) |
119 | | - ) |
120 | | - ) |
121 | | - |
122 | | - elif info.tag == "description": |
123 | | - summary = " ".join(info.text.split()) |
124 | | - |
125 | | - elif info.tag in ("reported", "problemtype", "title"): |
126 | | - # as of now, these info isn't useful for AdvisoryData |
127 | | - # for more see: https://github.com/nexB/vulnerablecode/issues/688 |
128 | | - continue |
129 | | - else: |
130 | | - logger.error( |
131 | | - f"{info.tag!r} is a newly introduced tag. Modify the importer to make use of this new info." |
132 | | - ) |
133 | | - |
134 | | - for base_version, affected_versions in vuln_pkg_versions_by_base_version.items(): |
135 | | - affected_version_range = OpensslVersionRange.from_versions(affected_versions) |
136 | | - fixed_version = None |
137 | | - if base_version in safe_pkg_versions: |
138 | | - fixed_version = OpensslVersion(safe_pkg_versions[base_version]) |
139 | | - affected_package = AffectedPackage( |
140 | | - package=purl, |
141 | | - affected_version_range=affected_version_range, |
142 | | - fixed_version=fixed_version, |
| 31 | + |
| 32 | +    root_url = "https://openssl-library.org/news/vulnerabilities/index.html" |
| 33 | +    license_url = "https://spdx.org/licenses/OpenSSL-standalone.html" |
| 34 | +    spdx_license_expression = "OpenSSL-standalone" |
| | +    importer_name = "OpenSSL Importer" |
| 35 | + |
| 36 | +    def advisory_data(self): |
| 37 | +        output_data = get_adv_data(self.root_url) |
| 38 | +        for data in output_data: |
| 39 | +            yield self.to_advisory(data) |
| 40 | + |
| 41 | +    def to_advisory(self, data): |
| 42 | +        # alias |
| 43 | +        alias = get_item(data, "CVE") |
| 44 | + |
| 45 | +        # published date |
| 46 | +        date_published = get_item(data, "date_published") |
| 47 | +        parsed_date_published = dateparser.parse(date_published, yearfirst=True).replace( |
| 48 | +            tzinfo=timezone.utc |
143 | 49 | ) |
144 | | - affected_packages.append(affected_package) |
145 | | - |
146 | | - if severity and advisory_url: |
147 | | - references.append(Reference(url=advisory_url, severities=[severity])) |
148 | | - elif advisory_url: |
149 | | - references.append(Reference(url=advisory_url)) |
150 | | - |
151 | | - parsed_date_published = dateparser.parse(date_published, yearfirst=True).replace( |
152 | | - tzinfo=timezone.utc |
153 | | - ) |
154 | | - |
155 | | - return AdvisoryData( |
156 | | - aliases=aliases, |
157 | | - summary=summary, |
158 | | - affected_packages=affected_packages, |
159 | | - references=references, |
160 | | - date_published=parsed_date_published, |
161 | | - url=advisory_url if advisory_url else "https://www.openssl.org/news/vulnerabilities.xml", |
162 | | - ) |
| 50 | + |
| 51 | +        # affected packages |
| 52 | +        affected_packages = [] |
| 53 | +        affected_package_out = get_item(data, "affected_packages") |
| 54 | +        for affected in affected_package_out: |
| 55 | +            if "fips" in affected: |
| 56 | +                continue  # skip FIPS Object Module entries; they are not supported by this importer |
| 57 | +            versions = re.findall(r"(?<=from\s)(\S+)|(?<=before\s)(\S+)", affected) |
| 58 | +            versions = [v for group in versions for v in group if v]  # flatten the alternation groups |
| | +            if not versions: |
| | +                continue |
| 59 | +            affected_version_range = OpensslVersionRange.from_versions(versions) |
| 60 | +            affected_packages.append(AffectedPackage( |
| 61 | +                package=PackageURL( |
| 62 | +                    type="openssl", |
| 63 | +                    name="openssl", |
| 64 | +                ), |
| 65 | +                affected_version_range=affected_version_range, |
| 66 | +            )) |
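| | +        # Illustration (hypothetical entry, not taken from the live page): |
| | +        # an "Affected" item like "from 1.0.1 before 1.0.1j" yields |
| | +        # ["1.0.1", "1.0.1j"], which from_versions() encodes as a version |
| | +        # range along the lines of vers:openssl/1.0.1|1.0.1j |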
| 67 | + |
| 68 | +        # severity |
| 69 | +        severity = VulnerabilitySeverity( |
| 70 | +            system=SCORING_SYSTEMS["generic_textual"], value=get_item(data, "severity") |
| 71 | +        ) |
| 72 | + |
| 73 | +        # references |
| 74 | +        references = [] |
| 75 | +        for reference in get_item(data, "references"): |
| 76 | +            references.append(Reference( |
| 77 | +                severities=[severity], |
| 78 | +                reference_id=alias, |
| 79 | +                url=reference, |
| 80 | +            )) |
| 81 | + |
| 82 | +        # summary |
| 83 | +        summary = get_item(data, "summary") |
| 84 | + |
| 85 | +        return AdvisoryData( |
| 86 | +            aliases=[alias],  # AdvisoryData expects a list of aliases |
| 87 | +            summary=summary, |
| 88 | +            affected_packages=affected_packages, |
| 89 | +            references=references, |
| 90 | +            date_published=parsed_date_published, |
| 91 | +            url=f"{self.root_url}#{alias}", |
| 92 | +        ) |
| 93 | + |
| 94 | +''' |
| 95 | +Each advisory on the vulnerabilities page is structured as: |
| 96 | +<h3> heading whose id and <a> link text hold the CVE id |
| 97 | +<dl> definition list of field/value pairs: |
| 98 | + <dt> field name (Severity, Published at, Affected, References) |
| 99 | + <dd> field value |
| 100 | +affected packages appear as <li> items in the "Affected" <dd> |
| 101 | +references appear as <li><a> links in the "References" <dd> |
| 102 | +''' |
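| | +# A simplified sketch of the markup the parser below assumes (illustrative; |
| | +# element details are paraphrased, not copied verbatim from the page): |
| | +# |
| | +# <h3 id="CVE-2024-0727"><a href="...">CVE-2024-0727</a></h3> |
| | +# <dl> |
| | +#   <dt>Severity</dt> <dd>Low</dd> |
| | +#   <dt>Published at</dt> <dd>2024-01-25</dd> |
| | +#   <dt>Affected</dt> <dd><ul><li>from 3.0.0 before 3.0.13</li></ul></dd> |
| | +#   <dt>References</dt> <dd><ul><li><a href="...">advisory</a></li></ul></dd> |
| | +# </dl> |
| | +# <p>Issue summary: ...</p> |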
| 103 | +def get_adv_data(url): |
| 104 | +    try: |
| 105 | +        response = fetch_response(url).content |
| 106 | +        soup = BeautifulSoup(response, "html.parser") |
| 107 | +    except Exception: |
| 108 | +        logger.error(f"Failed to fetch URL {url}") |
| | +        return [] |
| 109 | + |
| 110 | +    advisories = [] |
| 111 | + |
| 112 | +    # each CVE heading is an <h3> with id="CVE-..."; filter on the id so |
| 113 | +    # unrelated <h3> headings do not break the parsing below |
| | +    for cve_section in soup.find_all("h3", id=re.compile(r"^CVE-")): |
| 114 | +        data_output = { |
| 115 | +            "date_published": "", |
| 116 | +            "CVE": "", |
| 117 | +            "affected_packages": [], |
| 118 | +            "references": [], |
| 119 | +            "summary": "", |
| 120 | +            "severity": "", |
| 121 | +        } |
| 122 | + |
| 123 | +        # the CVE id is the heading's id attribute (also its link text) |
| 124 | +        data_output["CVE"] = cve_section["id"] |
| 125 | + |
| 126 | +        # the <dl> holding this section's field/value pairs |
| 127 | +        dl = cve_section.find_next_sibling("dl") |
| | +        if dl is None: |
| | +            continue |
| 128 | +        for dt, dd in zip(dl.find_all("dt"), dl.find_all("dd")):  # pair each field name with its value |
| 129 | +            key = dt.text.strip() |
| 130 | +            value = dd.text.strip() |
| 131 | + |
| 132 | +            # severity |
| 133 | +            if key == "Severity": |
| 134 | +                data_output["severity"] = value |
| 135 | +            # published date |
| 136 | +            elif key == "Published at": |
| 137 | +                data_output["date_published"] = value |
| 138 | +            # affected packages |
| 139 | +            elif key == "Affected": |
| 140 | +                affected_list = [li.text.strip() for li in dd.find_all("li")] |
| 141 | +                data_output["affected_packages"] = affected_list |
| 142 | +            # references |
| 143 | +            elif key == "References": |
| 144 | +                references = [a["href"] for a in dd.find_all("a")] |
| 145 | +                data_output["references"] = references |
| 146 | + |
| 147 | +        # the summary lives in an "Issue summary:" <p> before the next heading |
| 148 | +        for sibling in dl.find_next_siblings(): |
| 149 | +            if sibling.name in ("h2", "h3"): |
| 150 | +                break |
| 151 | +            if sibling.name == "p": |
| 152 | +                if "Issue summary:" in sibling.text: |
| | +                    # str.strip("Issue summary:") would strip characters, not the prefix |
| 153 | +                    data_output["summary"] = sibling.text.split("Issue summary:", 1)[1].strip() |
| 154 | + |
| 155 | + |
| 156 | +        # collect this advisory's data |
| 157 | +        advisories.append(data_output) |
| 158 | + |
| 159 | +    # return the list with all the advisory data |
| 160 | +    return advisories |
| 161 | + |
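| | +# Minimal manual check (a sketch; assumes the module is importable outside |
| | +# the full VulnerableCode pipeline, which may require Django settings): |
| | +# >>> importer = OpensslImporter() |
| | +# >>> advisory = next(importer.advisory_data()) |
| | +# >>> advisory.aliases, advisory.url |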