diff --git a/src/mfcqi/metrics/dependency_security.py b/src/mfcqi/metrics/dependency_security.py index 7bc5e01..e638693 100644 --- a/src/mfcqi/metrics/dependency_security.py +++ b/src/mfcqi/metrics/dependency_security.py @@ -34,6 +34,8 @@ """ import math +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import TimeoutError as FutureTimeoutError from pathlib import Path from typing import Any, Union, cast @@ -61,6 +63,16 @@ class DependencySecurityMetric(Metric): - Average 231 dependencies per application (Veracode 2024) """ + def __init__(self, scan_timeout: float = 30.0): + """Initialize dependency security metric. + + Args: + scan_timeout: Maximum seconds to wait for each dependency file scan. + """ + self.scan_timeout = scan_timeout + self._scan_cache: dict[tuple[tuple[str, int], ...], tuple[float, list[dict[str, str]]]] = {} + self.last_scan_errors: list[dict[str, str]] = [] + def extract(self, codebase: Path) -> float: """ Count weighted vulnerability density across ALL Python dependency formats. @@ -75,7 +87,7 @@ def extract(self, codebase: Path) -> float: Weighted vulnerability count (0.0 = no vulnerabilities) """ analyzer = PipAuditAnalyzer() - self.last_scan_errors: list[dict[str, str]] = [] + self.last_scan_errors = [] # Find ALL Python dependency files (ecosystem-wide) dependency_files: list[Path] = [] @@ -97,21 +109,61 @@ def extract(self, codebase: Path) -> float: if not dependency_files: return 0.0 # No dependencies to scan + cache_key = self._dependency_cache_key(dependency_files) + if cache_key in self._scan_cache: + cached_score, cached_errors = self._scan_cache[cache_key] + self.last_scan_errors = list(cached_errors) + return cached_score + # Scan ALL dependency files using intelligent dispatcher weighted_vuln_count = 0.0 for dep_file in dependency_files: - result = analyzer.scan_dependency_file_with_status(dep_file) - if not result.success: - self.last_scan_errors.append({"file": str(dep_file), "error": result.error}) - vulns = result.vulnerabilities + vulns = self._scan_dependency_file(analyzer, dep_file) # For initial implementation, assign uniform weight per vulnerability # pip-audit doesn't provide severity directly, so we use moderate weight for _vuln in vulns: weighted_vuln_count += 2.0 # MEDIUM severity weight + self._scan_cache[cache_key] = (weighted_vuln_count, list(self.last_scan_errors)) return weighted_vuln_count + def _dependency_cache_key(self, dependency_files: list[Path]) -> tuple[tuple[str, int], ...]: + """Build a cache key from dependency file paths and modification times.""" + key_parts = [] + for dep_file in sorted(dependency_files): + try: + key_parts.append((str(dep_file.resolve()), dep_file.stat().st_mtime_ns)) + except OSError: + key_parts.append((str(dep_file), 0)) + return tuple(key_parts) + + def _scan_dependency_file( + self, analyzer: PipAuditAnalyzer, dep_file: Path + ) -> list[dict[str, Any]]: + """Scan a dependency file with timeout handling.""" + executor = ThreadPoolExecutor(max_workers=1) + future = executor.submit(analyzer.scan_dependency_file_with_status, dep_file) + try: + result = future.result(timeout=self.scan_timeout) + if not result.success: + self.last_scan_errors.append({"file": str(dep_file), "error": result.error}) + return result.vulnerabilities + except FutureTimeoutError: + future.cancel() + self.last_scan_errors.append( + { + "file": str(dep_file), + "error": f"dependency scan timed out after {self.scan_timeout}s", + } + ) + return [] + except Exception as exc: + self.last_scan_errors.append({"file": str(dep_file), "error": str(exc)}) + return [] + finally: + executor.shutdown(wait=False, cancel_futures=True) + def normalize(self, value: Union[float, dict[str, Any]]) -> float: """ Normalize vulnerability count to [0,1] range. diff --git a/tests/test_dependency_security.py b/tests/test_dependency_security.py index c503bc8..8700d77 100644 --- a/tests/test_dependency_security.py +++ b/tests/test_dependency_security.py @@ -125,6 +125,58 @@ def test_scan_failures_are_recorded_separately(): assert "Unsupported dependency file format" in metric.last_scan_errors[0]["error"] +def test_dependency_security_caches_scan_results(monkeypatch): + """Test dependency scans are cached when dependency files do not change.""" + from mfcqi.analysis.tools.pip_audit_analyzer import PipAuditScanResult + from mfcqi.metrics import dependency_security + from mfcqi.metrics.dependency_security import DependencySecurityMetric + + class FakeAnalyzer: + calls = 0 + + def scan_dependency_file_with_status(self, _dep_file): + FakeAnalyzer.calls += 1 + return PipAuditScanResult(vulnerabilities=[]) + + monkeypatch.setattr(dependency_security, "PipAuditAnalyzer", FakeAnalyzer) + + metric = DependencySecurityMetric() + + with tempfile.TemporaryDirectory() as tmpdir: + req_file = Path(tmpdir) / "requirements.txt" + req_file.write_text("requests==2.31.0\n") + + assert metric.extract(Path(tmpdir)) == 0.0 + assert metric.extract(Path(tmpdir)) == 0.0 + assert FakeAnalyzer.calls == 1 + + +def test_dependency_security_records_scan_timeout(monkeypatch): + """Test dependency scans record timeout failures separately.""" + import time + + from mfcqi.analysis.tools.pip_audit_analyzer import PipAuditScanResult + from mfcqi.metrics import dependency_security + from mfcqi.metrics.dependency_security import DependencySecurityMetric + + class SlowAnalyzer: + def scan_dependency_file_with_status(self, _dep_file): + time.sleep(0.05) + return PipAuditScanResult(vulnerabilities=[]) + + monkeypatch.setattr(dependency_security, "PipAuditAnalyzer", SlowAnalyzer) + + metric = DependencySecurityMetric(scan_timeout=0.001) + + with tempfile.TemporaryDirectory() as tmpdir: + req_file = Path(tmpdir) / "requirements.txt" + req_file.write_text("requests==2.31.0\n") + + assert metric.extract(Path(tmpdir)) == 0.0 + assert metric.last_scan_errors + assert "timed out" in metric.last_scan_errors[0]["error"] + + def test_metric_weight(): """Test that weight is 0.75 based on research.""" from mfcqi.metrics.dependency_security import DependencySecurityMetric