Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions src/mfcqi/analysis/tools/pip_audit_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,20 @@
will gracefully degrade and return empty vulnerability lists.
"""

from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass
class PipAuditScanResult:
"""Structured result for a pip-audit dependency scan."""

vulnerabilities: list[dict[str, Any]]
success: bool = True
error: str = ""


class PipAuditAnalyzer:
"""Analyzer for scanning Python dependencies for known vulnerabilities.

Expand Down Expand Up @@ -68,9 +78,15 @@ def scan_requirements(self, requirements_file: Path) -> list[dict[str, Any]]:
List of vulnerability dictionaries with package info.
Returns empty list if pip-audit not available.
"""
return self.scan_requirements_with_status(requirements_file).vulnerabilities

def scan_requirements_with_status(self, requirements_file: Path) -> PipAuditScanResult:
"""Scan a requirements file and report scanner status separately."""
# Check if pip-audit is available
if not self._available:
return []
return PipAuditScanResult(
vulnerabilities=[], success=False, error="pip-audit is not available"
)

try:
# Lazy import here too for the RequirementSource
Expand Down Expand Up @@ -109,11 +125,10 @@ def scan_requirements(self, requirements_file: Path) -> list[dict[str, Any]]:
}
)

return vulnerabilities
return PipAuditScanResult(vulnerabilities=vulnerabilities)

except Exception:
# Return empty list on error (graceful degradation)
return []
except Exception as exc:
return PipAuditScanResult(vulnerabilities=[], success=False, error=str(exc))

def scan_pyproject(self, pyproject_file: Path) -> list[dict[str, Any]]:
"""
Expand All @@ -128,9 +143,15 @@ def scan_pyproject(self, pyproject_file: Path) -> list[dict[str, Any]]:
List of vulnerability dictionaries with package info.
Returns empty list if pip-audit not available.
"""
return self.scan_pyproject_with_status(pyproject_file).vulnerabilities

def scan_pyproject_with_status(self, pyproject_file: Path) -> PipAuditScanResult:
"""Scan a pyproject.toml file and report scanner status separately."""
# Check if pip-audit is available
if not self._available:
return []
return PipAuditScanResult(
vulnerabilities=[], success=False, error="pip-audit is not available"
)

try:
# Lazy import here too for the PyProjectSource
Expand Down Expand Up @@ -169,11 +190,10 @@ def scan_pyproject(self, pyproject_file: Path) -> list[dict[str, Any]]:
}
)

return vulnerabilities
return PipAuditScanResult(vulnerabilities=vulnerabilities)

except Exception:
# Return empty list on error (graceful degradation)
return []
except Exception as exc:
return PipAuditScanResult(vulnerabilities=[], success=False, error=str(exc))

def scan_dependency_file(self, dep_file: Path) -> list[dict[str, Any]]:
"""
Expand Down Expand Up @@ -202,3 +222,18 @@ def scan_dependency_file(self, dep_file: Path) -> list[dict[str, Any]]:
# setup.py, setup.cfg, Pipfile not yet supported
# Would require additional dependency source implementations
return []

def scan_dependency_file_with_status(self, dep_file: Path) -> PipAuditScanResult:
"""Scan any supported dependency file and report scanner status separately."""
filename = dep_file.name

if filename == "pyproject.toml":
return self.scan_pyproject_with_status(dep_file)
elif filename.startswith("requirements") and filename.endswith(".txt"):
return self.scan_requirements_with_status(dep_file)
else:
return PipAuditScanResult(
vulnerabilities=[],
success=False,
error=f"Unsupported dependency file format: {filename}",
)
6 changes: 5 additions & 1 deletion src/mfcqi/metrics/dependency_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def extract(self, codebase: Path) -> float:
Weighted vulnerability count (0.0 = no vulnerabilities)
"""
analyzer = PipAuditAnalyzer()
self.last_scan_errors: list[dict[str, str]] = []

# Find ALL Python dependency files (ecosystem-wide)
dependency_files: list[Path] = []
Expand All @@ -99,7 +100,10 @@ def extract(self, codebase: Path) -> float:
# Scan ALL dependency files using intelligent dispatcher
weighted_vuln_count = 0.0
for dep_file in dependency_files:
vulns = analyzer.scan_dependency_file(dep_file)
result = analyzer.scan_dependency_file_with_status(dep_file)
if not result.success:
self.last_scan_errors.append({"file": str(dep_file), "error": result.error})
vulns = result.vulnerabilities

# For initial implementation, assign uniform weight per vulnerability
# pip-audit doesn't provide severity directly, so we use moderate weight
Expand Down
18 changes: 18 additions & 0 deletions tests/test_dependency_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,24 @@ def test_multiple_vulnerabilities_weighted():
assert metric.normalize(20.0) < 0.05 # Critical vulns


def test_scan_failures_are_recorded_separately():
"""Test dependency scanner failures are distinguishable from vulnerabilities."""
from mfcqi.metrics.dependency_security import DependencySecurityMetric

metric = DependencySecurityMetric()

with tempfile.TemporaryDirectory() as tmpdir:
setup_file = Path(tmpdir) / "setup.py"
setup_file.write_text("from setuptools import setup\nsetup(name='example')\n")

raw_score = metric.extract(Path(tmpdir))

assert raw_score == 0.0
assert metric.last_scan_errors
assert metric.last_scan_errors[0]["file"] == str(setup_file)
assert "Unsupported dependency file format" in metric.last_scan_errors[0]["error"]


def test_metric_weight():
"""Test that weight is 0.75 based on research."""
from mfcqi.metrics.dependency_security import DependencySecurityMetric
Expand Down
24 changes: 24 additions & 0 deletions tests/test_pip_audit_api_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,27 @@ def test_pip_audit_detects_vulnerabilities():
assert "version" in vuln
assert "vulnerability_id" in vuln
assert vuln["package"] == "requests"


def test_pip_audit_structured_scan_reports_failures(monkeypatch):
"""Structured scans should distinguish scanner failure from clean results."""
from mfcqi.analysis.tools.pip_audit_analyzer import PipAuditAnalyzer

with tempfile.TemporaryDirectory() as tmpdir:
req_file = Path(tmpdir) / "requirements.txt"
req_file.write_text("requests==2.31.0")

analyzer = PipAuditAnalyzer()
if analyzer.auditor is None:
return

def raise_scan_error(_source):
raise RuntimeError("audit service unavailable")

monkeypatch.setattr(analyzer.auditor, "audit", raise_scan_error)

result = analyzer.scan_requirements_with_status(req_file)

assert result.success is False
assert result.vulnerabilities == []
assert "audit service unavailable" in result.error
Loading