From b671cd017bbaa5509ec885cea534bc0a03acbc92 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 12:11:05 +0200 Subject: [PATCH 1/5] Add timestamps to InfraScan CLI logs Added a logging function to print messages with timestamps, enhancing the visibility of the scanning process. Updated the main function to utilize this logging for various stages of execution. --- cli.py | 104 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 28 deletions(-) diff --git a/cli.py b/cli.py index d11576b..552b045 100755 --- a/cli.py +++ b/cli.py @@ -18,6 +18,18 @@ def init(*args, **kwargs): pass from scanner.parser import scan_directory from reporter.grading import ReportGenerator from reporter.html_generator import generate_standalone_html +from datetime import datetime + + +def log_with_timestamp(message: str) -> None: + """ + Print message prefixed with current timestamp. + + Example: + [2026-10-10 16:23:34] Starting scan... + """ + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}", flush=True) __version__ = "1.0.6" @@ -282,27 +294,38 @@ def should_fail(args, report_dict, results): def main(): load_dotenv() args = setup_args() - + target_path = os.path.abspath(args.path) - + if not os.path.exists(target_path): print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) sys.exit(1) - + try: if args.format == 'text': - print(f"Analyzing {target_path} with '{args.scanner}' scanner...") - + log_with_timestamp( + f"Analyzing {target_path} with '{args.scanner}' scanner..." + ) + # Run Scanners + log_with_timestamp("Starting directory scan") + results, resource_count, recommendations = scan_directory( - target_path, + target_path, scanner_type=args.scanner, framework=args.framework, download_external_modules=args.download_external_modules, included_paths=args.include ) - + + log_with_timestamp( + f"Directory scan completed. Found {len(results)} findings " + f"in {resource_count} resources." + ) + # Generate Report + log_with_timestamp("Generating report") + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -310,16 +333,20 @@ def main(): scanner_type=args.scanner, extra_recommendations=recommendations ) - + + log_with_timestamp("Report generation completed") + report_dict = report.to_dict() report_dict['results'] = results report_dict['summary'] = { 'total': len(results), 'scanner_used': args.scanner } - + # Output Results to file/stdout if args.out: + log_with_timestamp(f"Saving report to {args.out}") + if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) @@ -327,46 +354,62 @@ def main(): html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: f.write(html_output) - else: # text format - # Default behavior for text mode with --out is to save JSON results + else: with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - # Handle console output + # Console output if args.format == 'json' and not args.out: print(json.dumps(report_dict, indent=2)) elif args.format == 'html' and not args.out: print(generate_standalone_html(report_dict)) else: - # If format is text OR if output is saved to record/html/json - # always show the text summary in the console print_text_report(report_dict, resource_count, args.scanner) if args.out: - print(f"{Fore.GREEN}[v] Full {args.format.upper()} report saved to: {Fore.WHITE}{args.out}") - - # Send Slack notification if configured + print( + f"{Fore.GREEN}[v] Full {args.format.upper()} " + f"report saved to: {Fore.WHITE}{args.out}" + ) + + # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: + log_with_timestamp("Sending Slack notification") + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) container = report_dict.get('container', {}) total_findings = len(results) - overall_grade = overall.get('letter', '?') if overall else '?' - overall_pct = overall.get('percentage', 0) if overall else 0 + overall_grade = overall.get('letter', '?') + overall_pct = overall.get('percentage', 0) + + grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] - grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] if cost and cost.get('max_score', 0) > 0: - grades_parts.append(f"Cost {cost.get('letter','?')} ({cost.get('percentage',0)}%)") + grades_parts.append( + f"Cost {cost.get('letter', '?')} " + f"({cost.get('percentage', 0)}%)" + ) + if security and security.get('max_score', 0) > 0: - grades_parts.append(f"Security {security.get('letter','?')} ({security.get('percentage',0)}%)") + grades_parts.append( + f"Security {security.get('letter', '?')} " + f"({security.get('percentage', 0)}%)" + ) + if container and container.get('max_score', 0) > 0: - grades_parts.append(f"Containers {container.get('letter','?')} ({container.get('percentage',0)}%)") + grades_parts.append( + f"Containers {container.get('letter', '?')} " + f"({container.get('percentage', 0)}%)" + ) + grades_summary = " | ".join(grades_parts) ctx = build_gh_actions_context() lines = ["🤖 InfraScan used in *GitHub Actions*"] + if ctx['repo']: lines.append(f"Repo: *{ctx['repo']}*") if ctx['branch']: @@ -375,25 +418,30 @@ def main(): lines.append(f"Workflow: _{ctx['workflow']}_") if ctx['actor']: lines.append(f"Triggered by: {ctx['actor']}") + lines.append(f"Grades: {grades_summary}") lines.append(f"Findings: {total_findings} | Scanner: {args.scanner}") + if ctx['run_url']: lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) # Determine Exit Code + log_with_timestamp("Evaluating fail conditions") + if should_fail(args, report_dict, results): sys.exit(1) - + + log_with_timestamp("InfraScan completed successfully") sys.exit(0) - + except Exception as e: + log_with_timestamp(f"ERROR: {e}") print(f"An error occurred during scanning: {e}", file=sys.stderr) + if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback traceback.print_exc() - sys.exit(1) -if __name__ == "__main__": - main() + sys.exit(1) From 84ccf06d1db792e3dc3ff99b9df796858649dac8 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 15:21:53 +0200 Subject: [PATCH 2/5] Address review feedback: reorganize log_with_timestamp, use logging module, restore main check --- cli.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/cli.py b/cli.py index 552b045..3362b92 100755 --- a/cli.py +++ b/cli.py @@ -18,18 +18,6 @@ def init(*args, **kwargs): pass from scanner.parser import scan_directory from reporter.grading import ReportGenerator from reporter.html_generator import generate_standalone_html -from datetime import datetime - - -def log_with_timestamp(message: str) -> None: - """ - Print message prefixed with current timestamp. - - Example: - [2026-10-10 16:23:34] Starting scan... - """ - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}", flush=True) __version__ = "1.0.6" @@ -291,6 +279,17 @@ def should_fail(args, report_dict, results): return False +def log_with_timestamp(message: str) -> None: + """ + Log message with current timestamp using the logging module. + + Example: + [2026-10-10 16:23:34] Starting scan... + """ + from datetime import datetime + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"[{timestamp}] {message}", flush=True) + def main(): load_dotenv() args = setup_args() @@ -445,3 +444,6 @@ def main(): traceback.print_exc() sys.exit(1) + +if __name__ == "__main__": + main() From 0743293528296cf821404df07c0115a27160f002 Mon Sep 17 00:00:00 2001 From: Natan Date: Tue, 19 May 2026 15:45:22 +0200 Subject: [PATCH 3/5] Refactor logging to use logger with INFO level Updated logging configuration to use INFO level and added timestamps to log messages. Replaced print statements with logger calls for error handling and notifications. --- cli.py | 120 +++++++++++++++++++++++++++------------------------------ 1 file changed, 57 insertions(+), 63 deletions(-) diff --git a/cli.py b/cli.py index 3362b92..7b093a3 100755 --- a/cli.py +++ b/cli.py @@ -22,7 +22,13 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" # Setup basic logging -logging.basicConfig(level=logging.ERROR, format='%(levelname)s: %(message)s') +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + +logger = logging.getLogger(__name__) def send_slack_notification(message: str) -> None: """Send a Slack notification via webhook URL from environment variable.""" @@ -32,9 +38,9 @@ def send_slack_notification(message: str) -> None: try: response = requests.post(webhook_url, json={'text': message}, timeout=5) if response.status_code >= 400: - print(f"Slack notification failed: {response.status_code} - {response.text}", file=sys.stderr) + logger.error(f"Slack notification failed: {response.status_code} - {response.text}") except Exception as e: - print(f"Slack notification error: {e}", file=sys.stderr) + logger.error(f"Slack notification error: {e}") def build_gh_actions_context() -> dict: """Extract GitHub Actions context from environment variables.""" @@ -57,39 +63,39 @@ def setup_args(): parser = argparse.ArgumentParser( description="InfraScan CLI - Open Source IaC Cost & Security Scanner" ) - + parser.add_argument( "path", nargs="?", default="/scan", help="Path to the directory to scan (default: /scan when using Docker, or '.' for local use)" ) - + parser.add_argument( "--scanner", default="comprehensive", help="Scanner type(s) to run (default: comprehensive). Support multiple scanners separated by comma (e.g., 'regex,containers'). Options: regex, checkov, containers, comprehensive" ) - + parser.add_argument( "--format", choices=["text", "json", "html"], default="text", help="Output format (default: text)" ) - + parser.add_argument( "--out", help="File path to save JSON output explicitly (e.g., infrascan-report.json)" ) - + parser.add_argument( "--fail-on", choices=["any", "high_critical", "grade_a", "grade_b", "grade_c", "grade_d", "grade_f", "priority_critical", "priority_high", "priority_medium", "priority_low", "priority_info"], help="Exit with error code 1 if findings match criteria (any findings, high/critical findings, grade threshold, or priority threshold)" ) - + parser.add_argument( "--download-external-modules", action="store_true", @@ -109,39 +115,39 @@ def setup_args(): dest="include", help="Select specific files or directories to scan. Can be used multiple times." ) - + parser.add_argument( "--version", action="version", version=f"InfraScan v{__version__}", help="Show version information and exit" ) - + return parser.parse_args() def print_text_report(report_dict, resource_count, scanner_type): # Initialize colorama init(autoreset=True) - + overall = report_dict.get('overall', {}) findings_dict = report_dict.get('findings', {}) results = findings_dict.get('all', report_dict.get('results', [])) - + # Header print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") print(f"{Fore.CYAN}{Style.BRIGHT} InfraScan Report - {scanner_type.upper()} SCAN") print(f"{Fore.CYAN}{Style.BRIGHT}{'=' * 60}") - + # Summary Info target_path = os.path.abspath(sys.argv[1] if len(sys.argv) > 1 and not sys.argv[1].startswith('--') else '.') print(f"{Style.BRIGHT}Path Scanned :{Style.RESET_ALL} {target_path}") print(f"{Style.BRIGHT}Resources Found :{Style.RESET_ALL} {resource_count}") print(f"{Style.BRIGHT}Total Findings :{Style.RESET_ALL} {len(results)}") - + # Grades Section print(f"\n{Style.BRIGHT}GRADING SUMMARY:") print(f"{'-' * 30}") - + def get_grade_color(letter): if letter == 'A': return Fore.GREEN if letter == 'B': return Fore.GREEN @@ -152,11 +158,11 @@ def get_grade_color(letter): def print_grade_line(name, grade): if not grade or (grade.get('max_score', 0) == 0 and grade.get('letter') != 'A'): return - + letter = grade.get('letter', '?') percentage = grade.get('percentage', 0) color = get_grade_color(letter) - + breakdown = grade.get('severity_breakdown', {}) counts = [ f"{Fore.RED}Crit:{breakdown.get('critical', 0)}{Style.RESET_ALL}", @@ -168,13 +174,13 @@ def print_grade_line(name, grade): print(f"{name:18}: {color}{Style.BRIGHT}{letter}{Style.RESET_ALL} ({percentage}%){br_str}") print_grade_line("Overall Health", overall) - + if scanner_type in ['regex', 'comprehensive']: print_grade_line("Cost Efficiency", report_dict.get('cost')) - + if scanner_type in ['checkov', 'comprehensive']: print_grade_line("IaC Security", report_dict.get('security')) - + if scanner_type in ['containers', 'comprehensive']: print_grade_line("Container Security", report_dict.get('container')) @@ -184,12 +190,12 @@ def print_grade_line(name, grade): print(f"\n{Fore.GREEN}{Style.BRIGHT}RECOMMENDATIONS:") for rec in recs: print(f" {Fore.GREEN}• {Style.BRIGHT}{rec}") - + # Findings Details if results: print(f"\n{Style.BRIGHT}FINDINGS DETAILS:") print(f"{'=' * 60}") - + # Categorize findings categories = [] if findings_dict.get('cost'): @@ -198,97 +204,86 @@ def print_grade_line(name, grade): categories.append(('IaC Security', findings_dict['security'])) if findings_dict.get('container'): categories.append(('Container Security', findings_dict['container'])) - + if not categories: categories = [('General Findings', results)] for cat_name, cat_findings in categories: if not cat_findings: continue - + print(f"\n{Style.BRIGHT}>>> {cat_name} ({len(cat_findings)})") - + # Limit display to 40 findings to avoid overwhelming CI logs display_limit = 40 for i, res in enumerate(cat_findings): if i >= display_limit: print(f"\n {Fore.YELLOW}... and {len(cat_findings) - display_limit} more findings (see full report for details)") break - + severity = res.get('severity', 'UNKNOWN').upper() sev_color = Fore.WHITE if severity == 'CRITICAL': sev_color = Fore.RED + Style.BRIGHT elif severity == 'HIGH': sev_color = Fore.RED elif severity == 'MEDIUM': sev_color = Fore.YELLOW elif severity == 'LOW': sev_color = Fore.CYAN - + rule_id = res.get('rule_id', 'N/A') file_path = res.get('file', 'Unknown') line_str = f":{res.get('line')}" if res.get('line') else "" - + print(f" {sev_color}[{severity}]{Style.RESET_ALL} {Style.BRIGHT}{rule_id}{Style.RESET_ALL}: {res.get('description', '')}") print(f" {Fore.WHITE}at {file_path}{line_str}{Style.RESET_ALL}") if res.get('resource'): print(f" {Fore.WHITE}resource: {res.get('resource')}{Style.RESET_ALL}") - + print(f"\n{Fore.CYAN}{Style.BRIGHT}{'=' * 60}\n") def should_fail(args, report_dict, results): if not args.fail_on: return False - + if args.fail_on == 'any' and len(results) > 0: print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) return True - + if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) return True - + if args.fail_on.startswith('grade_'): grade_order = ['A', 'B', 'C', 'D', 'F'] fail_grade = args.fail_on.split('_')[1].upper() overall_letter = report_dict.get('overall', {}).get('letter', 'A') - + try: fail_idx = grade_order.index(fail_grade) current_idx = grade_order.index(overall_letter) - + if current_idx >= fail_idx: print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) return True except ValueError: pass # Should not happen due to argparse choices - + if args.fail_on.startswith('priority_'): severity_weights = {'critical': 4, 'high': 3, 'medium': 2, 'low': 1, 'info': 0.5} fail_priority = args.fail_on.split('_')[1] threshold_weight = severity_weights.get(fail_priority, 0) - + findings_at_or_above = [ - r for r in results + r for r in results if severity_weights.get(r.get('severity', 'info').lower(), 0.5) >= threshold_weight ] - + if findings_at_or_above: print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) return True - - return False -def log_with_timestamp(message: str) -> None: - """ - Log message with current timestamp using the logging module. - - Example: - [2026-10-10 16:23:34] Starting scan... - """ - from datetime import datetime - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print(f"[{timestamp}] {message}", flush=True) + return False def main(): load_dotenv() @@ -297,17 +292,17 @@ def main(): target_path = os.path.abspath(args.path) if not os.path.exists(target_path): - print(f"Error: Path '{target_path}' does not exist.", file=sys.stderr) + logger.error(f"Path '{target_path}' does not exist.") sys.exit(1) try: if args.format == 'text': - log_with_timestamp( + logger.info( f"Analyzing {target_path} with '{args.scanner}' scanner..." ) # Run Scanners - log_with_timestamp("Starting directory scan") + logger.info("Starting directory scan") results, resource_count, recommendations = scan_directory( target_path, @@ -317,13 +312,13 @@ def main(): included_paths=args.include ) - log_with_timestamp( + logger.info( f"Directory scan completed. Found {len(results)} findings " f"in {resource_count} resources." ) # Generate Report - log_with_timestamp("Generating report") + logger.info("Generating report") report_generator = ReportGenerator() report = report_generator.generate_report( @@ -333,7 +328,7 @@ def main(): extra_recommendations=recommendations ) - log_with_timestamp("Report generation completed") + logger.info("Report generation completed") report_dict = report.to_dict() report_dict['results'] = results @@ -344,7 +339,7 @@ def main(): # Output Results to file/stdout if args.out: - log_with_timestamp(f"Saving report to {args.out}") + logger.info(f"Saving report to {args.out}") if args.format == 'json': with open(args.out, 'w') as f: @@ -373,7 +368,7 @@ def main(): # Slack notification webhook_url = os.getenv('SLACK_WEBHOOK_URL', '').strip() if webhook_url: - log_with_timestamp("Sending Slack notification") + logger.info("Sending Slack notification") overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) @@ -427,17 +422,16 @@ def main(): send_slack_notification(" | ".join(lines)) # Determine Exit Code - log_with_timestamp("Evaluating fail conditions") + logger.info("Evaluating fail conditions") if should_fail(args, report_dict, results): sys.exit(1) - log_with_timestamp("InfraScan completed successfully") + logger.info("InfraScan completed successfully") sys.exit(0) except Exception as e: - log_with_timestamp(f"ERROR: {e}") - print(f"An error occurred during scanning: {e}", file=sys.stderr) + logger.error(f"ERROR: {e}") if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback From ffd60651551fd409d3003b6b4c0b3e85f8f75904 Mon Sep 17 00:00:00 2001 From: Natan Date: Mon, 25 May 2026 08:37:34 +0200 Subject: [PATCH 4/5] Add timestamped logging and execution duration metrics Add timestamped logging and execution duration metrics to InfraScan CLI. This update improves log visibility during long-running scans by adding timestamps to log output and tracking execution time for key operations such as: * directory scanning * report generation * report saving * Slack notifications * total CLI execution The goal is to make GitHub Actions and CI logs easier to debug and help identify performance bottlenecks during large scans. --- cli.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/cli.py b/cli.py index 7b093a3..f1e4c59 100755 --- a/cli.py +++ b/cli.py @@ -5,6 +5,7 @@ import json import logging import requests +import time try: from colorama import Fore, Style, init except ImportError: @@ -21,11 +22,13 @@ def init(*args, **kwargs): pass __version__ = "1.0.6" -# Setup basic logging +# Setup logging with timestamps logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stdout, + force=True ) logger = logging.getLogger(__name__) @@ -105,7 +108,7 @@ def setup_args(): parser.add_argument( "--framework", default="auto", - choices=["auto", "terraform", "kubernetes", "cloudformation", "helm"], + choices=["auto", "terraform", "kubernetes", "cloudformation", "helm", "all"], help="IaC framework type (default: auto-detect)" ) @@ -286,6 +289,8 @@ def should_fail(args, report_dict, results): return False def main(): + total_start = time.time() + load_dotenv() args = setup_args() @@ -303,7 +308,9 @@ def main(): # Run Scanners logger.info("Starting directory scan") - + + scan_start = time.time() + results, resource_count, recommendations = scan_directory( target_path, scanner_type=args.scanner, @@ -311,15 +318,19 @@ def main(): download_external_modules=args.download_external_modules, included_paths=args.include ) - + + scan_duration = time.time() - scan_start + logger.info( - f"Directory scan completed. Found {len(results)} findings " - f"in {resource_count} resources." + f"Directory scan completed in {scan_duration:.2f}s. " + f"Found {len(results)} findings in {resource_count} resources." ) # Generate Report logger.info("Generating report") + report_start = time.time() + report_generator = ReportGenerator() report = report_generator.generate_report( findings=results, @@ -328,7 +339,11 @@ def main(): extra_recommendations=recommendations ) - logger.info("Report generation completed") + report_duration = time.time() - report_start + + logger.info( + f"Report generation completed in {report_duration:.2f}s" + ) report_dict = report.to_dict() report_dict['results'] = results @@ -340,10 +355,18 @@ def main(): # Output Results to file/stdout if args.out: logger.info(f"Saving report to {args.out}") + + save_start = time.time() if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) + + save_duration = time.time() - save_start + + logger.info( + f"Report saved in {save_duration:.2f}s" + ) elif args.format == 'html': html_output = generate_standalone_html(report_dict) with open(args.out, 'w', encoding='utf-8') as f: @@ -370,6 +393,8 @@ def main(): if webhook_url: logger.info("Sending Slack notification") + slack_start = time.time() + overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) security = report_dict.get('security', {}) @@ -420,6 +445,11 @@ def main(): lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) + slack_duration = time.time() - slack_start + + logger.info( + f"Slack notification sent in {slack_duration:.2f}s" + ) # Determine Exit Code logger.info("Evaluating fail conditions") @@ -427,11 +457,16 @@ def main(): if should_fail(args, report_dict, results): sys.exit(1) - logger.info("InfraScan completed successfully") + total_duration = time.time() - total_start + + logger.info( + f"InfraScan completed successfully in " + f"{total_duration:.2f}s" + ) sys.exit(0) except Exception as e: - logger.error(f"ERROR: {e}") + logger.exception(f"An error occurred during scanning: {e}") if logging.getLogger().isEnabledFor(logging.DEBUG): import traceback @@ -441,3 +476,4 @@ def main(): if __name__ == "__main__": main() + From be246d5fd32c81a7b129d94810828253f790f411 Mon Sep 17 00:00:00 2001 From: Natan Date: Mon, 25 May 2026 16:55:40 +0200 Subject: [PATCH 5/5] Improve logging consistency and execution timing - added execution timing logs for scan, report generation, Slack notifications, and total runtime - replaced time.time() with time.perf_counter() for accurate duration measurements - unified error handling with logger.error() instead of raw stderr prints - fixed potential NoneType crash in Slack notification grading logic - improved logging consistency across CLI execution flow --- cli.py | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/cli.py b/cli.py index f1e4c59..5b17063 100755 --- a/cli.py +++ b/cli.py @@ -248,13 +248,18 @@ def should_fail(args, report_dict, results): return False if args.fail_on == 'any' and len(results) > 0: - print("\n[ERROR] Build failed: Findings detected and --fail-on=any specified.", file=sys.stderr) + logger.error( + "Build failed: Findings detected and --fail-on=any specified." + ) return True if args.fail_on == 'high_critical': critical_high_count = sum(1 for r in results if r.get('severity', '').lower() in ['critical', 'high']) if critical_high_count > 0: - print(f"\n[ERROR] Build failed: {critical_high_count} high/critical findings detected and --fail-on=high_critical specified.", file=sys.stderr) + logger.error( + f"Build failed: {critical_high_count} high/critical findings " + f"detected and --fail-on=high_critical specified." + ) return True if args.fail_on.startswith('grade_'): @@ -267,7 +272,11 @@ def should_fail(args, report_dict, results): current_idx = grade_order.index(overall_letter) if current_idx >= fail_idx: - print(f"\n[ERROR] Build failed: Overall grade is {overall_letter} and --fail-on={args.fail_on} specified (threshold: {fail_grade} or worse).", file=sys.stderr) + logger.error( + f"Build failed: Overall grade is {overall_letter} " + f"and --fail-on={args.fail_on} specified " + f"(threshold: {fail_grade} or worse)." + ) return True except ValueError: pass # Should not happen due to argparse choices @@ -283,13 +292,17 @@ def should_fail(args, report_dict, results): ] if findings_at_or_above: - print(f"\n[ERROR] Build failed: {len(findings_at_or_above)} findings with priority {fail_priority} or higher detected and --fail-on={args.fail_on} specified.", file=sys.stderr) + logger.error( + f"Build failed: {len(findings_at_or_above)} findings " + f"with priority {fail_priority} or higher detected " + f"and --fail-on={args.fail_on} specified." + ) return True return False def main(): - total_start = time.time() + total_start = time.perf_counter() load_dotenv() args = setup_args() @@ -309,7 +322,7 @@ def main(): # Run Scanners logger.info("Starting directory scan") - scan_start = time.time() + scan_start = time.perf_counter() results, resource_count, recommendations = scan_directory( target_path, @@ -319,7 +332,7 @@ def main(): included_paths=args.include ) - scan_duration = time.time() - scan_start + scan_duration = time.perf_counter() - scan_start logger.info( f"Directory scan completed in {scan_duration:.2f}s. " @@ -329,7 +342,7 @@ def main(): # Generate Report logger.info("Generating report") - report_start = time.time() + report_start = time.perf_counter() report_generator = ReportGenerator() report = report_generator.generate_report( @@ -339,7 +352,7 @@ def main(): extra_recommendations=recommendations ) - report_duration = time.time() - report_start + report_duration = time.perf_counter() - report_start logger.info( f"Report generation completed in {report_duration:.2f}s" @@ -356,13 +369,13 @@ def main(): if args.out: logger.info(f"Saving report to {args.out}") - save_start = time.time() + save_start = time.perf_counter() if args.format == 'json': with open(args.out, 'w') as f: json.dump(report_dict, f, indent=2) - save_duration = time.time() - save_start + save_duration = time.perf_counter() - save_start logger.info( f"Report saved in {save_duration:.2f}s" @@ -393,7 +406,7 @@ def main(): if webhook_url: logger.info("Sending Slack notification") - slack_start = time.time() + slack_start = time.perf_counter() overall = report_dict.get('overall', {}) cost = report_dict.get('cost', {}) @@ -401,8 +414,8 @@ def main(): container = report_dict.get('container', {}) total_findings = len(results) - overall_grade = overall.get('letter', '?') - overall_pct = overall.get('percentage', 0) + overall_grade = overall.get('letter', '?') if overall else '?' + overall_pct = overall.get('percentage', 0) if overall else 0 grades_parts = [f"Overall {overall_grade} ({overall_pct}%)"] @@ -445,7 +458,7 @@ def main(): lines.append(f"<{ctx['run_url']}|View run>") send_slack_notification(" | ".join(lines)) - slack_duration = time.time() - slack_start + slack_duration = time.perf_counter() - slack_start logger.info( f"Slack notification sent in {slack_duration:.2f}s" @@ -457,7 +470,7 @@ def main(): if should_fail(args, report_dict, results): sys.exit(1) - total_duration = time.time() - total_start + total_duration = time.perf_counter() - total_start logger.info( f"InfraScan completed successfully in " @@ -476,4 +489,3 @@ def main(): if __name__ == "__main__": main() -