diff --git a/README.md b/README.md index 0920684..986bb0b 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ ## What is DockSec? -DockSec is an **OWASP Lab Project** that bridges the gap between complex security scan results and actionable developer fixes. It integrates industry-standard scanners (Trivy, Hadolint, Docker Scout) with advanced AI to provide **context-aware security analysis**. +DockSec is an **OWASP Lab Project** that bridges the gap between complex security scan results and actionable developer fixes. It integrates industry-standard scanners (Trivy, Grype, Hadolint, Docker Scout) with advanced AI to provide **context-aware security analysis**. Instead of overwhelming you with a list of 200+ CVEs, DockSec: @@ -36,6 +36,7 @@ Instead of overwhelming you with a list of 200+ CVEs, DockSec: - **Explains** vulnerabilities in plain English, not just security jargon. - **Suggests** specific, line-by-line fixes for your Dockerfile. - **Generates** professional, interactive security reports for your team. +- **Cross-validates** findings across multiple scanners so you catch what one scanner misses. Think of it as having a security expert sitting right next to you, reviewing your Dockerfiles in real-time. @@ -49,10 +50,10 @@ Think of it as having a security expert sitting right next to you, reviewing you DockSec follows a robust four-stage pipeline: -1. **Scan**: Runs Trivy, Hadolint, and Docker Scout locally on your environment. -2. **Analyze**: AI correlates findings across all scanners to remove noise and assess real-world impact. +1. **Scan**: Runs Trivy and/or Grype for CVE detection, Hadolint for Dockerfile linting, and Docker Scout for base-image analysis — all locally on your environment. +2. **Analyze**: AI correlates findings across all scanners to remove noise and assess real-world impact. When running both Trivy and Grype, results are automatically deduplicated and cross-validated. 3. **Recommend**: Generates human-readable explanations and specific remediation steps. -4. **Report**: Exports actionable results in JSON, PDF, HTML, or Markdown formats. +4. **Report**: Exports actionable results in JSON, PDF, HTML, or CSV formats — each report includes a **Scanner Coverage** section showing exactly which scanner(s) flagged each CVE. --- @@ -87,6 +88,9 @@ Integrate DockSec into your GitHub Actions workflow: # Install DockSec pip install docksec +# Install all required external tools (Trivy, Hadolint, Grype) +docksec-setup + # Scan a Dockerfile (AI-powered) # Reports will be saved to ~/.docksec/results/ docksec Dockerfile @@ -104,18 +108,52 @@ docksec --image-only -i myapp:latest docksec Dockerfile --scan-only ``` +### Choosing a Vulnerability Scanner + +DockSec supports three scanner modes via the `--scanner` flag: + +```bash +# Default: use Trivy only (fast, widely adopted) +docksec --image-only -i myapp:latest --scanner trivy + +# Use Grype only (Anchore's scanner, often finds additional CVEs) +docksec --image-only -i myapp:latest --scanner grype + +# Use both scanners and deduplicate results (maximum coverage) +docksec --image-only -i myapp:latest --scanner all + +# Works with full scans too +docksec Dockerfile -i myapp:latest --scanner all + +# Works with Docker Compose +docksec --compose docker-compose.yml --scanner all +``` + +You can also set the default scanner via an environment variable (useful in CI/CD): + +```bash +# Set a persistent default — no need to pass --scanner on every command +export DOCKSEC_SCANNER=all +docksec --image-only -i myapp:latest +``` + +> **Why use `--scanner all`?** +> Trivy and Grype use different vulnerability databases and detection methods. In practice they each find CVEs the other misses. Running both and deduplicating gives you the highest confidence results — CVEs flagged by both scanners are shown with a **"Both"** badge in reports, making them the highest-priority findings to fix. + --- ## Features - **Smart Analysis**: AI explains what vulnerabilities mean for *your* specific setup. +- **Dual Vulnerability Scanner**: Choose Trivy, Grype, or run **both at once** (`--scanner all`) for maximum CVE coverage with automatic deduplication. +- **Scanner Coverage Reports**: Every report includes a breakdown of which scanner(s) found each CVE — CVEs confirmed by both scanners are highlighted as highest priority. - **Multi-LLM Support**: Use OpenAI, Anthropic Claude (4.x), Google Gemini (1.5+), or local models via Ollama. - **Docker Compose Scanning**: Detect orchestration-level misconfigurations and scan all services in a compose file. -- **Deep Integration**: Combines Trivy (vulnerabilities), Hadolint (linting), and Docker Scout. +- **Deep Integration**: Combines Trivy, Grype, Hadolint (linting), and Docker Scout. - **Security Scoring**: Get a 0-100 score to track your security posture over time. - **Centralized Reporting**: All reports are neatly organized in `~/.docksec/results/` by default. -- **Rich Formats**: Professional exports in HTML (interactive), PDF, JSON, and CSV. -- **CI/CD Ready**: Designed for easy integration into GitHub Actions and build pipelines. +- **Rich Formats**: Professional exports in HTML (interactive, with scanner badges), PDF, JSON, and CSV. +- **CI/CD Ready**: Designed for easy integration into GitHub Actions and build pipelines. Set `DOCKSEC_SCANNER=all` in your environment for maximum coverage with no code changes. - **GitHub Action**: Available on the GitHub Marketplace for automated security scans. --- @@ -129,15 +167,16 @@ Here is a comparison of how DockSec relates to other container security tools. | License and cost | Free, open source (MIT) | Free, open source (Apache 2.0) | Commercial (limited free tier) | Commercial (limited free tier) | | Governance | OWASP Lab Project, vendor neutral | Open source, maintained by Aqua | Single vendor | Single vendor | | Detects CVEs and Dockerfile misconfigurations | Yes | Yes | Yes | Yes | +| Dual scanner (Trivy + Grype) with deduplication | Yes (`--scanner all`) | No | No | No | | Contextual, line level Dockerfile remediation | Yes (line specific rewrites with explanation) | No (detection only) | Yes (base image upgrade advice, fix PRs) | Yes (AI AutoFix PRs) | | Runs fully offline / air gapped | Yes (local LLM via Ollama, scan only mode, no API key) | Yes for scanning (no remediation layer) | No (cloud platform) | No (hosted platform) | | Your image data stays on your network | Yes | Yes | No | No | | Bring your own LLM / model choice | Yes (OpenAI, Anthropic, Gemini, or local Ollama) | Not applicable | No (proprietary AI) | No (proprietary AI) | | Self hostable, no platform deployment | Yes | Yes | No | No | | Vendor lock in | None | None | Yes | Yes | -| Security score (0 to 100) and multi format reports (HTML, PDF, JSON, CSV, Markdown) | Yes | Partial (machine formats, no remediation report) | Partial (dashboard reports) | Partial (dashboard reports) | +| Security score (0 to 100) and multi format reports (HTML, PDF, JSON, CSV) | Yes | Partial (machine formats, no remediation report) | Partial (dashboard reports) | Partial (dashboard reports) | -DockSec is the only one of these that pairs contextual, line level Dockerfile remediation with a fully open source, OWASP governed, locally runnable design. Snyk and Aikido offer capable AI remediation, but only as commercial cloud platforms that send your data to their service. Trivy is open source and local but stops at detection and does not help you fix anything. DockSec fills the gap for developers and for regulated or air gapped teams who need both the fix guidance and full control of their data, at no cost. +DockSec is the only one of these that pairs contextual, line level Dockerfile remediation with a fully open source, OWASP governed, locally runnable design. Snyk and Aikido offer capable AI remediation, but only as commercial cloud platforms that send your data to their service. Trivy is open source and local but stops at detection and does not help you fix anything. DockSec fills the gap for developers and for regulated or air gapped teams who need both the fix guidance and full control of their data, at no cost. With `--scanner all`, DockSec runs both Trivy and Grype and automatically deduplicates the results — giving you broader CVE coverage than either scanner alone, without duplicates or extra noise. --- diff --git a/docksec/cli.py b/docksec/cli.py index 5790873..6fd08cf 100644 --- a/docksec/cli.py +++ b/docksec/cli.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -import sys -import os import argparse +import os +import sys def get_version() -> str: @@ -15,14 +15,18 @@ def get_version() -> str: """ try: from importlib.metadata import version + return version("docksec") except Exception: # package not installed; fall through to source fallback pass try: import re - setup_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'setup.py') - with open(setup_path, 'r') as f: + + setup_path = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "setup.py" + ) + with open(setup_path, "r") as f: match = re.search(r'version\s*=\s*["\']([^"\']+)["\']', f.read()) if match: return match.group(1) @@ -31,6 +35,7 @@ def get_version() -> str: return "unknown" + def main() -> None: """ Main entry point for the DockSec CLI tool. @@ -38,91 +43,169 @@ def main() -> None: """ # Set CLI mode to suppress INFO logs for user-facing output os.environ["DOCKSEC_CLI_MODE"] = "true" - + from docksec.enums import LLMProvider - parser = argparse.ArgumentParser(description='Docker Security Analysis Tool') - parser.add_argument('dockerfile', nargs='?', help='Path to the Dockerfile to analyze (optional when using --image-only or --compose)') - parser.add_argument('-i', '--image', help='Docker image name to scan') - parser.add_argument('-c', '--compose', nargs='?', const='auto', help='Path to docker-compose file to scan. If no path is provided, auto-detects in current directory.') - parser.add_argument('-o', '--output', help='Output file for the report (default: security_report.txt)') - parser.add_argument('--ai-only', action='store_true', help='Run only AI-based recommendations (requires Dockerfile)') - parser.add_argument('--scan-only', action='store_true', help='Run only Dockerfile/image scanning (requires --image)') - parser.add_argument('--image-only', action='store_true', help='Scan only the Docker image without Dockerfile analysis') - parser.add_argument('--provider', choices=LLMProvider.values(), - help='LLM provider to use (default: openai, can also set LLM_PROVIDER env var)') - parser.add_argument('--model', help='Model name to use (e.g., gpt-4o, claude-haiku-4-5, gemini-1.5-pro, llama3.1)') - parser.add_argument('--compact-output', action='store_true', help='Use compact output format (less verbose)') - parser.add_argument('--skip-ai-scoring', action='store_true', help='Skip AI-based security scoring (use local scoring only)') - parser.add_argument('--version', action='version', version=f'DockSec {get_version()}') - + + parser = argparse.ArgumentParser(description="Docker Security Analysis Tool") + parser.add_argument( + "dockerfile", + nargs="?", + help="Path to the Dockerfile to analyze (optional when using --image-only or --compose)", + ) + parser.add_argument("-i", "--image", help="Docker image name to scan") + parser.add_argument( + "-c", + "--compose", + nargs="?", + const="auto", + help="Path to docker-compose file to scan. If no path is provided, auto-detects in current directory.", + ) + parser.add_argument( + "-o", + "--output", + help="Output file for the report (default: security_report.txt)", + ) + parser.add_argument( + "--ai-only", + action="store_true", + help="Run only AI-based recommendations (requires Dockerfile)", + ) + parser.add_argument( + "--scan-only", + action="store_true", + help="Run only Dockerfile/image scanning (requires --image)", + ) + parser.add_argument( + "--image-only", + action="store_true", + help="Scan only the Docker image without Dockerfile analysis", + ) + parser.add_argument( + "--provider", + choices=LLMProvider.values(), + help="LLM provider to use (default: openai, can also set LLM_PROVIDER env var)", + ) + parser.add_argument( + "--model", + help="Model name to use (e.g., gpt-4o, claude-haiku-4-5, gemini-1.5-pro, llama3.1)", + ) + parser.add_argument( + "--compact-output", + action="store_true", + help="Use compact output format (less verbose)", + ) + parser.add_argument( + "--skip-ai-scoring", + action="store_true", + help="Skip AI-based security scoring (use local scoring only)", + ) + parser.add_argument( + "--scanner", + choices=["trivy", "grype", "all"], + default=None, + help="Vulnerability scanner to use: trivy (default), grype, or all (both, deduplicated). " + "Can also be set via DOCKSEC_SCANNER environment variable.", + ) + parser.add_argument( + "--version", action="version", version=f"DockSec {get_version()}" + ) + args = parser.parse_args() - + + # Resolve --scanner: CLI flag > DOCKSEC_SCANNER env var > default "trivy" + if args.scanner is None: + env_scanner = os.environ.get("DOCKSEC_SCANNER", "trivy").lower() + args.scanner = ( + env_scanner if env_scanner in ("trivy", "grype", "all") else "trivy" + ) + # Set provider and model from CLI args if provided (overrides env vars) if args.provider: os.environ["LLM_PROVIDER"] = args.provider if args.model: os.environ["LLM_MODEL"] = args.model - + # Set compact output mode if requested if args.compact_output: os.environ["DOCKSEC_COMPACT_OUTPUT"] = "true" - + # Validate argument combinations if args.image_only and args.ai_only: - print("Error: --image-only and --ai-only cannot be used together (AI analysis requires Dockerfile)") + print( + "Error: --image-only and --ai-only cannot be used together (AI analysis requires Dockerfile)" + ) sys.exit(1) - + if args.image_only and args.scan_only: - print("Error: --image-only and --scan-only cannot be used together (use --image-only for image-only scanning)") + print( + "Error: --image-only and --scan-only cannot be used together (use --image-only for image-only scanning)" + ) sys.exit(1) - + # Validate Dockerfile requirement if not args.image_only and not args.compose and not args.dockerfile: - print("Error: Dockerfile path is required unless using --image-only or --compose") + print( + "Error: Dockerfile path is required unless using --image-only or --compose" + ) print("Usage examples:") - print(" docksec Dockerfile -i myapp:latest # Analyze both Dockerfile and image") + print( + " docksec Dockerfile -i myapp:latest # Analyze both Dockerfile and image" + ) print(" docksec --image-only -i myapp:latest # Scan only the image") - print(" docksec --compose docker-compose.yml # Scan compose file and its services") + print( + " docksec --compose docker-compose.yml # Scan compose file and its services" + ) print(" docksec --ai-only Dockerfile # AI analysis only") sys.exit(1) - + # Validate that the Dockerfile exists (if provided) if args.dockerfile and not os.path.isfile(args.dockerfile): print(f"Error: Dockerfile not found at {args.dockerfile}") sys.exit(1) - + # Validate image requirement for image-based operations if args.image_only and not args.image: - print("Error: Image name is required for image-only scanning. Use -i/--image to specify the Docker image.") + print( + "Error: Image name is required for image-only scanning. Use -i/--image to specify the Docker image." + ) print("Example: docksec --image-only -i myapp:latest") sys.exit(1) - + # In scan-only mode, if no image is provided, we'll only run Dockerfile analysis if args.scan_only and not args.image: - print("[INFO] No image provided for scan-only mode. Running Dockerfile analysis only.") - + print( + "[INFO] No image provided for scan-only mode. Running Dockerfile analysis only." + ) + # Determine which tools to run if args.compose: run_ai = not args.scan_only run_scan = True run_compose_analysis = True mode_desc = "Compose Analysis" - + # Auto-detect compose file if needed compose_path = args.compose - if compose_path == 'auto': - for name in ['docker-compose.yml', 'docker-compose.yaml', 'compose.yml', 'compose.yaml']: + if compose_path == "auto": + for name in [ + "docker-compose.yml", + "docker-compose.yaml", + "compose.yml", + "compose.yaml", + ]: if os.path.isfile(name): compose_path = name break - if compose_path == 'auto': - print("Error: Could not auto-detect a docker-compose file in the current directory.") + if compose_path == "auto": + print( + "Error: Could not auto-detect a docker-compose file in the current directory." + ) sys.exit(1) - + if not os.path.isfile(compose_path): print(f"Error: Compose file not found at {compose_path}") sys.exit(1) - + args.compose = compose_path elif args.image_only: run_ai = False @@ -145,48 +228,63 @@ def main() -> None: run_scan = bool(args.image) run_compose_analysis = False mode_desc = "Full Analysis (AI + Scanner)" - + print(f"\n[INFO] Mode: {mode_desc}") from docksec.config import RESULTS_DIR from docksec.config_manager import get_config from docksec.enums import LLMProvider + print(f"[INFO] Reports will be saved to: {RESULTS_DIR}") + if run_scan: + scanner_label = { + "trivy": "Trivy", + "grype": "Grype", + "all": "Trivy + Grype", + }.get(args.scanner, args.scanner) + print(f"[INFO] Vulnerability scanner: {scanner_label}") if run_ai: config = get_config() print(f"[INFO] AI Provider: {config.llm_provider}") - + # Initialize AI findings storage ai_findings = None - + # Run the AI-based recommendation tool if run_ai: print("\n=== Running AI-based Dockerfile analysis ===") try: # Import required modules from main.py + from pathlib import Path + + from docksec.config import ( + RESULTS_DIR, + docker_agent_prompt, + truncate_dockerfile, + ) from docksec.utils import ( - load_docker_file, - get_llm, + AnalyzesResponse, analyze_security, - AnalyzesResponse + get_llm, + load_docker_file, ) - from docksec.config import docker_agent_prompt, truncate_dockerfile, RESULTS_DIR - from pathlib import Path - + # Set up the same components as main.py llm = get_llm() - + # Use appropriate structured output method based on provider config = get_config() provider = config.llm_provider - + if provider == LLMProvider.OPENAI: - Report_llm = llm.with_structured_output(AnalyzesResponse, method="json_mode") + Report_llm = llm.with_structured_output( + AnalyzesResponse, method="json_mode" + ) else: # For Anthropic, Google, and Ollama, let LangChain choose the best method (usually tool calling) Report_llm = llm.with_structured_output(AnalyzesResponse) - + analyser_chain = docker_agent_prompt | Report_llm - + # Load and analyze the file if run_compose_analysis: filecontent = load_docker_file(docker_file_path=Path(args.compose)) @@ -194,54 +292,72 @@ def main() -> None: else: filecontent = load_docker_file(docker_file_path=Path(args.dockerfile)) file_type = "Dockerfile" - + if not filecontent: print(f"Error: No {file_type} content found.") return - + # Truncate content to reduce token usage - truncated_content = truncate_dockerfile(filecontent, max_lines=150, max_chars=4000) if run_compose_analysis else truncate_dockerfile(filecontent, max_lines=50, max_chars=2000) - + truncated_content = ( + truncate_dockerfile(filecontent, max_lines=150, max_chars=4000) + if run_compose_analysis + else truncate_dockerfile(filecontent, max_lines=50, max_chars=2000) + ) + response = analyser_chain.invoke({"filecontent": truncated_content}) - ai_findings = analyze_security(response, compact=True, report_path=RESULTS_DIR) - + ai_findings = analyze_security( + response, compact=True, report_path=RESULTS_DIR + ) + except ImportError as e: print(f"Error: Required modules not found - {e}") sys.exit(1) except Exception as e: print(f"Error running AI analysis: {e}") - + # Run the scanner tool if run_scan: - scan_type = "compose" if run_compose_analysis else ("image-only" if args.image_only else "full") + scan_type = ( + "compose" + if run_compose_analysis + else ("image-only" if args.image_only else "full") + ) print(f"\n=== Running {scan_type} security scanner ===") try: from docksec.docker_scanner import DockerSecurityScanner - + if run_compose_analysis: from docksec.compose_scanner import ComposeOrchestrator + orchestrator = ComposeOrchestrator( args.compose, scan_only=not run_ai, - skip_ai_scoring=args.skip_ai_scoring + skip_ai_scoring=args.skip_ai_scoring, + scanner=args.scanner, ) print(f"Scanning Compose file: {args.compose}") results = orchestrator.run_full_scan("CRITICAL,HIGH") - + # We need a scanner instance just for scoring and reporting - scanner = DockerSecurityScanner(None, None, scan_only=not run_ai, skip_ai_scoring=args.skip_ai_scoring) + scanner = DockerSecurityScanner( + None, + None, + scan_only=not run_ai, + skip_ai_scoring=args.skip_ai_scoring, + ) scanner.image_name = "Multiple Services" scanner.dockerfile_path = args.compose else: # Initialize the scanner dockerfile_path = None if args.image_only else args.dockerfile scanner = DockerSecurityScanner( - dockerfile_path, - args.image, + dockerfile_path, + args.image, scan_only=not run_ai, - skip_ai_scoring=args.skip_ai_scoring + skip_ai_scoring=args.skip_ai_scoring, + scanner=args.scanner, ) - + # Run appropriate scan based on mode if args.image_only: # Image-only scan - skip Dockerfile analysis @@ -250,24 +366,28 @@ def main() -> None: else: # Full scan including Dockerfile results = scanner.run_full_scan("CRITICAL,HIGH") - + # Calculate security score scanner.analysis_score = scanner.get_security_score(results) - + # Add AI findings to results if available if ai_findings: results["ai_findings"] = ai_findings - + # Generate all reports scanner.generate_all_reports(results) - + # Run advanced scan if available and image is provided (skip for compose) - if hasattr(scanner, 'advanced_scan') and args.image and not run_compose_analysis: + if ( + hasattr(scanner, "advanced_scan") + and args.image + and not run_compose_analysis + ): print("\n=== Running Advanced Scan ===") scanner.advanced_scan() - + print("\n=== Scanning Complete ===") - + except ValueError as e: print(f"Scanner error: {e}") except ImportError as e: @@ -275,11 +395,12 @@ def main() -> None: sys.exit(1) except Exception as e: print(f"Error running scanner: {e}") - + if not run_ai and not run_scan: print("No analysis performed. Use --help for usage information.") else: print("\nAnalysis complete!") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/docksec/compose_scanner.py b/docksec/compose_scanner.py index a8ff38a..a4e9cdb 100644 --- a/docksec/compose_scanner.py +++ b/docksec/compose_scanner.py @@ -1,18 +1,21 @@ import os -from typing import Dict, List, Any from pathlib import Path +from typing import Any, Dict, List try: from ruamel.yaml import YAML except ImportError: - raise ImportError("ruamel.yaml is required for compose scanning. Install with: pip install ruamel.yaml") + raise ImportError( + "ruamel.yaml is required for compose scanning. Install with: pip install ruamel.yaml" + ) +from docksec.docker_scanner import DockerSecurityScanner from docksec.enums import Severity from docksec.utils import get_custom_logger -from docksec.docker_scanner import DockerSecurityScanner logger = get_custom_logger(__name__) + class ComposeScanner: def __init__(self, compose_path: str): self.compose_path = Path(compose_path).resolve() @@ -21,10 +24,10 @@ def __init__(self, compose_path: str): self.yaml.allow_duplicate_keys = True self.data = None self.findings = [] - + def parse(self) -> bool: try: - with open(self.compose_path, 'r') as f: + with open(self.compose_path, "r") as f: self.data = self.yaml.load(f) if not isinstance(self.data, dict): logger.error("Invalid compose file: not a dictionary") @@ -34,31 +37,42 @@ def parse(self) -> bool: logger.error(f"Failed to parse compose file: {e}") return False - def _add_finding(self, rule_id: str, severity: Severity, title: str, description: str, remediation: str, service: str, line: int): - self.findings.append({ - "VulnerabilityID": rule_id, - "Severity": severity.value, - "Title": title, - "Description": description, - "Remediation": remediation, - "Target": f"{self.compose_path.name}:{service}:{line}", - "PkgName": "docker-compose", - "InstalledVersion": "N/A", - "Status": "affected", - "CVSS": "N/A", - "PrimaryURL": "" - }) + def _add_finding( + self, + rule_id: str, + severity: Severity, + title: str, + description: str, + remediation: str, + service: str, + line: int, + ): + self.findings.append( + { + "VulnerabilityID": rule_id, + "Severity": severity.value, + "Title": title, + "Description": description, + "Remediation": remediation, + "Target": f"{self.compose_path.name}:{service}:{line}", + "PkgName": "docker-compose", + "InstalledVersion": "N/A", + "Status": "affected", + "CVSS": "N/A", + "PrimaryURL": "", + } + ) def _get_line(self, node: Any, default: int = 0) -> int: - if hasattr(node, 'lc') and node.lc.line is not None: + if hasattr(node, "lc") and node.lc.line is not None: return node.lc.line + 1 return default def scan(self) -> List[Dict]: - if not self.data or 'services' not in self.data: + if not self.data or "services" not in self.data: return self.findings - services = self.data.get('services', {}) + services = self.data.get("services", {}) if not isinstance(services, dict): return self.findings @@ -67,7 +81,7 @@ def scan(self) -> List[Dict]: for service_name, service_config in services.items(): if not isinstance(service_config, dict): continue - + service_line = self._get_line(service_config) # CRITICAL checks @@ -75,17 +89,25 @@ def scan(self) -> List[Dict]: self._check_privileged(service_name, service_config, service_line) self._check_host_network(service_name, service_config, service_line) self._check_host_namespace(service_name, service_config, service_line) - self._check_dangerous_capabilities(service_name, service_config, service_line) + self._check_dangerous_capabilities( + service_name, service_config, service_line + ) self._check_sensitive_host_mount(service_name, service_config, service_line) # HIGH checks self._check_plaintext_secret_env(service_name, service_config, service_line) - self._check_port_bound_all_interfaces(service_name, service_config, service_line) - self._check_disabled_security_opt(service_name, service_config, service_line) + self._check_port_bound_all_interfaces( + service_name, service_config, service_line + ) + self._check_disabled_security_opt( + service_name, service_config, service_line + ) self._check_no_non_root_user(service_name, service_config, service_line) # MEDIUM checks - self._check_latest_or_untagged_image(service_name, service_config, service_line) + self._check_latest_or_untagged_image( + service_name, service_config, service_line + ) self._check_no_resource_limits(service_name, service_config, service_line) self._check_env_file_secret_risk(service_name, service_config, service_line) self._check_writable_root_fs(service_name, service_config, service_line) @@ -93,283 +115,388 @@ def scan(self) -> List[Dict]: # LOW checks self._check_no_new_privileges(service_name, service_config, service_line) self._check_missing_healthcheck(service_name, service_config, service_line) - - if 'networks' in service_config: + + if "networks" in service_config: all_services_default_network = False if all_services_default_network and services: self._add_finding( - "compose-no-network-segmentation", Severity.LOW, "No Network Segmentation", + "compose-no-network-segmentation", + Severity.LOW, + "No Network Segmentation", "All services sit on the default network with no segmentation.", "Define separate networks and connect only services that must talk.", - "global", self._get_line(services) + "global", + self._get_line(services), ) return self.findings def _check_socket_mount(self, service: str, config: dict, default_line: int): - volumes = config.get('volumes', []) + volumes = config.get("volumes", []) if not isinstance(volumes, list): return for i, vol in enumerate(volumes): vol_str = str(vol) - if '/var/run/docker.sock' in vol_str: + if "/var/run/docker.sock" in vol_str: line = self._get_line(volumes, default_line) - if hasattr(volumes, 'lc') and hasattr(volumes.lc, 'data') and volumes.lc.data: + if ( + hasattr(volumes, "lc") + and hasattr(volumes.lc, "data") + and volumes.lc.data + ): # Try to get specific item line pass self._add_finding( - "compose-docker-socket-mount", Severity.CRITICAL, "Docker Socket Mount", + "compose-docker-socket-mount", + Severity.CRITICAL, + "Docker Socket Mount", "A service bind-mounts /var/run/docker.sock.", "Remove the mount; if socket access is genuinely required, front it with a scoped socket proxy.", - service, line + service, + line, ) def _check_privileged(self, service: str, config: dict, default_line: int): - if config.get('privileged') is True: + if config.get("privileged") is True: self._add_finding( - "compose-privileged", Severity.CRITICAL, "Privileged Container", + "compose-privileged", + Severity.CRITICAL, + "Privileged Container", "privileged: true on a service.", "Remove it and grant only the specific cap_add capabilities actually needed.", - service, self._get_line(config.get('privileged', config), default_line) + service, + self._get_line(config.get("privileged", config), default_line), ) def _check_host_network(self, service: str, config: dict, default_line: int): - if config.get('network_mode') == 'host': + if config.get("network_mode") == "host": self._add_finding( - "compose-host-network", Severity.CRITICAL, "Host Network Mode", + "compose-host-network", + Severity.CRITICAL, + "Host Network Mode", "network_mode: host.", "Use a defined network and publish only required ports.", - service, self._get_line(config.get('network_mode', config), default_line) + service, + self._get_line(config.get("network_mode", config), default_line), ) def _check_host_namespace(self, service: str, config: dict, default_line: int): - if config.get('pid') == 'host' or config.get('ipc') == 'host': + if config.get("pid") == "host" or config.get("ipc") == "host": self._add_finding( - "compose-host-namespace", Severity.CRITICAL, "Host Namespace", + "compose-host-namespace", + Severity.CRITICAL, + "Host Namespace", "pid: host or ipc: host.", "Remove unless strictly required.", - service, self._get_line(config, default_line) + service, + self._get_line(config, default_line), ) - def _check_dangerous_capabilities(self, service: str, config: dict, default_line: int): - cap_add = config.get('cap_add', []) + def _check_dangerous_capabilities( + self, service: str, config: dict, default_line: int + ): + cap_add = config.get("cap_add", []) if not isinstance(cap_add, list): return - dangerous = {'SYS_ADMIN', 'NET_ADMIN', 'SYS_PTRACE', 'ALL'} + dangerous = {"SYS_ADMIN", "NET_ADMIN", "SYS_PTRACE", "ALL"} for cap in cap_add: if str(cap).upper() in dangerous: self._add_finding( - "compose-dangerous-capabilities", Severity.CRITICAL, "Dangerous Capabilities", + "compose-dangerous-capabilities", + Severity.CRITICAL, + "Dangerous Capabilities", f"cap_add includes {cap}.", "Drop to least privilege.", - service, self._get_line(cap_add, default_line) + service, + self._get_line(cap_add, default_line), ) - def _check_sensitive_host_mount(self, service: str, config: dict, default_line: int): - volumes = config.get('volumes', []) + def _check_sensitive_host_mount( + self, service: str, config: dict, default_line: int + ): + volumes = config.get("volumes", []) if not isinstance(volumes, list): return - sensitive = ['/:', '/etc:', '/root:', '/var/run:', '/proc:', '/sys:'] + sensitive = ["/:", "/etc:", "/root:", "/var/run:", "/proc:", "/sys:"] for vol in volumes: vol_str = str(vol) if any(vol_str.startswith(s) for s in sensitive): self._add_finding( - "compose-sensitive-host-mount", Severity.CRITICAL, "Sensitive Host Mount", + "compose-sensitive-host-mount", + Severity.CRITICAL, + "Sensitive Host Mount", f"Bind mount of sensitive host directory: {vol_str.split(':')[0]}.", "Scope to a specific subpath and mount read-only where possible.", - service, self._get_line(volumes, default_line) + service, + self._get_line(volumes, default_line), ) - def _check_plaintext_secret_env(self, service: str, config: dict, default_line: int): - env = config.get('environment', {}) + def _check_plaintext_secret_env( + self, service: str, config: dict, default_line: int + ): + env = config.get("environment", {}) if isinstance(env, list): # Handle list format: - VAR=value for item in env: - if isinstance(item, str) and '=' in item: - k, v = item.split('=', 1) - if self._is_secret_key(k) and v and not v.startswith('${'): + if isinstance(item, str) and "=" in item: + k, v = item.split("=", 1) + if self._is_secret_key(k) and v and not v.startswith("${"): self._add_finding( - "compose-plaintext-secret-env", Severity.HIGH, "Plaintext Secret Environment Variable", + "compose-plaintext-secret-env", + Severity.HIGH, + "Plaintext Secret Environment Variable", "A likely secret in an environment value.", "Move to Docker secrets or an injected secret store; never commit.", - service, self._get_line(env, default_line) + service, + self._get_line(env, default_line), ) elif isinstance(env, dict): for k, v in env.items(): - if self._is_secret_key(k) and v and not str(v).startswith('${'): + if self._is_secret_key(k) and v and not str(v).startswith("${"): self._add_finding( - "compose-plaintext-secret-env", Severity.HIGH, "Plaintext Secret Environment Variable", + "compose-plaintext-secret-env", + Severity.HIGH, + "Plaintext Secret Environment Variable", "A likely secret in an environment value.", "Move to Docker secrets or an injected secret store; never commit.", - service, self._get_line(env, default_line) + service, + self._get_line(env, default_line), ) def _is_secret_key(self, key: str) -> bool: key = str(key).lower() - return any(s in key for s in ['password', 'secret', 'token', 'api_key', 'private_key', 'private-key']) - - def _check_port_bound_all_interfaces(self, service: str, config: dict, default_line: int): - ports = config.get('ports', []) + return any( + s in key + for s in [ + "password", + "secret", + "token", + "api_key", + "private_key", + "private-key", + ] + ) + + def _check_port_bound_all_interfaces( + self, service: str, config: dict, default_line: int + ): + ports = config.get("ports", []) if not isinstance(ports, list): return for port in ports: port_str = str(port) # If it's just "8080:80" or "8080", it binds to 0.0.0.0 by default # If it's "127.0.0.1:8080:80", it's bound to localhost - if ':' in port_str and not port_str.startswith('127.0.0.1:') and not port_str.startswith('localhost:'): + if ( + ":" in port_str + and not port_str.startswith("127.0.0.1:") + and not port_str.startswith("localhost:") + ): self._add_finding( - "compose-port-bound-all-interfaces", Severity.HIGH, "Port Bound to All Interfaces", + "compose-port-bound-all-interfaces", + Severity.HIGH, + "Port Bound to All Interfaces", "A sensitive or admin port published with no host IP, binding 0.0.0.0.", "Bind to 127.0.0.1, or use expose for internal-only traffic.", - service, self._get_line(ports, default_line) + service, + self._get_line(ports, default_line), ) - def _check_disabled_security_opt(self, service: str, config: dict, default_line: int): - sec_opt = config.get('security_opt', []) + def _check_disabled_security_opt( + self, service: str, config: dict, default_line: int + ): + sec_opt = config.get("security_opt", []) if not isinstance(sec_opt, list): return for opt in sec_opt: opt_str = str(opt).lower() - if 'apparmor:unconfined' in opt_str or 'seccomp:unconfined' in opt_str: + if "apparmor:unconfined" in opt_str or "seccomp:unconfined" in opt_str: self._add_finding( - "compose-disabled-security-opt", Severity.HIGH, "Disabled Security Options", + "compose-disabled-security-opt", + Severity.HIGH, + "Disabled Security Options", "security_opt sets apparmor:unconfined or seccomp:unconfined.", "Keep default profiles unless there is a tested reason.", - service, self._get_line(sec_opt, default_line) + service, + self._get_line(sec_opt, default_line), ) def _check_no_non_root_user(self, service: str, config: dict, default_line: int): - if 'user' not in config: + if "user" not in config: self._add_finding( - "compose-no-non-root-user", Severity.HIGH, "No Non-Root User", + "compose-no-non-root-user", + Severity.HIGH, + "No Non-Root User", "A service has no user directive and the image likely runs as root.", "Set a non-root user.", - service, default_line + service, + default_line, ) - def _check_latest_or_untagged_image(self, service: str, config: dict, default_line: int): - image = config.get('image', '') - if image and (':' not in image or image.endswith(':latest')): + def _check_latest_or_untagged_image( + self, service: str, config: dict, default_line: int + ): + image = config.get("image", "") + if image and (":" not in image or image.endswith(":latest")): self._add_finding( - "compose-latest-or-untagged-image", Severity.MEDIUM, "Latest or Untagged Image", + "compose-latest-or-untagged-image", + Severity.MEDIUM, + "Latest or Untagged Image", "image uses :latest or has no tag.", "Pin to a specific, ideally digest-addressed version.", - service, self._get_line(config.get('image', config), default_line) + service, + self._get_line(config.get("image", config), default_line), ) def _check_no_resource_limits(self, service: str, config: dict, default_line: int): has_limits = False - if 'deploy' in config and isinstance(config['deploy'], dict): - if 'resources' in config['deploy'] and isinstance(config['deploy']['resources'], dict): - if 'limits' in config['deploy']['resources']: + if "deploy" in config and isinstance(config["deploy"], dict): + if "resources" in config["deploy"] and isinstance( + config["deploy"]["resources"], dict + ): + if "limits" in config["deploy"]["resources"]: has_limits = True - if 'mem_limit' in config or 'cpu_limit' in config: + if "mem_limit" in config or "cpu_limit" in config: has_limits = True - + if not has_limits: self._add_finding( - "compose-no-resource-limits", Severity.MEDIUM, "No Resource Limits", + "compose-no-resource-limits", + Severity.MEDIUM, + "No Resource Limits", "No memory or CPU limits.", "Set limits to bound the DoS and blast-radius surface.", - service, default_line + service, + default_line, ) - def _check_env_file_secret_risk(self, service: str, config: dict, default_line: int): - if 'env_file' in config: + def _check_env_file_secret_risk( + self, service: str, config: dict, default_line: int + ): + if "env_file" in config: self._add_finding( - "compose-env-file-secret-risk", Severity.MEDIUM, "Environment File Secret Risk", + "compose-env-file-secret-risk", + Severity.MEDIUM, + "Environment File Secret Risk", "env_file points to a file that may carry secrets into the repo.", "Keep secret files out of version control and use a secret manager.", - service, self._get_line(config.get('env_file', config), default_line) + service, + self._get_line(config.get("env_file", config), default_line), ) def _check_writable_root_fs(self, service: str, config: dict, default_line: int): - if config.get('read_only') is not True: + if config.get("read_only") is not True: self._add_finding( - "compose-writable-root-fs", Severity.MEDIUM, "Writable Root Filesystem", + "compose-writable-root-fs", + Severity.MEDIUM, + "Writable Root Filesystem", "no read_only: true on a service that does not need a writable root.", "Set read_only with explicit tmpfs where needed.", - service, default_line + service, + default_line, ) def _check_no_new_privileges(self, service: str, config: dict, default_line: int): - sec_opt = config.get('security_opt', []) + sec_opt = config.get("security_opt", []) if isinstance(sec_opt, list): - has_no_new_privs = any('no-new-privileges:true' in str(opt).lower() for opt in sec_opt) + has_no_new_privs = any( + "no-new-privileges:true" in str(opt).lower() for opt in sec_opt + ) if not has_no_new_privs: self._add_finding( - "compose-no-new-privileges", Severity.LOW, "Missing No-New-Privileges", + "compose-no-new-privileges", + Severity.LOW, + "Missing No-New-Privileges", "security_opt is missing no-new-privileges:true.", "Add it.", - service, default_line + service, + default_line, ) else: self._add_finding( - "compose-no-new-privileges", Severity.LOW, "Missing No-New-Privileges", + "compose-no-new-privileges", + Severity.LOW, + "Missing No-New-Privileges", "security_opt is missing no-new-privileges:true.", "Add it.", - service, default_line + service, + default_line, ) def _check_missing_healthcheck(self, service: str, config: dict, default_line: int): - if 'healthcheck' not in config: + if "healthcheck" not in config: self._add_finding( - "compose-missing-healthcheck", Severity.LOW, "Missing Healthcheck", + "compose-missing-healthcheck", + Severity.LOW, + "Missing Healthcheck", "A long-running service has no healthcheck.", "Add one for safer restarts.", - service, default_line + service, + default_line, ) def get_services(self) -> Dict[str, Dict]: - if not self.data or 'services' not in self.data: + if not self.data or "services" not in self.data: return {} - return self.data.get('services', {}) + return self.data.get("services", {}) + class ComposeOrchestrator: - def __init__(self, compose_path: str, scan_only: bool = False, skip_ai_scoring: bool = False): + def __init__( + self, + compose_path: str, + scan_only: bool = False, + skip_ai_scoring: bool = False, + scanner: str = "trivy", + ): self.compose_path = compose_path self.scan_only = scan_only self.skip_ai_scoring = skip_ai_scoring + self.vuln_scanner = scanner self.scanner = ComposeScanner(compose_path) - + def run_full_scan(self, severity: str = "CRITICAL,HIGH") -> Dict: if not self.scanner.parse(): return { - 'dockerfile_scan': {'success': False, 'output': "Failed to parse compose file", 'skipped': False}, - 'image_scan': {'success': False, 'output': None, 'skipped': True}, - 'json_data': [], - 'timestamp': "", - 'image_name': "N/A", - 'dockerfile_path': self.compose_path, - 'scan_mode': 'compose' + "dockerfile_scan": { + "success": False, + "output": "Failed to parse compose file", + "skipped": False, + }, + "image_scan": {"success": False, "output": None, "skipped": True}, + "json_data": [], + "timestamp": "", + "image_name": "N/A", + "dockerfile_path": self.compose_path, + "scan_mode": "compose", } - + compose_findings = self.scanner.scan() all_findings = list(compose_findings) - + services = self.scanner.get_services() - + dockerfile_outputs = [] image_outputs = [] all_success = True - + for service_name, config in services.items(): if not isinstance(config, dict): continue - + dockerfile_path = None - image_name = config.get('image') - - build = config.get('build') + image_name = config.get("image") + + build = config.get("build") if build: if isinstance(build, str): - dockerfile_path = os.path.join(build, 'Dockerfile') + dockerfile_path = os.path.join(build, "Dockerfile") elif isinstance(build, dict): - context = build.get('context', '.') - dockerfile = build.get('dockerfile', 'Dockerfile') + context = build.get("context", ".") + dockerfile = build.get("dockerfile", "Dockerfile") dockerfile_path = os.path.join(context, dockerfile) - + if dockerfile_path and not os.path.isfile(dockerfile_path): # Try relative to compose file compose_dir = os.path.dirname(self.compose_path) @@ -378,74 +505,101 @@ def run_full_scan(self, severity: str = "CRITICAL,HIGH") -> Dict: dockerfile_path = alt_path else: dockerfile_path = None - + if not dockerfile_path and not image_name: continue - - logger.info(f"Scanning service {service_name} (Dockerfile: {dockerfile_path}, Image: {image_name})") - + + logger.info( + f"Scanning service {service_name} (Dockerfile: {dockerfile_path}, Image: {image_name})" + ) + try: service_scanner = DockerSecurityScanner( dockerfile_path=dockerfile_path, image_name=image_name, scan_only=self.scan_only, - skip_ai_scoring=self.skip_ai_scoring + skip_ai_scoring=self.skip_ai_scoring, + scanner=self.vuln_scanner, ) - + # Disable cache for service scans to ensure fresh results? # service_scanner.use_cache = False - + if dockerfile_path and not image_name: # Only dockerfile df_success, df_output = service_scanner.scan_dockerfile() if not df_success: all_success = False if df_output: - dockerfile_outputs.append(f"--- Service: {service_name} ---\n{df_output}") + dockerfile_outputs.append( + f"--- Service: {service_name} ---\n{df_output}" + ) elif image_name and not dockerfile_path: # Only image res = service_scanner.run_image_only_scan(severity) - if not res['image_scan']['success']: + if not res["image_scan"]["success"]: all_success = False - if res['image_scan']['output']: - image_outputs.append(f"--- Service: {service_name} ---\n{res['image_scan']['output']}") - if res.get('json_data'): + if res["image_scan"]["output"]: + image_outputs.append( + f"--- Service: {service_name} ---\n{res['image_scan']['output']}" + ) + if res.get("json_data"): # Tag findings with service name - for f in res['json_data']: - f['Target'] = f"{service_name} ({f.get('Target', '')})" - all_findings.extend(res['json_data']) + for f in res["json_data"]: + f["Target"] = f"{service_name} ({f.get('Target', '')})" + all_findings.extend(res["json_data"]) else: # Both res = service_scanner.run_full_scan(severity) - if not res['dockerfile_scan']['success'] or not res['image_scan']['success']: + if ( + not res["dockerfile_scan"]["success"] + or not res["image_scan"]["success"] + ): all_success = False - if res['dockerfile_scan']['output'] and not res['dockerfile_scan'].get('skipped'): - dockerfile_outputs.append(f"--- Service: {service_name} ---\n{res['dockerfile_scan']['output']}") - if res['image_scan']['output'] and not res['image_scan'].get('skipped'): - image_outputs.append(f"--- Service: {service_name} ---\n{res['image_scan']['output']}") - if res.get('json_data'): - for f in res['json_data']: - f['Target'] = f"{service_name} ({f.get('Target', '')})" - all_findings.extend(res['json_data']) + if res["dockerfile_scan"]["output"] and not res[ + "dockerfile_scan" + ].get("skipped"): + dockerfile_outputs.append( + f"--- Service: {service_name} ---\n{res['dockerfile_scan']['output']}" + ) + if res["image_scan"]["output"] and not res["image_scan"].get( + "skipped" + ): + image_outputs.append( + f"--- Service: {service_name} ---\n{res['image_scan']['output']}" + ) + if res.get("json_data"): + for f in res["json_data"]: + f["Target"] = f"{service_name} ({f.get('Target', '')})" + all_findings.extend(res["json_data"]) except Exception as e: logger.error(f"Failed to scan service {service_name}: {e}") all_success = False - + from datetime import datetime + return { - 'dockerfile_scan': { - 'success': all_success, - 'output': "\n\n".join(dockerfile_outputs) if dockerfile_outputs else "No Dockerfile issues found or scanned.", - 'skipped': not bool(dockerfile_outputs) + "dockerfile_scan": { + "success": all_success, + "output": ( + "\n\n".join(dockerfile_outputs) + if dockerfile_outputs + else "No Dockerfile issues found or scanned." + ), + "skipped": not bool(dockerfile_outputs), }, - 'image_scan': { - 'success': all_success, - 'output': "\n\n".join(image_outputs) if image_outputs else "No image issues found or scanned.", - 'skipped': not bool(image_outputs) + "image_scan": { + "success": all_success, + "output": ( + "\n\n".join(image_outputs) + if image_outputs + else "No image issues found or scanned." + ), + "skipped": not bool(image_outputs), }, - 'json_data': all_findings, - 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - 'image_name': "Multiple Services", - 'dockerfile_path': self.compose_path, - 'scan_mode': 'compose' + "json_data": all_findings, + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "image_name": "Multiple Services", + "dockerfile_path": self.compose_path, + "scan_mode": "compose", } diff --git a/docksec/docker_scanner.py b/docksec/docker_scanner.py index d4c9514..2d1d7f0 100644 --- a/docksec/docker_scanner.py +++ b/docksec/docker_scanner.py @@ -1,72 +1,76 @@ -import os -import json -import subprocess import csv import hashlib -from typing import List, Tuple, Dict, Optional -from datetime import datetime -from fpdf import FPDF -import sys +import json +import os import re +import subprocess +import sys +from collections import defaultdict +from datetime import datetime from pathlib import Path +from typing import Dict, List, Optional, Tuple + +from fpdf import FPDF + from docksec.config import RESULTS_DIR, docker_score_prompt from docksec.enums import Severity -from docksec.utils import ScoreResponse, get_llm, print_section, get_custom_logger -from collections import defaultdict +from docksec.utils import ScoreResponse, get_custom_logger, get_llm, print_section # Initialize logger logger = get_custom_logger(__name__) + class ScanResultsCache: """Simple cache for scan results to avoid re-scanning same images.""" - + def __init__(self, cache_dir: str = RESULTS_DIR): self.cache_file = os.path.join(cache_dir, ".docksec_cache.json") self.cache = self._load_cache() - + def _load_cache(self) -> Dict: """Load cache from disk.""" if os.path.exists(self.cache_file): try: - with open(self.cache_file, 'r') as f: + with open(self.cache_file, "r") as f: return json.load(f) except (json.JSONDecodeError, IOError): return {} return {} - + def _save_cache(self) -> None: """Save cache to disk.""" try: - with open(self.cache_file, 'w') as f: + with open(self.cache_file, "w") as f: json.dump(self.cache, f, indent=2) except IOError as e: logger.warning(f"Failed to save cache: {e}") - + def get_key(self, image_name: str) -> str: """Generate cache key from image name.""" return hashlib.md5(image_name.encode()).hexdigest() - + def get(self, image_name: str) -> Optional[Dict]: """Get cached results for an image.""" key = self.get_key(image_name) return self.cache.get(key) - + def set(self, image_name: str, results: Dict) -> None: """Cache scan results for an image.""" key = self.get_key(image_name) self.cache[key] = { "image": image_name, "timestamp": datetime.now().isoformat(), - "results": results + "results": results, } self._save_cache() - + def clear_old(self, days: int = 7) -> None: """Clear cache entries older than specified days.""" from datetime import timedelta + cutoff = datetime.now() - timedelta(days=days) keys_to_delete = [] - + for key, entry in self.cache.items(): try: entry_time = datetime.fromisoformat(entry.get("timestamp", "")) @@ -74,26 +78,33 @@ def clear_old(self, days: int = 7) -> None: keys_to_delete.append(key) except (ValueError, TypeError): keys_to_delete.append(key) - + for key in keys_to_delete: del self.cache[key] - + if keys_to_delete: self._save_cache() logger.info(f"Cleared {len(keys_to_delete)} old cache entries") + class DockerSecurityScanner: + @property + def _cache_key(self) -> str: + """Cache key incorporating image name and scanner mode to prevent cross-scanner hits.""" + scanner_mode = getattr(self, "scanner", "trivy") + return f"{self.image_name}[{scanner_mode}]" + @staticmethod def _validate_file_path(file_path: str) -> Path: """ Validate and sanitize file path to prevent path traversal attacks. - + Args: file_path: Path to validate - + Returns: Path object if valid - + Raises: ValueError: If path is invalid or contains path traversal attempts """ @@ -102,7 +113,7 @@ def _validate_file_path(file_path: str) -> Path: # Check the raw string before resolution — Path.resolve() removes '..' # so checking the resolved path would silently allow traversal attempts. - if '..' in file_path: + if ".." in file_path: raise ValueError(f"Invalid path: path traversal detected in '{file_path}'") try: @@ -110,106 +121,137 @@ def _validate_file_path(file_path: str) -> Path: return path except (OSError, ValueError) as e: raise ValueError(f"Invalid file path '{file_path}': {str(e)}") - + @staticmethod def _validate_image_name(image_name: str) -> str: """ Validate Docker image name format. - + Args: image_name: Docker image name to validate - + Returns: Sanitized image name - + Raises: ValueError: If image name is invalid """ if not image_name: raise ValueError("Image name cannot be empty") - + # Basic validation - image names should be alphanumeric with :, /, -, _, . # More lenient than strict Docker validation, but prevents obvious injection if len(image_name) > 512: # Docker image name max length - raise ValueError(f"Image name too long (max 512 characters): {len(image_name)}") - + raise ValueError( + f"Image name too long (max 512 characters): {len(image_name)}" + ) + # Check for path traversal attempts - if '..' in image_name or image_name.startswith('/'): - raise ValueError(f"Image name contains path traversal or absolute path: '{image_name}'") - + if ".." in image_name or image_name.startswith("/"): + raise ValueError( + f"Image name contains path traversal or absolute path: '{image_name}'" + ) + # Whitelist: Docker image names allow alphanumeric, '/', ':', '-', '_', '.', '@' # Anything outside this set (spaces, shell metacharacters, etc.) is rejected. - if not re.match(r'^[a-zA-Z0-9/:._\-@]+$', image_name): + if not re.match(r"^[a-zA-Z0-9/:._\-@]+$", image_name): raise ValueError(f"Image name contains invalid characters: '{image_name}'") - + return image_name.strip() - + @staticmethod def _validate_severity(severity: str) -> str: """ Validate severity string for Trivy. - + Args: severity: Comma-separated severity levels - + Returns: Validated severity string - + Raises: ValueError: If severity contains invalid values """ if not severity: raise ValueError("Severity cannot be empty") - + valid_severities = Severity.values() - severity_list = [s.strip().upper() for s in severity.split(',')] + severity_list = [s.strip().upper() for s in severity.split(",")] for sev in severity_list: if sev not in valid_severities: - raise ValueError(f"Invalid severity level: {sev}. Valid values: {', '.join(valid_severities)}") - - return ','.join(severity_list) - - def _print_compact_vulnerability_summary(self, vulnerabilities: List[Dict]) -> None: + raise ValueError( + f"Invalid severity level: {sev}. Valid values: {', '.join(valid_severities)}" + ) + + return ",".join(severity_list) + + def _print_compact_vulnerability_summary( + self, vulnerabilities: List[Dict], label: str = "" + ) -> None: """ Print a compact summary of vulnerabilities without full details. Shows count by severity in a single-line format. - + Args: vulnerabilities: List of vulnerability dictionaries + label: Optional prefix label (e.g. "[Trivy]", "[Grype]") """ + prefix = f"{label} " if label else "" if not vulnerabilities: - print("[SUCCESS] No vulnerabilities found.") + print(f" {prefix}[SUCCESS] No vulnerabilities found.") return - + severity_counts = defaultdict(int) for vuln in vulnerabilities: - severity = vuln.get('Severity', Severity.UNKNOWN) + severity = vuln.get("Severity", Severity.UNKNOWN) severity_counts[severity] += 1 - + # Print compact single-line summary total = sum(severity_counts.values()) - severity_order = [Severity.CRITICAL, Severity.HIGH, Severity.MEDIUM, Severity.LOW] + severity_order = [ + Severity.CRITICAL, + Severity.HIGH, + Severity.MEDIUM, + Severity.LOW, + ] summary_parts = [] - + for severity in severity_order: count = severity_counts.get(severity, 0) if count > 0: summary_parts.append(f"{severity}: {count}") - - print(f" [VULNERABILITIES] {' | '.join(summary_parts)} | Total: {total}") - + + print( + f" {prefix}[VULNERABILITIES] {' | '.join(summary_parts)} | Total: {total}" + ) + # Show top 3 critical/high only - critical_high = [v for v in vulnerabilities if v.get('Severity') in [Severity.CRITICAL, Severity.HIGH]] + critical_high = [ + v + for v in vulnerabilities + if v.get("Severity") in [Severity.CRITICAL, Severity.HIGH] + ] if critical_high: print(" Top Issues:") for i, vuln in enumerate(critical_high[:3], 1): - title = vuln.get('Title', 'N/A') + title = vuln.get("Title", "N/A") if title and len(title) > 60: title = title[:57] + "..." - print(f" • [{vuln.get('Severity')}] {vuln.get('VulnerabilityID', 'N/A')}: {title}") - - def __init__(self, dockerfile_path: Optional[str], image_name: Optional[str], results_dir: str = RESULTS_DIR, scan_only: bool = False, skip_ai_scoring: bool = False): + print( + f" • [{vuln.get('Severity')}] {vuln.get('VulnerabilityID', 'N/A')}: {title}" + ) + + def __init__( + self, + dockerfile_path: Optional[str], + image_name: Optional[str], + results_dir: str = RESULTS_DIR, + scan_only: bool = False, + skip_ai_scoring: bool = False, + scanner: str = "trivy", + ): """ Initialize the Docker Security Scanner with a Dockerfile path and/or image name. Verifies that required tools are installed and the specified files exist. @@ -231,47 +273,64 @@ def __init__(self, dockerfile_path: Optional[str], image_name: Optional[str], re self.dockerfile_path = str(validated_path) else: self.dockerfile_path = None - - self.required_tools = ['trivy'] + + # Validate scanner choice + valid_scanners = ("trivy", "grype", "all") + if scanner not in valid_scanners: + raise ValueError( + f"Invalid scanner: '{scanner}'. Valid options: {valid_scanners}" + ) + self.scanner = scanner + + self.required_tools = ["trivy"] if self.image_name: - self.required_tools.append('docker') + self.required_tools.append("docker") if self.dockerfile_path: - self.required_tools.append('hadolint') + self.required_tools.append("hadolint") self.RESULTS_DIR = results_dir self.scan_only = scan_only self.skip_ai_scoring = skip_ai_scoring - self.analysis_score = None # Initialize to avoid AttributeError when accessed before calculation - + self.analysis_score = ( + None # Initialize to avoid AttributeError when accessed before calculation + ) + # Initialize score chain: skip if scan_only or skip_ai_scoring flags are set if scan_only or skip_ai_scoring: self.score_chain = None else: try: - from docksec.enums import LLMProvider from docksec.config_manager import get_config + from docksec.enums import LLMProvider + config = get_config() provider = config.llm_provider llm = get_llm() - + if provider == LLMProvider.OPENAI: - self.score_chain = docker_score_prompt | llm.with_structured_output(ScoreResponse, method="json_mode") + self.score_chain = docker_score_prompt | llm.with_structured_output( + ScoreResponse, method="json_mode" + ) else: - self.score_chain = docker_score_prompt | llm.with_structured_output(ScoreResponse) + self.score_chain = docker_score_prompt | llm.with_structured_output( + ScoreResponse + ) except Exception as e: logger.warning(f"Failed to initialize AI scoring: {e}") self.score_chain = None - + # Ensure results directory exists try: os.makedirs(self.RESULTS_DIR, exist_ok=True) except Exception as e: logger.error(f"Failed to create results directory {self.RESULTS_DIR}: {e}") # Fallback is handled in config.py, but this is a safety check - + # Initialize output mode for console display - self.compact_output = os.getenv("DOCKSEC_COMPACT_OUTPUT", "false").lower() == "true" - + self.compact_output = ( + os.getenv("DOCKSEC_COMPACT_OUTPUT", "false").lower() == "true" + ) + # Initialize cache self.cache = ScanResultsCache(self.RESULTS_DIR) self.use_cache = os.getenv("DOCKSEC_USE_CACHE", "true").lower() == "true" @@ -284,26 +343,62 @@ def __init__(self, dockerfile_path: Optional[str], image_name: Optional[str], re for tool in missing_tools: error_msg += f"\n{tool.upper()}:\n{self._get_tool_installation_instructions(tool)}\n" raise ValueError(error_msg) - + + # Check optional Grype availability (does not raise — Grype is optional) + if self.scanner in ("grype", "all"): + try: + subprocess.run( + ["grype", "version"], + capture_output=True, + check=True, + timeout=10, + shell=False, + ) + self._grype_available = True + except ( + subprocess.CalledProcessError, + FileNotFoundError, + subprocess.TimeoutExpired, + ): + self._grype_available = False + if self.scanner == "grype": + print("[WARNING] Grype not found. Falling back to Trivy.") + print( + "[TIP] Install Grype: run docksec-setup or visit https://github.com/anchore/grype" + ) + self.scanner = "trivy" + else: + print( + "[WARNING] Grype not found. Using Trivy only for --scanner all." + ) + print( + "[TIP] Install Grype: run docksec-setup or visit https://github.com/anchore/grype" + ) + else: + self._grype_available = False + # Verify Dockerfile exists (after validation) if self.dockerfile_path and not os.path.exists(self.dockerfile_path): raise ValueError(f"Dockerfile not found at {self.dockerfile_path}") - + # Verify Docker image exists (using validated image_name) if provided if self.image_name: try: subprocess.run( - ['docker', 'image', 'inspect', self.image_name], + ["docker", "image", "inspect", self.image_name], capture_output=True, check=True, text=True, timeout=30, - shell=False # Explicitly disable shell for security + shell=False, # Explicitly disable shell for security ) except subprocess.CalledProcessError as e: # Check if the error is due to permission issues error_output = e.stderr.lower() if e.stderr else "" - if "permission denied" in error_output or "cannot connect to the docker daemon" in error_output: + if ( + "permission denied" in error_output + or "cannot connect to the docker daemon" in error_output + ): raise ValueError( f"Unable to access Docker. This may require elevated permissions.\n" f"Possible solutions:\n" @@ -318,54 +413,76 @@ def __init__(self, dockerfile_path: Optional[str], image_name: Optional[str], re raise ValueError( "Docker command not found. Please ensure Docker is installed and accessible in your PATH." ) + def run_image_only_scan(self, severity: str = "CRITICAL,HIGH") -> Dict: """ Run image-only security scan without Dockerfile analysis. - + Args: severity: Comma-separated list of severity levels to scan for - + Returns: Dictionary containing scan results """ # Check cache first if self.use_cache: - cached = self.cache.get(self.image_name) + cached = self.cache.get(self._cache_key) if cached: - print(f"[INFO] Using cached scan results for {self.image_name} (scanned at {cached.get('timestamp', 'N/A')})") - print("[TIP] To bypass cache, set environment variable DOCKSEC_USE_CACHE=false") - return cached.get('results', {}) - + print( + f"[INFO] Using cached scan results for {self.image_name} (scanned at {cached.get('timestamp', 'N/A')})" + ) + print( + "[TIP] To bypass cache, set environment variable DOCKSEC_USE_CACHE=false" + ) + return cached.get("results", {}) + # Validate severity input severity = self._validate_severity(severity) logger.info(f"Starting image-only scan for {self.image_name}") - + results = { - 'dockerfile_scan': { - 'success': True, # Skip Dockerfile scan - 'output': "Skipped - Image-only scan mode", - 'skipped': True - }, - 'image_scan': { - 'success': False, - 'output': None + "dockerfile_scan": { + "success": True, # Skip Dockerfile scan + "output": "Skipped - Image-only scan mode", + "skipped": True, }, - 'json_data': [], - 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - 'image_name': self.image_name, - 'dockerfile_path': self.dockerfile_path or "N/A - Image-only scan", - 'scan_mode': 'image_only' + "image_scan": {"success": False, "output": None}, + "json_data": [], + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "image_name": self.image_name, + "dockerfile_path": self.dockerfile_path or "N/A - Image-only scan", + "scan_mode": "image_only", } - # Run image vulnerability scan - image_success, image_output = self.scan_image(severity) - results['image_scan']['success'] = image_success - results['image_scan']['output'] = image_output + scanner_mode = getattr(self, "scanner", "trivy") + trivy_data: List[Dict] = [] + grype_data: List[Dict] = [] + + if scanner_mode in ("trivy", "all"): + image_success, image_output = self.scan_image(severity) + results["image_scan"]["success"] = image_success + results["image_scan"]["output"] = image_output + trivy_success, trivy_data = self.scan_image_json(severity) + trivy_data = trivy_data or [] + + if scanner_mode in ("grype", "all") and getattr( + self, "_grype_available", False + ): + grype_success, grype_data = self.scan_image_grype(severity) + grype_data = grype_data or [] + if scanner_mode == "grype": + results["image_scan"]["success"] = grype_success + + if scanner_mode == "trivy": + results["json_data"] = trivy_data + elif scanner_mode == "grype": + results["json_data"] = grype_data + else: # "all" + results["json_data"] = self._deduplicate_vulnerabilities( + trivy_data, grype_data + ) - # Get JSON data for vulnerabilities - json_success, json_data = self.scan_image_json(severity) - if json_success: - results['json_data'] = json_data + json_data = results["json_data"] # Cache results if self.use_cache: @@ -373,63 +490,76 @@ def run_image_only_scan(self, severity: str = "CRITICAL,HIGH") -> Dict: # Print final summary if not json_data: - print(f"[SUCCESS] Image scan completed for {self.image_name} (no vulnerabilities found).") + print( + f"[SUCCESS] Image scan completed for {self.image_name} (no vulnerabilities found)." + ) else: - severity_counts = defaultdict(int) - for v in json_data: - severity_counts[v.get('Severity', Severity.UNKNOWN)] += 1 - print(f"[INFO] Image scan completed for {self.image_name}. Found {len(json_data)} vulnerabilities.") - # self._print_compact_vulnerability_summary(json_data) is already called in scan_image_json - - return results - + print( + f"[INFO] Image scan completed for {self.image_name}. Found {len(json_data)} vulnerabilities." + ) + + return results + def _check_tools(self) -> List[str]: """Check if all required tools are installed and return list of missing tools.""" missing_tools = [] - + for tool in self.required_tools: try: subprocess.run( - [tool, '--version'], + [tool, "--version"], capture_output=True, check=True, timeout=10, - shell=False + shell=False, ) - except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired): + except ( + subprocess.CalledProcessError, + FileNotFoundError, + subprocess.TimeoutExpired, + ): missing_tools.append(tool) - + return missing_tools - + def _get_tool_installation_instructions(self, tool: str) -> str: """Get installation instructions for a missing tool.""" instructions = { - 'docker': ( + "docker": ( "Docker is required for image scanning. Please install Docker:\n" " - Linux: https://docs.docker.com/engine/install/\n" " - macOS: https://docs.docker.com/desktop/install/mac-install/\n" " - Windows: https://docs.docker.com/desktop/install/windows-install/" ), - 'trivy': ( + "trivy": ( "Trivy is required for vulnerability scanning. Install it:\n" " - Linux/Mac: curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin\n" " - Windows: See https://aquasecurity.github.io/trivy/latest/getting-started/installation/\n" " - Or run: python setup_external_tools.py" ), - 'hadolint': ( + "hadolint": ( "Hadolint is required for Dockerfile linting. Install it:\n" " - Linux: curl -L -o hadolint https://github.com/hadolint/hadolint/releases/latest/download/hadolint-Linux-x86_64 && chmod +x hadolint && sudo mv hadolint /usr/local/bin/\n" " - macOS: brew install hadolint\n" " - Windows: See https://github.com/hadolint/hadolint#install\n" " - Or run: python setup_external_tools.py" - ) + ), + "grype": ( + "Grype is an optional vulnerability scanner (complements Trivy). Install it:\n" + " - Linux/Mac: curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sh -s -- -b /usr/local/bin\n" + " - macOS: brew install anchore/grype/grype\n" + " - Windows: See https://github.com/anchore/grype#installation\n" + " - Or run: python setup_external_tools.py" + ), } - return instructions.get(tool, f"Please install {tool} from its official documentation.") + return instructions.get( + tool, f"Please install {tool} from its official documentation." + ) def scan_dockerfile(self) -> Tuple[bool, Optional[str]]: """ Scan Dockerfile using Hadolint. - + Returns: Tuple containing: - bool: True if no issues found, False otherwise @@ -439,26 +569,28 @@ def scan_dockerfile(self) -> Tuple[bool, Optional[str]]: print("\n=== Starting Dockerfile scan with Hadolint ===") try: result = subprocess.run( - ['hadolint', self.dockerfile_path], + ["hadolint", self.dockerfile_path], capture_output=True, text=True, timeout=300, - shell=False + shell=False, ) - + if result.returncode != 0: output = result.stdout if result.stdout else result.stderr logger.warning(f"Hadolint found issues in {self.dockerfile_path}") print("[WARNING] Dockerfile linting issues found:") print(output) print("\n[TIP] Run 'hadolint --help' to learn about specific rules") - print(" You can ignore specific rules with: hadolint --ignore DL3000 Dockerfile") + print( + " You can ignore specific rules with: hadolint --ignore DL3000 Dockerfile" + ) return False, output else: logger.info("No Dockerfile linting issues found.") print("[SUCCESS] No Dockerfile linting issues found.") return True, None - + except subprocess.CalledProcessError as e: error_msg = f"Hadolint execution failed: {e}" logger.error(error_msg, exc_info=True) @@ -482,34 +614,34 @@ def scan_dockerfile(self) -> Tuple[bool, Optional[str]]: logger.error(error_msg) print(f"\n[ERROR] Error: {error_msg}") print("\nInstallation instructions:") - print(self._get_tool_installation_instructions('hadolint')) + print(self._get_tool_installation_instructions("hadolint")) return False, error_msg except Exception as e: error_msg = f"Unexpected error during Hadolint scan: {e}" logger.error(error_msg, exc_info=True) print(f"\n[ERROR] Error: {error_msg}") return False, str(e) - + def _filter_scan_results(self, scan_results: Dict) -> List[Dict]: """ Filter Trivy scan results to extract specific vulnerability data. - + Args: scan_results: The raw Trivy scan results - + Returns: List of filtered vulnerability data with key information """ filtered_vulnerabilities = [] - + for result in scan_results.get("Results", []): target = result.get("Target", "") - - for vulnerability in result.get('Vulnerabilities', []): + + for vulnerability in result.get("Vulnerabilities", []): description = vulnerability.get("Description", "") if description and len(description) > 150: description = description[:150] + "..." - + filtered_vulnerability = { "VulnerabilityID": vulnerability.get("VulnerabilityID"), "Target": target, @@ -520,78 +652,292 @@ def _filter_scan_results(self, scan_results: Dict) -> List[Dict]: "Description": description, "Status": vulnerability.get("Status"), "CVSS": vulnerability.get("CVSS", {}).get("nvd", {}).get("V3Score"), - "PrimaryURL": vulnerability.get("PrimaryURL") + "PrimaryURL": vulnerability.get("PrimaryURL"), } - + filtered_vulnerabilities.append(filtered_vulnerability) - + + return filtered_vulnerabilities + + def _parse_grype_output( + self, json_output: str, severity_filter: Optional[set] = None + ) -> List[Dict]: + """ + Normalize Grype JSON output to DockSec's internal vulnerability format. + + Args: + json_output: Raw JSON string from Grype + severity_filter: Set of uppercase severity levels to include (e.g. {"CRITICAL","HIGH"}). + If None, all severities are included. + + Returns: + List of normalized vulnerability dicts matching DockSec's internal schema. + """ + try: + data = json.loads(json_output) + except json.JSONDecodeError: + return [] + + filtered_vulnerabilities = [] + + for match in data.get("matches", []): + vuln = match.get("vulnerability", {}) + artifact = match.get("artifact", {}) + + # Grype uses title-case severity (e.g. "High") — normalize to upper + severity = vuln.get("severity", "UNKNOWN").upper() + + # Apply severity filter + if severity_filter and severity not in severity_filter: + continue + + raw_desc = vuln.get("description", "") + # Derive a concise title: use first sentence of description (≤100 chars) + if raw_desc: + first_sentence = raw_desc.split(".")[0].strip() + title = first_sentence[:100] + ( + "..." if len(first_sentence) > 100 else "" + ) + else: + title = vuln.get("id", "") + + description = raw_desc[:150] + "..." if len(raw_desc) > 150 else raw_desc + + # Extract CVSS v3 base score — prefer NVD v3, fall back to any v3 entry + cvss_score = None + for cvss_entry in vuln.get("cvss", []): + version = str(cvss_entry.get("version", "")) + if version.startswith("3"): + score = cvss_entry.get("metrics", {}).get("baseScore") + if score is not None: + cvss_score = score + break + + urls = vuln.get("urls", []) + primary_url = urls[0] if urls else None + + fix_state = vuln.get("fix", {}).get("state", "") + status = "fixed" if fix_state == "fixed" else "affected" + + # Use artifact locations for Target (mirrors Trivy's layer-level target) + locations = artifact.get("locations", []) + target = ( + locations[0].get("path", "") if locations else artifact.get("type", "") + ) + + filtered_vulnerabilities.append( + { + "VulnerabilityID": vuln.get("id"), + "Target": target, + "PkgName": artifact.get("name", ""), + "InstalledVersion": artifact.get("version", ""), + "Severity": severity, + "Title": title, + "Description": description, + "Status": status, + "CVSS": cvss_score, + "PrimaryURL": primary_url, + "sources": ["grype"], + } + ) + return filtered_vulnerabilities - - def scan_image_json(self, severity: str = "CRITICAL,HIGH") -> Tuple[bool, Optional[List[Dict]]]: + + def _deduplicate_vulnerabilities( + self, trivy_vulns: List[Dict], grype_vulns: List[Dict] + ) -> List[Dict]: + """ + Merge and deduplicate vulnerabilities from Trivy and Grype by CVE ID. + + CVEs found by both scanners are merged into a single entry with + ``sources`` listing both tools. CVEs found by only one scanner + keep their original data. + + Args: + trivy_vulns: Normalized vulnerabilities from Trivy + grype_vulns: Normalized vulnerabilities from Grype + + Returns: + Deduplicated list of vulnerability dicts with a ``sources`` field. + """ + # Tag Trivy vulns with their source + for v in trivy_vulns: + v.setdefault("sources", ["trivy"]) + + seen: Dict[str, Dict] = {} + + for v in trivy_vulns: + cve_id = v.get("VulnerabilityID") + if cve_id: + seen[cve_id] = v + + for v in grype_vulns: + cve_id = v.get("VulnerabilityID") + if not cve_id: + continue + if cve_id in seen: + existing_sources = seen[cve_id].get("sources", ["trivy"]) + if "grype" not in existing_sources: + seen[cve_id]["sources"] = existing_sources + ["grype"] + else: + seen[cve_id] = v + + return list(seen.values()) + + def scan_image_grype( + self, severity: str = "CRITICAL,HIGH" + ) -> Tuple[bool, Optional[List[Dict]]]: + """ + Scan Docker image using Grype and return structured results. + + Args: + severity: Comma-separated list of severity levels to include + + Returns: + Tuple of (success: bool, vulnerabilities: List[Dict] | None) + """ + from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, + ) + + severity = self._validate_severity(severity) + severity_set = {s.strip().upper() for s in severity.split(",")} + logger.info(f"Starting Grype scan for image: {self.image_name}") + + try: + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TimeElapsedColumn(), + console=None, + ) as progress: + scan_task = progress.add_task( + f"[cyan]Scanning {self.image_name} with Grype...", + total=None, + ) + + result = subprocess.run( + [ + "grype", + self.image_name, + "-o", + "json", + ], + capture_output=True, + text=True, + encoding="utf-8", + timeout=600, + shell=False, + ) + + progress.update(scan_task, completed=True) + + if result.returncode != 0 and not result.stdout: + print(f"[ERROR] Grype scan failed: {result.stderr[:200]}") + return False, None + + if not result.stdout: + return True, [] + + filtered_results = self._parse_grype_output(result.stdout, severity_set) + self._print_compact_vulnerability_summary(filtered_results, label="[Grype]") + return True, filtered_results + + except subprocess.TimeoutExpired: + error_msg = "Grype scan timed out after 600 seconds" + logger.error(error_msg) + print(f"[ERROR] {error_msg}") + return False, None + except json.JSONDecodeError as e: + error_msg = f"Failed to parse Grype output: {e}" + logger.error(error_msg) + print(f"[ERROR] {error_msg}") + return False, None + except (subprocess.CalledProcessError, Exception) as e: + error_msg = f"Grype scan failed: {e}" + logger.error(error_msg, exc_info=True) + print(f"[ERROR] {error_msg}") + return False, None + + def scan_image_json( + self, severity: str = "CRITICAL,HIGH" + ) -> Tuple[bool, Optional[List[Dict]]]: """ Scan Docker image using Trivy and return the results as structured data (compact). - + Args: severity: Comma-separated list of severity levels to scan for - + Returns: Tuple containing: - bool: True if scan completed successfully, False otherwise - Optional[List[Dict]]: Filtered vulnerability data or None if scan failed """ - from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn - + from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, + ) + # Validate severity input severity = self._validate_severity(severity) logger.info(f"Starting Trivy JSON scan for image: {self.image_name}") - + try: with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TimeElapsedColumn(), - console=None + console=None, ) as progress: scan_task = progress.add_task( - f"[cyan]Scanning {self.image_name}...", - total=None + f"[cyan]Scanning {self.image_name}...", total=None ) - + result = subprocess.run( [ - 'trivy', - 'image', - '-f', 'json', - '--severity', severity, - '--no-progress', - '--skip-version-check', - self.image_name + "trivy", + "image", + "-f", + "json", + "--severity", + severity, + "--no-progress", + "--skip-version-check", + self.image_name, ], capture_output=True, text=True, - encoding='utf-8', + encoding="utf-8", timeout=600, - shell=False + shell=False, ) - + progress.update(scan_task, completed=True) - - if result.stderr and 'error' in result.stderr.lower() and not result.stdout: + + if result.stderr and "error" in result.stderr.lower() and not result.stdout: print(f"[ERROR] Trivy scan failed: {result.stderr[:200]}") return False, None - + if not result.stdout: return True, [] response = json.loads(result.stdout) filtered_results = self._filter_scan_results(response) - + # Print compact summary - self._print_compact_vulnerability_summary(filtered_results) - + self._print_compact_vulnerability_summary(filtered_results, label="[Trivy]") + return True, filtered_results - + except subprocess.TimeoutExpired: error_msg = "Trivy scan timed out after 600 seconds" logger.error(error_msg) @@ -611,10 +957,10 @@ def scan_image_json(self, severity: str = "CRITICAL,HIGH") -> Tuple[bool, Option def scan_image(self, severity: str = "CRITICAL,HIGH") -> Tuple[bool, Optional[str]]: """ Scan Docker image using Trivy and return text output (compressed). - + Args: severity: Comma-separated list of severity levels to scan for - + Returns: Tuple containing: - bool: True if no vulnerabilities found, False otherwise @@ -622,30 +968,33 @@ def scan_image(self, severity: str = "CRITICAL,HIGH") -> Tuple[bool, Optional[st """ # Validate severity input severity = self._validate_severity(severity) - logger.info(f"Starting Trivy scan for image: {self.image_name} with severity: {severity}") - + logger.info( + f"Starting Trivy scan for image: {self.image_name} with severity: {severity}" + ) + try: result = subprocess.run( [ - 'trivy', - 'image', - '--severity', severity, - '--no-progress', - '--skip-version-check', - '--quiet', - self.image_name + "trivy", + "image", + "--severity", + severity, + "--no-progress", + "--skip-version-check", + "--quiet", + self.image_name, ], capture_output=True, text=True, - encoding='utf-8', + encoding="utf-8", timeout=600, - shell=False + shell=False, ) - + # In compact mode, we mostly rely on scan_image_json for output # This method is kept for backward compatibility and full text results return result.returncode == 0, result.stdout - + except subprocess.TimeoutExpired: print("[ERROR] Trivy scan timed out after 600 seconds") return False, "Scan timed out" @@ -656,156 +1005,195 @@ def scan_image(self, severity: str = "CRITICAL,HIGH") -> Tuple[bool, Optional[st def advanced_scan(self) -> Dict: """ Run advanced Docker Scout scan and show a concise summary. - + Returns: Dict containing scan results, or empty dict if scan failed """ - result_dict = { - 'success': False, - 'output': None, - 'error': None - } - + result_dict = {"success": False, "output": None, "error": None} + try: # Running Docker Scout quick scan result = subprocess.run( - ["docker", "scout", "quickview", self.image_name], - capture_output=True, text=True, check=True, timeout=300, shell=False + ["docker", "scout", "quickview", self.image_name], + capture_output=True, + text=True, + check=True, + timeout=300, + shell=False, ) - + # Parse and show concise summary output = result.stdout summary_lines = [] - for line in output.split('\n'): + for line in output.split("\n"): # Extract lines containing counts or recommendations - if any(x in line for x in ['Target', 'Base image', 'Updated base image', 'vulnerabilities']): + if any( + x in line + for x in [ + "Target", + "Base image", + "Updated base image", + "vulnerabilities", + ] + ): summary_lines.append(line.strip()) - + print(f" [ADVANCED] Docker Scout Summary for {self.image_name}:") if summary_lines: - for line in summary_lines[:5]: # Show top 5 summary lines + for line in summary_lines[:5]: # Show top 5 summary lines print(f" {line}") else: # Fallback to a very short version of output if parsing fails - print(f" {output.splitlines()[0] if output.splitlines() else 'Scan completed.'}") - - result_dict['success'] = True - result_dict['output'] = result.stdout + print( + f" {output.splitlines()[0] if output.splitlines() else 'Scan completed.'}" + ) + + result_dict["success"] = True + result_dict["output"] = result.stdout except subprocess.CalledProcessError as e: error_msg = e.stderr if e.stderr else str(e) logger.warning(f"Docker Scout failed: {error_msg}") - result_dict['error'] = error_msg + result_dict["error"] = error_msg except subprocess.TimeoutExpired: error_msg = "Docker Scout scan timed out" logger.warning(error_msg) - result_dict['error'] = error_msg + result_dict["error"] = error_msg except FileNotFoundError: # Silently fail if tool not found, as it's optional - result_dict['error'] = "Docker Scout not found" - + result_dict["error"] = "Docker Scout not found" + return result_dict + def run_full_scan(self, severity: str = "CRITICAL,HIGH") -> Dict: """ Run all security scans and return results. - + Args: severity: Comma-separated list of severity levels to scan for - + Returns: Dictionary containing scan results """ # Check cache first (only if image name is provided) if self.image_name and self.use_cache: - cached = self.cache.get(self.image_name) + cached = self.cache.get(self._cache_key) if cached: - print(f"[INFO] Using cached scan results for {self.image_name} (scanned at {cached.get('timestamp', 'N/A')})") - print("[TIP] To bypass cache, set environment variable DOCKSEC_USE_CACHE=false") - return cached.get('results', {}) - + print( + f"[INFO] Using cached scan results for {self.image_name} (scanned at {cached.get('timestamp', 'N/A')})" + ) + print( + "[TIP] To bypass cache, set environment variable DOCKSEC_USE_CACHE=false" + ) + return cached.get("results", {}) + # Validate severity input severity = self._validate_severity(severity) scan_status = True results = { - 'dockerfile_scan': { - 'success': False, - 'output': None + "dockerfile_scan": {"success": False, "output": None}, + "image_scan": { + "success": True, # Default to True if skipped + "output": "Skipped - No image provided", + "skipped": True, }, - 'image_scan': { - 'success': True, # Default to True if skipped - 'output': "Skipped - No image provided", - 'skipped': True - }, - 'json_data': [], - 'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - 'image_name': self.image_name or "N/A", - 'dockerfile_path': self.dockerfile_path + "json_data": [], + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "image_name": self.image_name or "N/A", + "dockerfile_path": self.dockerfile_path, } # Run Dockerfile scan if self.dockerfile_path: dockerfile_success, dockerfile_output = self.scan_dockerfile() - results['dockerfile_scan']['success'] = dockerfile_success - results['dockerfile_scan']['output'] = dockerfile_output + results["dockerfile_scan"]["success"] = dockerfile_success + results["dockerfile_scan"]["output"] = dockerfile_output if not dockerfile_success: scan_status = False else: - results['dockerfile_scan']['success'] = True - results['dockerfile_scan']['output'] = "Skipped - No Dockerfile provided" - results['dockerfile_scan']['skipped'] = True + results["dockerfile_scan"]["success"] = True + results["dockerfile_scan"]["output"] = "Skipped - No Dockerfile provided" + results["dockerfile_scan"]["skipped"] = True # Run image vulnerability scan (only if image name is provided) if self.image_name: - image_success, image_output = self.scan_image(severity) - results['image_scan']['success'] = image_success - results['image_scan']['output'] = image_output - results['image_scan']['skipped'] = False - if not image_success: - scan_status = False + scanner_mode = getattr(self, "scanner", "trivy") + trivy_data: List[Dict] = [] + grype_data: List[Dict] = [] - # Get JSON data - json_success, json_data = self.scan_image_json(severity) - if json_success: - results['json_data'] = json_data + if scanner_mode in ("trivy", "all"): + image_success, image_output = self.scan_image(severity) + results["image_scan"]["success"] = image_success + results["image_scan"]["output"] = image_output + results["image_scan"]["skipped"] = False + if not image_success: + scan_status = False + trivy_success, trivy_data = self.scan_image_json(severity) + trivy_data = trivy_data or [] + + if scanner_mode in ("grype", "all") and getattr( + self, "_grype_available", False + ): + grype_success, grype_data = self.scan_image_grype(severity) + grype_data = grype_data or [] + if not grype_success and scanner_mode == "grype": + scan_status = False + results["image_scan"]["skipped"] = False + + if scanner_mode == "trivy": + results["json_data"] = trivy_data + elif scanner_mode == "grype": + results["image_scan"]["skipped"] = False + results["json_data"] = grype_data + else: # "all" + results["json_data"] = self._deduplicate_vulnerabilities( + trivy_data, grype_data + ) # Cache results if self.use_cache: - self.cache.set(self.image_name, results) + self.cache.set(self._cache_key, results) # Print final summary target_name = self.image_name if self.image_name else self.dockerfile_path if scan_status: print(f"[SUCCESS] All security scans completed for {target_name}.") else: - print(f"[WARNING] Security scans completed for {target_name} with some issues.") + print( + f"[WARNING] Security scans completed for {target_name} with some issues." + ) return results def save_results_to_json(self, results: Dict) -> str: """ Save scan results to a JSON file. - + Args: results: The scan results to save - + Returns: Path to the saved JSON file """ # Sanitize image name for filename (avoid backslash in f-string expression) image_name_for_file = self.image_name or "docksec_report" - safe_image_name = re.sub(r'[:/.\-]', '_', image_name_for_file) - output_file = os.path.join(self.RESULTS_DIR, f"{safe_image_name}_scan_results.json") + safe_image_name = re.sub(r"[:/.\-]", "_", image_name_for_file) + output_file = os.path.join( + self.RESULTS_DIR, f"{safe_image_name}_scan_results.json" + ) - json_results = results.get('json_data', []) + json_results = results.get("json_data", []) vulnerabilities = { "scan_info": { "image": self.image_name or "N/A", "dockerfile": self.dockerfile_path or "N/A", - "scan_time": results.get('timestamp', datetime.now().strftime("%Y-%m-%d %H:%M:%S")), - "analysis score": self.analysis_score + "scan_time": results.get( + "timestamp", datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ), + "analysis score": self.analysis_score, }, - "vulnerabilities": json_results + "vulnerabilities": json_results, } - + try: with open(output_file, "w") as f: json.dump(vulnerabilities, f, indent=4) @@ -817,277 +1205,382 @@ def save_results_to_json(self, results: Dict) -> str: def save_results_to_csv(self, results: Dict) -> str: """ Save vulnerability scan results to a CSV file. - + Args: results: The scan results to save - + Returns: Path to the saved CSV file """ # Sanitize image name for filename image_name_for_file = self.image_name or "docksec_report" - safe_image_name = re.sub(r'[:/.\-]', '_', image_name_for_file) - output_file = os.path.join(self.RESULTS_DIR, f"{safe_image_name}_vulnerabilities.csv") - - vulnerabilities = results.get('json_data', []) + safe_image_name = re.sub(r"[:/.\-]", "_", image_name_for_file) + output_file = os.path.join( + self.RESULTS_DIR, f"{safe_image_name}_vulnerabilities.csv" + ) + + vulnerabilities = results.get("json_data", []) if not vulnerabilities: # Create an empty CSV with headers if no vulnerabilities found - logger.info("No vulnerability data to save to CSV, creating header-only file") - + logger.info( + "No vulnerability data to save to CSV, creating header-only file" + ) + try: # Define CSV columns fieldnames = [ - "VulnerabilityID", "Severity", "PkgName", "InstalledVersion", - "Title", "Description", "CVSS", "Status", "Target", "PrimaryURL" + "VulnerabilityID", + "Severity", + "PkgName", + "InstalledVersion", + "Title", + "Description", + "CVSS", + "Status", + "Target", + "PrimaryURL", + "sources", ] - - with open(output_file, 'w', newline='') as csvfile: + + with open(output_file, "w", newline="") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() - + for vuln in vulnerabilities: - # Only write the fields we care about - filtered_vuln = {k: vuln.get(k, "") for k in fieldnames} + filtered_vuln = {} + for k in fieldnames: + v = vuln.get(k, "") + # Serialize list fields (e.g. sources) to comma-separated string + if isinstance(v, list): + v = ",".join(str(x) for x in v) + filtered_vuln[k] = v writer.writerow(filtered_vuln) - + return output_file except Exception as e: logger.error(f"Error saving results to CSV file: {e}") return "" - + def save_results_to_pdf(self, results: Dict) -> str: """ Save scan results to a PDF file with formatting. Handles both full scans and image-only scans. - + Args: results: The scan results to save - + Returns: Path to the saved PDF file """ # Sanitize image name for filename image_name_for_file = self.image_name or "docksec_report" - safe_image_name = re.sub(r'[:/.\-]', '_', image_name_for_file) - output_file = os.path.join(self.RESULTS_DIR, f"{safe_image_name}_security_report.pdf") - + safe_image_name = re.sub(r"[:/.\-]", "_", image_name_for_file) + output_file = os.path.join( + self.RESULTS_DIR, f"{safe_image_name}_security_report.pdf" + ) + try: # Create custom PDF class with text wrapping capability from fpdf.enums import XPos, YPos + class PDF(FPDF): def __init__(self): super().__init__() self.set_auto_page_break(True, margin=15) - + def multi_cell_with_title(self, title, content, title_w=40): """Create a title-content pair with the content potentially spanning multiple lines""" - self.set_font('helvetica', 'B', 10) + self.set_font("helvetica", "B", 10) x_start = self.get_x() y_start = self.get_y() self.cell(title_w, 7, title) - self.set_font('helvetica', '', 10) - + self.set_font("helvetica", "", 10) + # Calculate available width for content to avoid horizontal space errors available_w = self.w - self.l_margin - self.r_margin - title_w if available_w < 20: # Minimum fallback width available_w = 20 - + self.set_xy(x_start + title_w, y_start) - self.multi_cell(available_w, 7, str(content), new_x=XPos.LMARGIN, new_y=YPos.NEXT) + self.multi_cell( + available_w, + 7, + str(content), + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) self.ln(2) - + def add_section_header(self, title): """Add a section header""" - self.set_font('helvetica', 'B', 12) + self.set_font("helvetica", "B", 12) self.cell(0, 10, title, new_x=XPos.LMARGIN, new_y=YPos.NEXT) self.ln(2) - + # Create PDF instance pdf = PDF() pdf.add_page() - + # Add title - pdf.set_font('helvetica', 'B', 16) - scan_mode = results.get('scan_mode', 'full') - title = f'Docker Security Scan Report ({scan_mode.upper()})' - pdf.cell(0, 10, title, align='C', new_x=XPos.LMARGIN, new_y=YPos.NEXT) + pdf.set_font("helvetica", "B", 16) + scan_mode = results.get("scan_mode", "full") + title = f"Docker Security Scan Report ({scan_mode.upper()})" + pdf.cell(0, 10, title, align="C", new_x=XPos.LMARGIN, new_y=YPos.NEXT) pdf.ln(5) - + # Add scan information section - pdf.add_section_header('Scan Information') - pdf.multi_cell_with_title('Image:', self.image_name or "N/A") - pdf.multi_cell_with_title('Scan Mode:', scan_mode.replace('_', ' ').title()) - pdf.multi_cell_with_title('Dockerfile:', results.get('dockerfile_path', 'N/A')) - pdf.multi_cell_with_title('Scan Date:', results.get('timestamp', '')) - pdf.multi_cell_with_title('Analysis Score:', str(self.analysis_score)) + pdf.add_section_header("Scan Information") + pdf.multi_cell_with_title("Image:", self.image_name or "N/A") + pdf.multi_cell_with_title("Scan Mode:", scan_mode.replace("_", " ").title()) + pdf.multi_cell_with_title( + "Dockerfile:", results.get("dockerfile_path", "N/A") + ) + pdf.multi_cell_with_title("Scan Date:", results.get("timestamp", "")) + pdf.multi_cell_with_title("Analysis Score:", str(self.analysis_score)) pdf.ln(5) - + # Add image information if available (for extended scans) - if 'image_info' in results: - pdf.add_section_header('Image Information') - image_info = results['image_info'] - - if image_info.get('size'): - size_mb = round(image_info['size'] / (1024*1024), 2) - pdf.multi_cell_with_title('Size:', f"{size_mb} MB") - - if image_info.get('created'): - pdf.multi_cell_with_title('Created:', image_info['created'][:19]) # Truncate timestamp - - if image_info.get('architecture'): - pdf.multi_cell_with_title('Architecture:', image_info['architecture']) - - if image_info.get('os'): - pdf.multi_cell_with_title('OS:', image_info['os']) - + if "image_info" in results: + pdf.add_section_header("Image Information") + image_info = results["image_info"] + + if image_info.get("size"): + size_mb = round(image_info["size"] / (1024 * 1024), 2) + pdf.multi_cell_with_title("Size:", f"{size_mb} MB") + + if image_info.get("created"): + pdf.multi_cell_with_title( + "Created:", image_info["created"][:19] + ) # Truncate timestamp + + if image_info.get("architecture"): + pdf.multi_cell_with_title( + "Architecture:", image_info["architecture"] + ) + + if image_info.get("os"): + pdf.multi_cell_with_title("OS:", image_info["os"]) + pdf.ln(5) - + # Add configuration analysis if available - if 'config_analysis' in results: - pdf.add_section_header('Configuration Analysis') - config_analysis = results['config_analysis'] - + if "config_analysis" in results: + pdf.add_section_header("Configuration Analysis") + config_analysis = results["config_analysis"] + # Count issues - high_count = len(config_analysis.get('high_risk', [])) - medium_count = len(config_analysis.get('medium_risk', [])) - low_count = len(config_analysis.get('low_risk', [])) + high_count = len(config_analysis.get("high_risk", [])) + medium_count = len(config_analysis.get("medium_risk", [])) + low_count = len(config_analysis.get("low_risk", [])) total_count = high_count + medium_count + low_count - - pdf.multi_cell_with_title('Total Issues:', str(total_count)) + + pdf.multi_cell_with_title("Total Issues:", str(total_count)) if high_count > 0: - pdf.multi_cell_with_title('High Risk:', str(high_count)) + pdf.multi_cell_with_title("High Risk:", str(high_count)) if medium_count > 0: - pdf.multi_cell_with_title('Medium Risk:', str(medium_count)) + pdf.multi_cell_with_title("Medium Risk:", str(medium_count)) if low_count > 0: - pdf.multi_cell_with_title('Low Risk:', str(low_count)) - + pdf.multi_cell_with_title("Low Risk:", str(low_count)) + # Add issue details if high_count > 0: pdf.ln(3) - pdf.set_font('helvetica', 'B', 10) - pdf.cell(0, 7, 'High-Risk Issues:', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - pdf.set_font('helvetica', '', 9) - for issue in config_analysis['high_risk']: - pdf.multi_cell(0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.set_font("helvetica", "B", 10) + pdf.cell( + 0, 7, "High-Risk Issues:", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + pdf.set_font("helvetica", "", 9) + for issue in config_analysis["high_risk"]: + pdf.multi_cell( + 0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + if medium_count > 0: pdf.ln(3) - pdf.set_font('helvetica', 'B', 10) - pdf.cell(0, 7, 'Medium-Risk Issues:', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - pdf.set_font('helvetica', '', 9) - for issue in config_analysis['medium_risk']: - pdf.multi_cell(0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.set_font("helvetica", "B", 10) + pdf.cell( + 0, 7, "Medium-Risk Issues:", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + pdf.set_font("helvetica", "", 9) + for issue in config_analysis["medium_risk"]: + pdf.multi_cell( + 0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + if low_count > 0: pdf.ln(3) - pdf.set_font('helvetica', 'B', 10) - pdf.cell(0, 7, 'Low-Risk Issues:', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - pdf.set_font('helvetica', '', 9) - for issue in config_analysis['low_risk']: - pdf.multi_cell(0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.set_font("helvetica", "B", 10) + pdf.cell( + 0, 7, "Low-Risk Issues:", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + pdf.set_font("helvetica", "", 9) + for issue in config_analysis["low_risk"]: + pdf.multi_cell( + 0, 5, f"• {issue}", new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + pdf.ln(5) - + # Add Dockerfile scan results (only if not skipped) - if not results['dockerfile_scan'].get('skipped', False): - pdf.add_section_header('Dockerfile Scan Results') - - if results['dockerfile_scan']['success']: - pdf.set_font('helvetica', '', 10) - pdf.cell(0, 7, 'No Dockerfile linting issues found.', new_x=XPos.LMARGIN, new_y=YPos.NEXT) + if not results["dockerfile_scan"].get("skipped", False): + pdf.add_section_header("Dockerfile Scan Results") + + if results["dockerfile_scan"]["success"]: + pdf.set_font("helvetica", "", 10) + pdf.cell( + 0, + 7, + "No Dockerfile linting issues found.", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) else: - pdf.set_font('helvetica', '', 10) - pdf.cell(0, 7, 'Dockerfile linting issues:', new_x=XPos.LMARGIN, new_y=YPos.NEXT) + pdf.set_font("helvetica", "", 10) + pdf.cell( + 0, + 7, + "Dockerfile linting issues:", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) pdf.ln(2) - pdf.set_font('courier', '', 8) - - if results['dockerfile_scan']['output']: - for line in results['dockerfile_scan']['output'].split('\n')[:20]: # Limit lines - pdf.multi_cell(0, 5, line, new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.set_font("courier", "", 8) + + if results["dockerfile_scan"]["output"]: + for line in results["dockerfile_scan"]["output"].split("\n")[ + :20 + ]: # Limit lines + pdf.multi_cell( + 0, 5, line, new_x=XPos.LMARGIN, new_y=YPos.NEXT + ) + pdf.ln(5) - + # Add vulnerability scan summary (rest remains the same) - pdf.add_section_header('Vulnerability Scan Summary') - vulnerabilities = results.get('json_data', []) - + pdf.add_section_header("Vulnerability Scan Summary") + vulnerabilities = results.get("json_data", []) + if not vulnerabilities: - pdf.set_font('helvetica', '', 10) - pdf.cell(0, 7, 'No vulnerabilities found.', new_x=XPos.LMARGIN, new_y=YPos.NEXT) + pdf.set_font("helvetica", "", 10) + pdf.cell( + 0, + 7, + "No vulnerabilities found.", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) else: # Count vulnerabilities by severity severity_counts: Dict[str, int] = {} for vuln in vulnerabilities: - severity = vuln.get('Severity', Severity.UNKNOWN) + severity = vuln.get("Severity", Severity.UNKNOWN) severity_counts[severity] = severity_counts.get(severity, 0) + 1 - - pdf.set_font('helvetica', '', 10) - pdf.cell(0, 7, f'Total vulnerabilities: {len(vulnerabilities)}', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + + pdf.set_font("helvetica", "", 10) + pdf.cell( + 0, + 7, + f"Total vulnerabilities: {len(vulnerabilities)}", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + for severity, count in severity_counts.items(): - pdf.cell(0, 7, f'{severity}: {count}', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.cell( + 0, + 7, + f"{severity}: {count}", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + pdf.ln(5) - + # Add limited vulnerability details table if len(vulnerabilities) > 0: - pdf.add_section_header('Top Vulnerabilities') - + pdf.add_section_header("Top Vulnerabilities") + # Show top 20 vulnerabilities for i, vuln in enumerate(vulnerabilities[:20]): if pdf.get_y() > pdf.h - 40: # Check if near bottom pdf.add_page() - - pdf.set_font('helvetica', 'B', 9) - pdf.cell(0, 6, f"{i+1}. {vuln.get('VulnerabilityID', 'N/A')} ({vuln.get('Severity', 'N/A')})", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - - pdf.set_font('helvetica', '', 8) - pdf.multi_cell(0, 4, f"Package: {vuln.get('PkgName', 'N/A')} ({vuln.get('InstalledVersion', 'N/A')})", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - - title = vuln.get('Title', '') + + pdf.set_font("helvetica", "B", 9) + pdf.cell( + 0, + 6, + f"{i+1}. {vuln.get('VulnerabilityID', 'N/A')} ({vuln.get('Severity', 'N/A')})", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + + pdf.set_font("helvetica", "", 8) + pdf.multi_cell( + 0, + 4, + f"Package: {vuln.get('PkgName', 'N/A')} ({vuln.get('InstalledVersion', 'N/A')})", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + + title = vuln.get("Title", "") if title: - pdf.multi_cell(0, 4, f"Title: {title[:100]}{'...' if len(title) > 100 else ''}", new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.multi_cell( + 0, + 4, + f"Title: {title[:100]}{'...' if len(title) > 100 else ''}", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + pdf.ln(2) - + if len(vulnerabilities) > 20: pdf.ln(3) - pdf.set_font('helvetica', 'I', 9) - pdf.cell(0, 5, f'Showing 20 of {len(vulnerabilities)} vulnerabilities. See CSV/JSON for complete list.', new_x=XPos.LMARGIN, new_y=YPos.NEXT) - + pdf.set_font("helvetica", "I", 9) + pdf.cell( + 0, + 5, + f"Showing 20 of {len(vulnerabilities)} vulnerabilities. See CSV/JSON for complete list.", + new_x=XPos.LMARGIN, + new_y=YPos.NEXT, + ) + # Save the PDF pdf.output(output_file) return output_file - + except Exception as e: logger.error(f"Error saving results to PDF file: {e}") return "" - + def generate_all_reports(self, results: Dict) -> Dict: """ Generate all report formats (JSON, CSV, PDF, HTML) from scan results. - + Args: results: The scan results to save - + Returns: Dictionary with paths to the generated reports """ from docksec.report_generator import ReportGenerator - + # Calculate security score if not already set if self.analysis_score is None: self.analysis_score = self.get_security_score(results) - + # Initialize report generator - generator = ReportGenerator(self.image_name or "docksec_report", self.RESULTS_DIR) + generator = ReportGenerator( + self.image_name or "docksec_report", self.RESULTS_DIR + ) generator.set_analysis_score(self.analysis_score) - + # Generate all reports using the dedicated generator report_paths = generator.generate_all_reports(results) - + return report_paths - + def _calculate_local_score(self, results: Dict) -> float: """ Calculate a security score locally without any LLM call. @@ -1096,9 +1589,10 @@ def _calculate_local_score(self, results: Dict) -> float: Weights: vulnerabilities 50%, dockerfile quality 30%, configuration 20%. """ from docksec.score_calculator import SecurityScoreCalculator + calculator = SecurityScoreCalculator(skip_llm=True) breakdown = calculator.get_score_breakdown(results) - score = breakdown['overall'] + score = breakdown["overall"] print(f"Security Score: {score}/100") if score >= 90: @@ -1118,7 +1612,7 @@ def get_security_score(self, results: Dict) -> float: Uses LLM-based scoring when available. Falls back to local static scoring when scan_only=True or if the LLM call fails (e.g., quota exceeded). - + Optimizes token usage by sending summarized vulnerability data to LLM. Args: @@ -1132,11 +1626,11 @@ def get_security_score(self, results: Dict) -> float: try: from docksec.config import summarize_vulnerabilities - + # Create summarized vulnerability data instead of sending full results - vulnerabilities = results.get('json_data', []) + vulnerabilities = results.get("json_data", []) vuln_summary = summarize_vulnerabilities(vulnerabilities, max_count=20) - + # Send only summary, not full results dict score = self.score_chain.invoke({"results": vuln_summary}) print(f"Security Score: {score.score}") @@ -1145,39 +1639,41 @@ def get_security_score(self, results: Dict) -> float: logger.warning(f"AI scoring failed: {e}. Falling back to local scoring.") print(f"AI scoring unavailable: {e}. Falling back to local scoring.") return self._calculate_local_score(results) - + def save_results_to_html(self, results: Dict) -> str: """ Save scan results to an HTML file using a template. - + Args: results: The scan results to save - + Returns: Path to the saved HTML file """ # Sanitize image name for filename image_name_for_file = self.image_name or "docksec_report" - safe_image_name = re.sub(r'[:/.\-]', '_', image_name_for_file) - output_file = os.path.join(self.RESULTS_DIR, f"{safe_image_name}_security_report.html") + safe_image_name = re.sub(r"[:/.\-]", "_", image_name_for_file) + output_file = os.path.join( + self.RESULTS_DIR, f"{safe_image_name}_security_report.html" + ) try: from docksec.config import get_html_template - + # Prepare template variables template_vars = self._prepare_html_template_vars(results) - + # Replace placeholders in template html_content = get_html_template() for key, value in template_vars.items(): - html_content = html_content.replace(f'{{{{{key}}}}}', str(value)) - + html_content = html_content.replace(f"{{{{{key}}}}}", str(value)) + # Save the HTML file - with open(output_file, 'w', encoding='utf-8') as f: + with open(output_file, "w", encoding="utf-8") as f: f.write(html_content) - + return output_file - + except Exception as e: logger.error(f"Error saving results to HTML file: {e}") return "" @@ -1185,29 +1681,31 @@ def save_results_to_html(self, results: Dict) -> str: def _prepare_html_template_vars(self, results: Dict) -> Dict[str, str]: """ Prepare variables for HTML template replacement. - + Args: results: The scan results - + Returns: Dictionary of template variables """ - vulnerabilities = results.get('json_data', []) - scan_mode = results.get('scan_mode', 'full') - + vulnerabilities = results.get("json_data", []) + scan_mode = results.get("scan_mode", "full") + # Base template variables template_vars = { - 'IMAGE_NAME': self.image_name or "N/A", - 'SCAN_MODE': scan_mode.replace('_', ' ').title(), - 'SCAN_MODE_TITLE': f"{scan_mode.replace('_', ' ').title()} Scan", - 'DOCKERFILE_PATH': results.get('dockerfile_path', 'N/A'), - 'SCAN_DATE': results.get('timestamp', ''), - 'ANALYSIS_SCORE': self.analysis_score + "IMAGE_NAME": self.image_name or "N/A", + "SCAN_MODE": scan_mode.replace("_", " ").title(), + "SCAN_MODE_TITLE": f"{scan_mode.replace('_', ' ').title()} Scan", + "DOCKERFILE_PATH": results.get("dockerfile_path", "N/A"), + "SCAN_DATE": results.get("timestamp", ""), + "ANALYSIS_SCORE": self.analysis_score, } - + # Security Score Section - if 'security_score' in results: - template_vars['SECURITY_SCORE_SECTION'] = f""" + if "security_score" in results: + template_vars[ + "SECURITY_SCORE_SECTION" + ] = f"""
{self._escape_html(dockerfile_output[:2000])}'
if len(dockerfile_output) > 2000:
- dockerfile_content += 'Output truncated for display...
' - - template_vars['DOCKERFILE_SECTION'] = f""" + dockerfile_content += ( + "Output truncated for display...
" + ) + + template_vars[ + "DOCKERFILE_SECTION" + ] = f"""Total vulnerabilities: {len(vulnerabilities)}
""" - - template_vars['VULNERABILITY_SUMMARY'] = severity_html - + + template_vars["VULNERABILITY_SUMMARY"] = severity_html + # Detailed vulnerabilities table if vulnerabilities: table_html = """ @@ -1356,19 +1868,29 @@ def _prepare_html_template_vars(self, results: Dict) -> Dict[str, str]: """ - + # Show top 50 vulnerabilities to avoid overly large HTML files for vuln in vulnerabilities[:50]: - severity = vuln.get('Severity', 'UNKNOWN').lower() - severity_class = f'badge-{severity}' if severity in ['critical', 'high', 'medium', 'low'] else 'badge-low' - - status = vuln.get('Status', 'affected') - status_class = 'status-fixed' if status == 'fixed' else 'status-affected' - - cvss_score = vuln.get('CVSS', 'N/A') - if cvss_score and cvss_score != 'N/A': - cvss_score = f"{cvss_score:.1f}" if isinstance(cvss_score, (int, float)) else str(cvss_score) - + severity = vuln.get("Severity", "UNKNOWN").lower() + severity_class = ( + f"badge-{severity}" + if severity in ["critical", "high", "medium", "low"] + else "badge-low" + ) + + status = vuln.get("Status", "affected") + status_class = ( + "status-fixed" if status == "fixed" else "status-affected" + ) + + cvss_score = vuln.get("CVSS", "N/A") + if cvss_score and cvss_score != "N/A": + cvss_score = ( + f"{cvss_score:.1f}" + if isinstance(cvss_score, (int, float)) + else str(cvss_score) + ) + table_html += f"""Showing 50 of {len(vulnerabilities)} vulnerabilities. See CSV/JSON for complete list.
' - - table_html += '' - template_vars['DETAILED_VULNERABILITIES_SECTION'] = table_html + + table_html += "" + template_vars["DETAILED_VULNERABILITIES_SECTION"] = table_html else: - template_vars['DETAILED_VULNERABILITIES_SECTION'] = "" - + template_vars["DETAILED_VULNERABILITIES_SECTION"] = "" + return template_vars def _escape_html(self, text: str) -> str: """ Escape HTML special characters in text. - + Uses Python's built-in html.escape() for complete HTML5 entity handling, replacing the previous hand-rolled table. - + Args: text: Text to escape - + Returns: HTML-escaped text """ import html + if not text: return "" return html.escape(str(text), quote=True) + def main(): """Main function to run the security scanner.""" if len(sys.argv) < 3: - print("Usage: python docker_scanner.pyOutput truncated for display...
" ) - template_vars["DOCKERFILE_SECTION"] = f""" + template_vars[ + "DOCKERFILE_SECTION" + ] = f"""+ Scanners used: {self._escape_html(scanners_display)} +
+ {grid_html} +