Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/mfcqi/metrics/dependency_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
"""

import math
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from pathlib import Path
from typing import Any, Union, cast

Expand Down Expand Up @@ -61,6 +63,16 @@ class DependencySecurityMetric(Metric):
- Average 231 dependencies per application (Veracode 2024)
"""

def __init__(self, scan_timeout: float = 30.0):
"""Initialize dependency security metric.

Args:
scan_timeout: Maximum seconds to wait for each dependency file scan.
"""
self.scan_timeout = scan_timeout
self._scan_cache: dict[tuple[tuple[str, int], ...], tuple[float, list[dict[str, str]]]] = {}
self.last_scan_errors: list[dict[str, str]] = []

def extract(self, codebase: Path) -> float:
"""
Count weighted vulnerability density across ALL Python dependency formats.
Expand All @@ -75,7 +87,7 @@ def extract(self, codebase: Path) -> float:
Weighted vulnerability count (0.0 = no vulnerabilities)
"""
analyzer = PipAuditAnalyzer()
self.last_scan_errors: list[dict[str, str]] = []
self.last_scan_errors = []

# Find ALL Python dependency files (ecosystem-wide)
dependency_files: list[Path] = []
Expand All @@ -97,21 +109,61 @@ def extract(self, codebase: Path) -> float:
if not dependency_files:
return 0.0 # No dependencies to scan

cache_key = self._dependency_cache_key(dependency_files)
if cache_key in self._scan_cache:
cached_score, cached_errors = self._scan_cache[cache_key]
self.last_scan_errors = list(cached_errors)
return cached_score

# Scan ALL dependency files using intelligent dispatcher
weighted_vuln_count = 0.0
for dep_file in dependency_files:
result = analyzer.scan_dependency_file_with_status(dep_file)
if not result.success:
self.last_scan_errors.append({"file": str(dep_file), "error": result.error})
vulns = result.vulnerabilities
vulns = self._scan_dependency_file(analyzer, dep_file)

# For initial implementation, assign uniform weight per vulnerability
# pip-audit doesn't provide severity directly, so we use moderate weight
for _vuln in vulns:
weighted_vuln_count += 2.0 # MEDIUM severity weight

self._scan_cache[cache_key] = (weighted_vuln_count, list(self.last_scan_errors))
return weighted_vuln_count

def _dependency_cache_key(self, dependency_files: list[Path]) -> tuple[tuple[str, int], ...]:
"""Build a cache key from dependency file paths and modification times."""
key_parts = []
for dep_file in sorted(dependency_files):
try:
key_parts.append((str(dep_file.resolve()), dep_file.stat().st_mtime_ns))
except OSError:
key_parts.append((str(dep_file), 0))
return tuple(key_parts)

def _scan_dependency_file(
self, analyzer: PipAuditAnalyzer, dep_file: Path
) -> list[dict[str, Any]]:
"""Scan a dependency file with timeout handling."""
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(analyzer.scan_dependency_file_with_status, dep_file)
try:
result = future.result(timeout=self.scan_timeout)
if not result.success:
self.last_scan_errors.append({"file": str(dep_file), "error": result.error})
return result.vulnerabilities
except FutureTimeoutError:
future.cancel()
self.last_scan_errors.append(
{
"file": str(dep_file),
"error": f"dependency scan timed out after {self.scan_timeout}s",
}
)
return []
except Exception as exc:
self.last_scan_errors.append({"file": str(dep_file), "error": str(exc)})
return []
finally:
executor.shutdown(wait=False, cancel_futures=True)

def normalize(self, value: Union[float, dict[str, Any]]) -> float:
"""
Normalize vulnerability count to [0,1] range.
Expand Down
52 changes: 52 additions & 0 deletions tests/test_dependency_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,58 @@ def test_scan_failures_are_recorded_separately():
assert "Unsupported dependency file format" in metric.last_scan_errors[0]["error"]


def test_dependency_security_caches_scan_results(monkeypatch):
"""Test dependency scans are cached when dependency files do not change."""
from mfcqi.analysis.tools.pip_audit_analyzer import PipAuditScanResult
from mfcqi.metrics import dependency_security
from mfcqi.metrics.dependency_security import DependencySecurityMetric

class FakeAnalyzer:
calls = 0

def scan_dependency_file_with_status(self, _dep_file):
FakeAnalyzer.calls += 1
return PipAuditScanResult(vulnerabilities=[])

monkeypatch.setattr(dependency_security, "PipAuditAnalyzer", FakeAnalyzer)

metric = DependencySecurityMetric()

with tempfile.TemporaryDirectory() as tmpdir:
req_file = Path(tmpdir) / "requirements.txt"
req_file.write_text("requests==2.31.0\n")

assert metric.extract(Path(tmpdir)) == 0.0
assert metric.extract(Path(tmpdir)) == 0.0
assert FakeAnalyzer.calls == 1


def test_dependency_security_records_scan_timeout(monkeypatch):
"""Test dependency scans record timeout failures separately."""
import time

from mfcqi.analysis.tools.pip_audit_analyzer import PipAuditScanResult
from mfcqi.metrics import dependency_security
from mfcqi.metrics.dependency_security import DependencySecurityMetric

class SlowAnalyzer:
def scan_dependency_file_with_status(self, _dep_file):
time.sleep(0.05)
return PipAuditScanResult(vulnerabilities=[])

monkeypatch.setattr(dependency_security, "PipAuditAnalyzer", SlowAnalyzer)

metric = DependencySecurityMetric(scan_timeout=0.001)

with tempfile.TemporaryDirectory() as tmpdir:
req_file = Path(tmpdir) / "requirements.txt"
req_file.write_text("requests==2.31.0\n")

assert metric.extract(Path(tmpdir)) == 0.0
assert metric.last_scan_errors
assert "timed out" in metric.last_scan_errors[0]["error"]


def test_metric_weight():
"""Test that weight is 0.75 based on research."""
from mfcqi.metrics.dependency_security import DependencySecurityMetric
Expand Down
Loading