Skip to content

Commit ca0b3ae

Browse files
committed
Add AdvisoryV2 models
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent 584b077 commit ca0b3ae

File tree

2 files changed

+374
-0
lines changed

2 files changed

+374
-0
lines changed

vulnerabilities/importer.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,54 @@ def from_url(cls, url):
145145
return cls(url=url)
146146

147147

148+
@dataclasses.dataclass(eq=True)
@functools.total_ordering
class ReferenceV2:
    """
    A reference attached to an advisory: a mandatory ``url`` plus an optional
    ``reference_id`` and ``reference_type``.

    Instances compare and sort by the tuple
    ``(reference_id, reference_type, url)``.
    """

    reference_id: str = ""
    reference_type: str = ""
    url: str = ""

    def __post_init__(self):
        """
        Validate and normalize the fields.

        Raise a TypeError when ``url`` is empty. Coerce a truthy non-string
        ``reference_id`` to ``str``.
        """
        if not self.url:
            raise TypeError("Reference must have a url")
        if self.reference_id and not isinstance(self.reference_id, str):
            self.reference_id = str(self.reference_id)

    def __lt__(self, other):
        # Compare against this very class: checking for another class here
        # would make every ReferenceV2-to-ReferenceV2 comparison return
        # NotImplemented and break sorting.
        if not isinstance(other, ReferenceV2):
            return NotImplemented
        return self._cmp_key() < other._cmp_key()

    # TODO: Add cache
    def _cmp_key(self):
        """Return the tuple used to order references."""
        return (self.reference_id, self.reference_type, self.url)

    def to_dict(self):
        """Return a normalized dictionary representation"""
        return {
            "reference_id": self.reference_id,
            "reference_type": self.reference_type,
            "url": self.url,
        }

    @classmethod
    def from_dict(cls, ref: dict):
        """
        Return a ReferenceV2 built from a ``ref`` mapping as produced by
        ``to_dict()``.

        ``reference_id`` and ``url`` keys are required; a missing or empty
        ``reference_type`` becomes an empty string.
        """
        return cls(
            reference_id=str(ref["reference_id"]),
            reference_type=ref.get("reference_type") or "",
            url=ref["url"],
        )

    @classmethod
    def from_url(cls, url):
        """
        Return a ReferenceV2 for ``url``, using the id extracted by
        ``get_reference_id()`` as ``reference_id`` when it looks like a GHSA
        id (kept as-is) or a CVE id (upper-cased). Otherwise only the url is
        set.
        """
        reference_id = get_reference_id(url)
        if "GHSA-" in reference_id.upper():
            return cls(reference_id=reference_id, url=url)
        if is_cve(reference_id):
            return cls(url=url, reference_id=reference_id.upper())
        return cls(url=url)
