diff --git a/cli.py b/cli.py index d11576b..5b17063 100755 --- a/cli.py +++ b/cli.py @@ -5,6 +5,7 @@ import json import logging import requests +import time try: from colorama import Fore, Style, init except ImportError: @@ -21,8 +22,16 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" -# Setup basic logging -logging.basicConfig(level=logging.ERROR, format='%(levelname)s: %(message)s') +# Setup logging with timestamps +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stdout, + force=True +) + +logger = logging.getLogger(__name__) def send_slack_notification(message: str) -> None: """Send a Slack notification via webhook URL from environment variable.""" @@ -32,9 +41,9 @@ def send_slack_notification(message: str) -> None: try: response = requests.post(webhook_url, json={'text': message}, timeout=5) if response.status_code >= 400: - print(f"Slack notification failed: {response.status_code} - {response.text}", file=sys.stderr) + logger.error(f"Slack notification failed: {response.status_code} - {response.text}") except Exception as e: - print(f"Slack notification error: {e}", file=sys.stderr) + logger.error(f"Slack notification error: {e}") def build_gh_actions_context() -> dict: """Extract GitHub Actions context from environment variables.""" @@ -57,39 +66,39 @@ def setup_args(): parser = argparse.ArgumentParser( description="InfraScan CLI - Open Source IaC Cost & Security Scanner" ) - + parser.add_argument( "path", nargs="?", default="/scan", help="Path to the directory to scan (default: /scan when using Docker, or '.' for local use)" ) - + parser.add_argument( "--scanner", default="comprehensive", help="Scanner type(s) to run (default: comprehensive). Support multiple scanners separated by comma (e.g., 'regex,containers'). Options: regex, checkov, containers, comprehensive" ) - + parser.add_argument( "--format", choices=["text", "json", "html"], default="text", help="Output format (default: text)" ) - + parser.add_argument( "--out", help="File path to save JSON output explicitly (e.g., infrascan-report.json)" ) - + parser.add_argument( "--fail-on", choices=["any", "high_critical", "grade_a", "grade_b", "grade_c", "grade_d", "grade_f", "priority_critical", "priority_high", "priority_medium", "priority_low", "priority_info"], help="Exit with error code 1 if findings match criteria (any findings, high/critical findings, grade threshold, or priority threshold)" ) - + parser.add_argument( "--download-external-modules", action="store_true", @@ -99,7 +108,7 @@ def setup_args(): parser.add_argument( "--framework", default="auto", - choices=["auto", "terraform", "kubernetes", "cloudformation", "helm"], + choices=["auto", "terraform", "kubernetes", "cloudformation", "helm", "all"], help="IaC framework type (default: auto-detect)" ) @@ -109,39 +118,39 @@ def setup_args(): dest="include", help="Select specific files or directories to scan. Can be used multiple times." ) - + parser.add_argument( "--version", action="version", version=f"InfraScan v{__version__}", help="Show version information and exit" ) - + return parser.parse_args() def print_text_report(report_dict, resource_count, scanner_type): # Initialize colorama init(autoreset=True) - + overall = report_dict.get('overall', {}) findings_dict = report_dict.get('findings', {}) results = findings_dict.get('all', report_dict.get('results', [])) - + # Header print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") print(f"{Fore.CYAN}{Style.BRIGHT} InfraScan Report - {scanner_type.upper()} SCAN") print(f"{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") - + # Summary Info target_path = os.path.abspath(sys.argv[1] if len(sys.argv) > 1 and not sys.argv[1].startswith('--') else '.') print(f"{Style.BRIGHT}Path Scanned :{Style.RESET_ALL} {target_path}") print(f"{Style.BRIGHT}Resources Found :{Style.RESET_ALL} {resource_count}") print(f"{Style.BRIGHT}Total Findings :{Style.RESET_ALL} {len(results)}") - + # Grades Section print(f"\n{Style.BRIGHT}GRADING SUMMARY:") print(f"{'-' * 30}") - + def get_grade_color(letter): if letter == 'A': return Fore.GREEN if letter == 'B': return Fore.GREEN @@ -152,11 +161,11 @@ def get_grade_color(letter): def print_grade_line(name, grade): if not grade or (grade.get('max_score', 0) == 0 and grade.get('letter') != 'A'): return - + letter = grade.get('letter', '?') percentage = grade.get('percentage', 0) color = get_grade_color(letter) - + breakdown = grade.get('severity_breakdown', {}) counts = [ f"{Fore.RED}Crit:{breakdown.get('critical', 0)}{Style.RESET_ALL}", @@ -168,13 +177,13 @@ def print_grade_line(name, grade): print(f"{name:18}: {color}{Style.BRIGHT}{letter}{Style.RESET_ALL} ({percentage}%){br_str}") print_grade_line("Overall Health", overall) - + if scanner_type in ['regex', 'comprehensive']: print_grade_line("Cost Efficiency", report_dict.get('cost')) - + if scanner_type in ['checkov', 'comprehensive']: print_grade_line("IaC Security", report_dict.get('security')) - + if scanner_type in ['containers', 'comprehensive']: print_grade_line("Container Security", report_dict.get('container')) @@ -184,12 +193,12 @@ def print_grade_line(name, grade): print(f"\n{Fore.GREEN}{Style.BRIGHT}RECOMMENDATIONS:") for rec in recs: print(f" {Fore.GREEN}• {Style.BRIGHT}{rec}") - + # Findings Details if results: print(f"\n{Style.BRIGHT}FINDINGS DETAILS:") print(f"{'=' * 60}") - + # Categorize findings categories = [] if findings_dict.get('cost'): @@ -198,111 +207,143 @@ def print_grade_line(name, grade): categories.append(('IaC Security', findings_dict['security'])) if findings_dict.get('container'): categories.append(('Container Security', findings_dict['container'])) - + if not categories: categories = [('General Findings', results)] for cat_name, cat_findings in categories: if not cat_findings: continue - + print(f"\n{Style.BRIGHT}>>> {cat_name} ({len(cat_findings)})") - + # Limit display to 40 findings to avoid overwhelming CI logs display_limit = 40 for i, res in enumerate(cat_findings): if i >= display_limit: print(f"\n {Fore.YELLOW}... and {len(cat_findings) - display_limit} more findings (see full report for details)") break - + severity = res.get('severity', 'UNKNOWN').upper() sev_color = Fore.WHITE if severity == 'CRITICAL': sev_color = Fore.RED + Style.BRIGHT elif severity == 'HIGH': sev_color = Fore.RED elif severity == 'MEDIUM': sev_color = Fore.YELLOW elif severity == 'LOW': sev_color = Fore.CYAN - + rule_id = res.get('rule_id', 'N/A') file_path = res.get('file', 'Unknown') line_str = f":{res.get('line')}" if res.get('line') else "" - + print(f" {sev_color}[{severity}]{Style.RESET_ALL} {Style.BRIGHT}{rule_id}{Style.RESET_ALL}: {res.get('description', '')}") print(f" {Fore.WHITE}at {file_path}{line_str}{Style.RESET_ALL}") if res.get('resource'): print(f" {Fore.WHITE}resource: {res.get('resource')}{Style.RESET_ALL}") - + print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}\n") def should_fail(args, report_dict, results): if not args.fail_on: return False - + if args.fail_on == 'any' and len(results) > 0: - print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) + logger.error( + "Build failed: Findings detected and --fail-on=any specified." + ) return True - + if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: - print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) + logger.error( + f"Build failed: {critical_high_count} high/critical findings " + f"detected and --fail-on=high_critical specified." + ) return True - + if args.fail_on.startswith('grade_'): grade_order = ['A', 'B', 'C', 'D', 'F'] fail_grade = args.fail_on.split('_')[1].upper() overall_letter = report_dict.get('overall', {}).get('letter', 'A') - + try: fail_idx = grade_order.index(fail_grade) current_idx = grade_order.index(overall_letter) - + if current_idx >= fail_idx: - print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) + logger.error( + f"Build failed: Overall grade is {overall_letter} " + f"and --fail-on={args.fail_on} specified " + f"(threshold: {fail_grade} or worse)." + ) return True except ValueError: pass # Should not happen due to argparse choices - + if args.fail_on.startswith('priority_'): severity_weights = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'info': 0.5} fail_priority = args.fail_on.split('_')[1] threshold_weight = severity_weights.get(fail_priority, 0) - + findings_at_or_above = [ - r for r in results + r for r in results if severity_weights.get(r.get('severity', 'info').lower(), 0.5) >= threshold_weight ] - + if findings_at_or_above: - print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) + logger.error( + f"Build failed: {len(findings_at_or_above)} findings " + f"with priority {fail_priority} or higher detected " + f"and --fail-on={args.fail_on} specified." + ) return True - + return False def main(): + total_start = time.perf_counter() + load_dotenv() args = setup_args() - + target_path = os.path.abspath(args.path) - + if not os.path.exists(target_path): - print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) + logger.error(f"Path '{target_path}' does not exist.") sys.exit(1) - + try: if args.format == 'text': - print(f"Analyzing {target_path} with '{args.scanner}' scanner...") - + logger.info( + f"Analyzing {target_path} with '{args.scanner}' scanner..." + ) + # Run Scanners + logger.info("Starting directory scan") + + scan_start = time.perf_counter() + results, resource_count, recommendations = scan_directory( - target_path, + target_path, scanner_type=args.scanner, framework=args.framework, download_external_modules=args.download_external_modules, included_paths=args.include ) + scan_duration = time.perf_counter() - scan_start + + logger.info( + f"Directory scan completed in {scan_duration:.2f}s. " + f"Found {len(results)} findings in {resource_count} resources." + ) + # Generate Report + logger.info("Generating report") + + report_start = time.perf_counter() + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -310,43 +351,63 @@ def main(): scanner_type=args.scanner, extra_recommendations=recommendations ) - + + report_duration = time.perf_counter() - report_start + + logger.info( + f"Report generation completed in {report_duration:.2f}s" + ) + report_dict = report.to_dict() report_dict['results'] = results report_dict['summary'] = { 'total': len(results), 'scanner_used': args.scanner } - + # Output Results to file/stdout if args.out: + logger.info(f"Saving report to {args.out}") + + save_start = time.perf_counter() + if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) + + save_duration = time.perf_counter() - save_start + + logger.info( + f"Report saved in {save_duration:.2f}s" + ) elif args.format == 'html': html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: f.write(html_output) - else: # text format - # Default behavior for text mode with --out is to save JSON results + else: with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - # Handle console output + # Console output if args.format == 'json' and not args.out: print(json.dumps(report_dict, indent=2)) elif args.format == 'html' and not args.out: print(generate_standalone_html(report_dict)) else: - # If format is text OR if output is saved to record/html/json - # always show the text summary in the console print_text_report(report_dict, resource_count, args.scanner) if args.out: - print(f"{Fore.GREEN}[v] Full {args.format.upper()} report saved to: {Fore.WHITE}{args.out}") - - # Send Slack notification if configured + print( + f"{Fore.GREEN}[v] Full {args.format.upper()} " + f"report saved to: {Fore.WHITE}{args.out}" + ) + + # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: + logger.info("Sending Slack notification") + + slack_start = time.perf_counter() + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) @@ -356,17 +417,31 @@ def main(): overall_grade = overall.get('letter', '?') if overall else '?' overall_pct = overall.get('percentage', 0) if overall else 0 - grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] + grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] + if cost and cost.get('max_score', 0) > 0: - grades_parts.append(f"Cost {cost.get('letter','?')} ({cost.get('percentage',0)}%)") + grades_parts.append( + f"Cost {cost.get('letter', '?')} " + f"({cost.get('percentage', 0)}%)" + ) + if security and security.get('max_score', 0) > 0: - grades_parts.append(f"Security {security.get('letter','?')} ({security.get('percentage',0)}%)") + grades_parts.append( + f"Security {security.get('letter', '?')} " + f"({security.get('percentage', 0)}%)" + ) + if container and container.get('max_score', 0) > 0: - grades_parts.append(f"Containers {container.get('letter','?')} ({container.get('percentage',0)}%)") + grades_parts.append( + f"Containers {container.get('letter', '?')} " + f"({container.get('percentage', 0)}%)" + ) + grades_summary = " | ".join(grades_parts) ctx = build_gh_actions_context() lines = ["🤖 InfraScan used in *GitHub Actions*"] + if ctx['repo']: lines.append(f"Repo: *{ctx['repo']}*") if ctx['branch']: @@ -375,24 +450,41 @@ def main(): lines.append(f"Workflow: _{ctx['workflow']}_") if ctx['actor']: lines.append(f"Triggered by: {ctx['actor']}") + lines.append(f"Grades: {grades_summary}") lines.append(f"Findings: {total_findings} | Scanner: {args.scanner}") + if ctx['run_url']: lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) + slack_duration = time.perf_counter() - slack_start + + logger.info( + f"Slack notification sent in {slack_duration:.2f}s" + ) # Determine Exit Code + logger.info("Evaluating fail conditions") + if should_fail(args, report_dict, results): sys.exit(1) - + + total_duration = time.perf_counter() - total_start + + logger.info( + f"InfraScan completed successfully in " + f"{total_duration:.2f}s" + ) sys.exit(0) - + except Exception as e: - print(f"An error occurred during scanning: {e}", file=sys.stderr) + logger.exception(f"An error occurred during scanning: {e}") + if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback traceback.print_exc() + sys.exit(1) if __name__ == "__main__":