194+
195+
148196
class UnMergeablePackageError(Exception):
149197
"""
150198
Raised when a package cannot be merged with another one.
@@ -352,6 +400,74 @@ def from_dict(cls, advisory_data):
352400
return cls(**transformed)
353401

354402

403+
@dataclasses.dataclass(order=True)
class AdvisoryDataV2:
    """
    This data class expresses the contract between data sources and the import runner.

    If a vulnerability_id is present then:
        summary or affected_packages or references must be present
    otherwise
        either affected_package or references should be present

    date_published must be aware datetime
    """

    aliases: List[str] = dataclasses.field(default_factory=list)
    summary: Optional[str] = ""
    affected_packages: List[AffectedPackage] = dataclasses.field(default_factory=list)
    references: List[ReferenceV2] = dataclasses.field(default_factory=list)
    date_published: Optional[datetime.datetime] = None
    weaknesses: List[int] = dataclasses.field(default_factory=list)
    severities: List[VulnerabilitySeverity] = dataclasses.field(default_factory=list)
    url: Optional[str] = None

    def __post_init__(self):
        """
        Log a warning when ``date_published`` carries no tzinfo and clean a
        non-empty ``summary``.
        """
        if self.date_published and not self.date_published.tzinfo:
            logger.warning(f"AdvisoryData with no tzinfo: {self!r}")
        if self.summary:
            self.summary = self.clean_summary(self.summary)

    def clean_summary(self, summary):
        """
        Return ``summary`` stripped of surrounding whitespace, with NUL
        characters replaced by U+FFFD.
        """
        # https://nvd.nist.gov/vuln/detail/CVE-2013-4314
        # https://github.com/cms-dev/cms/issues/888#issuecomment-516977572
        summary = summary.strip()
        if summary:
            summary = summary.replace("\x00", "\uFFFD")
        return summary

    def to_dict(self):
        """
        Return a mapping of plain data for this advisory.

        NOTE(review): ``severities`` is not part of this mapping, so it does
        not survive a to_dict()/from_dict() round trip — confirm this is
        intended.
        """
        return {
            "aliases": self.aliases,
            "summary": self.summary,
            "affected_packages": [pkg.to_dict() for pkg in self.affected_packages],
            "references": [ref.to_dict() for ref in self.references],
            "date_published": self.date_published.isoformat() if self.date_published else None,
            "weaknesses": self.weaknesses,
            "url": self.url if self.url else "",
        }

    @classmethod
    def from_dict(cls, advisory_data):
        """
        Return an AdvisoryDataV2 built from an ``advisory_data`` mapping as
        produced by ``to_dict()``.

        ``None`` entries in "affected_packages" are skipped and an empty
        "url" becomes ``None``.
        """
        date_published = advisory_data["date_published"]
        transformed = {
            "aliases": advisory_data["aliases"],
            "summary": advisory_data["summary"],
            "affected_packages": [
                AffectedPackage.from_dict(pkg)
                for pkg in advisory_data["affected_packages"]
                if pkg is not None
            ],
            # The ``references`` field holds ReferenceV2 objects and to_dict()
            # emits their shape, so they must be rebuilt with ReferenceV2.
            "references": [ReferenceV2.from_dict(ref) for ref in advisory_data["references"]],
            "date_published": datetime.datetime.fromisoformat(date_published)
            if date_published
            else None,
            "weaknesses": advisory_data["weaknesses"],
            "url": advisory_data.get("url") or None,
        }
        return cls(**transformed)
469+
470+
355471
class NoLicenseError(Exception):
    """
    Error presumably signalling a missing license (inferred from the name
    only — TODO confirm against the code that raises it).
    """
357473

vulnerabilities/models.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,3 +1815,261 @@ class CodeFix(CodeChange):
18151815
related_name="code_fix",
18161816
help_text="The fixing package version with this code fix",
18171817
)
1818+
1819+
1820+
class AdvisorySeverity(models.Model):
    """
    A severity value expressed in one of the known scoring systems, with an
    optional URL, scoring elements and publication date.
    """

    url = models.URLField(
        help_text="URL to the vulnerability severity",
        max_length=1024,
        null=True,
        db_index=True,
    )

    # (identifier, name) pairs for every entry of SCORING_SYSTEMS.
    scoring_system_choices = tuple(
        (scoring.identifier, scoring.name) for scoring in SCORING_SYSTEMS.values()
    )

    scoring_system = models.CharField(
        max_length=50,
        choices=scoring_system_choices,
        # The help text lists every available choice, one per line.
        help_text="Identifier for the scoring system used. Available choices are: "
        + ",\n".join(f"{ident}: {label}" for ident, label in scoring_system_choices)
        + " ",
    )

    value = models.CharField(help_text="Example: 9.0, Important, High", max_length=50)

    scoring_elements = models.CharField(
        max_length=150,
        null=True,
        help_text=(
            "Supporting scoring elements used to compute the score values. "
            "For example a CVSS vector string as used to compute a CVSS score."
        ),
    )

    published_at = models.DateTimeField(
        help_text="UTC Date of publication of the vulnerability severity",
        blank=True,
        null=True,
    )

    objects = BaseQuerySet.as_manager()

    class Meta:
        ordering = ["url", "scoring_system", "value"]
1857+
1858+
1859+
class AdvisoryWeakness(models.Model):
    """
    A weakness is a software weakness that is associated with a vulnerability.
    """

    cwe_id = models.IntegerField(help_text="CWE id")
    # NOTE(review): related_name="weaknesses" adds a ``weaknesses`` reverse
    # accessor on Vulnerability; confirm no other model already declares the
    # same accessor (``manage.py check`` reports such clashes).
    vulnerabilities = models.ManyToManyField(Vulnerability, related_name="weaknesses")

    # Class-level cache shared by all instances, keyed by ``str(cwe_id)`` and
    # filled lazily on the first get_cwe() call.
    cwe_by_id = {}

    def get_cwe(self, cwe_id):
        """
        Return the CWE database entry for the ``cwe_id`` string.

        The whole CWE database is loaded into the shared ``cwe_by_id`` cache
        on first use. Raise a KeyError when ``cwe_id`` is not in the cache.
        """
        if not self.cwe_by_id:
            db = Database()
            for weakness in db.get_cwes():
                self.cwe_by_id[str(weakness.cwe_id)] = weakness
        return self.cwe_by_id[cwe_id]

    @property
    def cwe(self):
        """Return the CWE identifier string such as "CWE-79"."""
        return f"CWE-{self.cwe_id}"

    @property
    def weakness(self):
        """
        Return the CWE database entry for this weakness, or None when it
        cannot be found. A failed lookup is logged as a warning.
        """
        try:
            return self.get_cwe(str(self.cwe_id))
        except Exception as e:
            # Best effort: an unknown CWE id must not break callers.
            logger.warning(f"Could not find CWE {self.cwe_id}: {e}")
            return None

    @property
    def name(self):
        """Return the weakness's name."""
        # Look the entry up once so a failed lookup is logged only once.
        weakness = self.weakness
        return weakness.name if weakness else ""

    @property
    def description(self):
        """Return the weakness's description."""
        # Look the entry up once so a failed lookup is logged only once.
        weakness = self.weakness
        return weakness.description if weakness else ""

    def to_dict(self):
        """Return a mapping with the cwe_id, name and description."""
        return {"cwe_id": self.cwe_id, "name": self.name, "description": self.description}
1903+
1904+
1905+
class AdvisoryReference(models.Model):
    """
    A unique reference URL with an optional reference type and reference id.
    """

    url = models.URLField(
        unique=True,
        max_length=1024,
        help_text="URL to the vulnerability reference",
    )

    # Allowed values for ``reference_type``.
    ADVISORY = "advisory"
    EXPLOIT = "exploit"
    MAILING_LIST = "mailing_list"
    BUG = "bug"
    OTHER = "other"

    REFERENCE_TYPES = [
        (ADVISORY, "Advisory"),
        (EXPLOIT, "Exploit"),
        (MAILING_LIST, "Mailing List"),
        (BUG, "Bug"),
        (OTHER, "Other"),
    ]

    reference_type = models.CharField(choices=REFERENCE_TYPES, max_length=20, blank=True)

    reference_id = models.CharField(
        help_text="An optional reference ID, such as DSA-4465-1 when available",
        max_length=200,
        blank=True,
        db_index=True,
    )

    class Meta:
        ordering = ["reference_id", "url", "reference_type"]

    def __str__(self):
        # The url alone, or "<url> <reference_id>" when an id is set.
        if self.reference_id:
            return f"{self.url} {self.reference_id}"
        return f"{self.url}"

    @property
    def is_cpe(self):
        """
        Return True if the reference id starts with "cpe".
        """
        reference_id = self.reference_id
        return reference_id.startswith("cpe")
1948+
1949+
1950+
class AdvisoryAlias(models.Model):
    """
    A unique alias string, i.e. an identifier such as CVE-2020-2233.
    """

    alias = models.CharField(
        max_length=50,
        unique=True,
        blank=False,
        null=False,
        help_text="An alias is a unique vulnerability identifier in some database, "
        "such as CVE-2020-2233",
    )

    class Meta:
        ordering = ["alias"]

    def __str__(self):
        return self.alias

    @cached_property
    def url(self):
        """
        Return a URL for the alias, or None when the alias prefix is not one
        of CVE, GHSA or NPM-.
        """
        alias: str = self.alias
        if alias.startswith("CVE"):
            return f"https://nvd.nist.gov/vuln/detail/{alias}"

        if alias.startswith("GHSA"):
            return f"https://github.com/advisories/{alias}"

        npm_prefix = "NPM-"
        if alias.startswith(npm_prefix):
            # Slice the prefix off: str.lstrip() takes a set of characters,
            # not a prefix, and would also eat leading "N", "P", "M" or "-"
            # characters of the id itself.
            npm_id = alias[len(npm_prefix) :]
            return f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{npm_id}.json"

        return None
1981+
1982+
1983+
class AdvisoryV2(models.Model):
    """
    An advisory represents data directly obtained from upstream transformed
    into structured data
    """

    advisory_id = models.CharField(
        max_length=50,
        blank=False,
        null=False,
        unique=False,
        help_text="An advisory is a unique vulnerability identifier in some database, "
        "such as CVE-2020-2233",
    )

    unique_content_id = models.CharField(
        max_length=64,
        blank=False,
        null=False,
        unique=True,
        help_text="A 64 character unique identifier for the content of the advisory since "
        "we use sha256 as hex",
    )
    summary = models.TextField(
        blank=True,
    )
    aliases = models.ManyToManyField(
        AdvisoryAlias,
        related_name="advisories",
        help_text="A list of serializable Alias objects",
    )
    affected_packages = models.JSONField(
        blank=True, default=list, help_text="A list of serializable AffectedPackage objects"
    )
    references = models.ManyToManyField(
        AdvisoryReference,
        related_name="advisories",
        help_text="A list of serializable Reference objects",
    )
    severities = models.ManyToManyField(
        AdvisorySeverity,
        related_name="advisories",
        help_text="A list of vulnerability severities associated with this advisory.",
    )
    weaknesses = models.ManyToManyField(
        AdvisoryWeakness,
        related_name="advisories",
        help_text="A list of software weaknesses associated with this advisory.",
    )
    date_published = models.DateTimeField(
        blank=True, null=True, help_text="UTC Date of publication of the advisory"
    )
    date_collected = models.DateTimeField(help_text="UTC Date on which the advisory was collected")
    date_imported = models.DateTimeField(
        blank=True, null=True, help_text="UTC Date on which the advisory was imported"
    )
    # NOTE(review): the adjacent string literals below are joined without
    # separating spaces ("...with themodule name...", "Eg:vulnerabilities...").
    # Left untouched here because help_text is recorded in migrations.
    created_by = models.CharField(
        max_length=100,
        help_text="Fully qualified name of the importer prefixed with the"
        "module name importing the advisory. Eg:"
        "vulnerabilities.pipeline.nginx_importer.NginxImporterPipeline",
    )
    url = models.URLField(
        blank=False,
        null=False,
        help_text="Link to the advisory on the upstream website",
    )

    objects = AdvisoryQuerySet.as_manager()

    class Meta:
        ordering = ["date_published", "unique_content_id"]

    def save(self, *args, **kwargs):
        """Validate the instance with full_clean() and then save it."""
        self.full_clean()
        return super().save(*args, **kwargs)

    def to_advisory_data(self) -> "AdvisoryDataV2":
        """
        Return an AdvisoryDataV2 built from this advisory and its related
        aliases, references, weaknesses and severities.
        """
        # Imported locally, as in the original code (presumably to avoid an
        # import cycle between the models and importer modules).
        from vulnerabilities.importer import AdvisoryDataV2
        from vulnerabilities.importer import AffectedPackage
        from vulnerabilities.importer import ReferenceV2
        from vulnerabilities.importer import VulnerabilitySeverity

        # Many-to-many attributes are managers, not lists: they must be
        # queried with .all() and each related row converted to the plain
        # data type that AdvisoryDataV2 declares for the field.
        references = [
            ReferenceV2(
                reference_id=ref.reference_id,
                reference_type=ref.reference_type,
                url=ref.url,
            )
            for ref in self.references.all()
        ]
        # NOTE(review): assumes SCORING_SYSTEMS is keyed by the scoring system
        # identifier and that VulnerabilitySeverity accepts these keyword
        # arguments — confirm against their definitions.
        severities = [
            VulnerabilitySeverity(
                system=SCORING_SYSTEMS[severity.scoring_system],
                value=severity.value,
                scoring_elements=severity.scoring_elements or "",
                published_at=severity.published_at,
            )
            for severity in self.severities.all()
        ]

        return AdvisoryDataV2(
            aliases=[item.alias for item in self.aliases.all()],
            summary=self.summary,
            affected_packages=[
                AffectedPackage.from_dict(pkg) for pkg in self.affected_packages if pkg
            ],
            references=references,
            date_published=self.date_published,
            weaknesses=[weakness.cwe_id for weakness in self.weaknesses.all()],
            severities=severities,
            url=self.url,
        )

0 commit comments

Comments
 (0)