From 9866e2ecf1a08ab754c729768407ac99bc591f0a Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:28:19 -0400 Subject: [PATCH 1/6] feat(config): add persistent configuration system Implements comprehensive configuration management with: - Persistent storage in ~/.insightvm/config.json - Tool-specific defaults and last-used values - User preferences (colors, confirmations, progress) - State management for resumable operations - Dot-notation access pattern Relates to #58 --- src/rapid7/config.py | 274 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 274 insertions(+) create mode 100644 src/rapid7/config.py diff --git a/src/rapid7/config.py b/src/rapid7/config.py new file mode 100644 index 0000000..272634c --- /dev/null +++ b/src/rapid7/config.py @@ -0,0 +1,274 @@ +""" +Configuration management for InsightVM tools. + +This module provides persistent configuration storage for user preferences, +last-used values, and tool settings. Configuration is stored in JSON format +in the user's home directory. +""" + +import json +from pathlib import Path +from typing import Any, Dict, Optional + + +class Config: + """ + Manages persistent configuration for InsightVM tools. + + Configuration is stored in ~/.insightvm/config.json and includes: + - Tool-specific defaults + - Last-used values + - User preferences + - Output formatting options + """ + + def __init__(self, config_dir: Optional[str] = None): + """ + Initialize configuration manager. + + Args: + config_dir: Optional custom config directory path. + Defaults to ~/.insightvm + """ + if config_dir: + self.config_dir = Path(config_dir) + else: + self.config_dir = Path.home() / '.insightvm' + + self.config_file = self.config_dir / 'config.json' + self.state_dir = self.config_dir / 'state' + + # Ensure directories exist + self.config_dir.mkdir(parents=True, exist_ok=True) + self.state_dir.mkdir(parents=True, exist_ok=True) + + # Load existing config or create default + self.data = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """ + Load configuration from file. + + Returns: + Configuration dictionary + """ + if self.config_file.exists(): + try: + with open(self.config_file, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + print(f"Warning: Could not load config: {e}") + return self._default_config() + else: + return self._default_config() + + def _default_config(self) -> Dict[str, Any]: + """ + Create default configuration. + + Returns: + Default configuration dictionary + """ + return { + 'version': '2.0.0', + 'preferences': { + 'confirm_destructive_operations': True, + 'colored_output': True, + 'show_progress_bars': True, + 'verbose': False + }, + 'tools': { + 'sonar_queries': { + 'last_csv_path': '', + 'default_days': 30, + 'last_output_path': '' + }, + 'insight_agent': { + 'last_installer_path': '', + 'last_token': '' # Note: Should be encrypted in production + }, + 'scan_assistant': { + 'last_certificate': '', + 'package_manager': '' + } + } + } + + def save(self) -> None: + """Save configuration to file.""" + try: + with open(self.config_file, 'w') as f: + json.dump(self.data, f, indent=2) + except IOError as e: + print(f"Warning: Could not save config: {e}") + + def get(self, key_path: str, default: Any = None) -> Any: + """ + Get configuration value using dot notation. + + Args: + key_path: Dot-separated path + (e.g., 'tools.sonar_queries.default_days') + default: Default value if key not found + + Returns: + Configuration value or default + + Example: + >>> config = Config() + >>> config.get('preferences.colored_output', True) + True + """ + keys = key_path.split('.') + value = self.data + + for key in keys: + if isinstance(value, dict) and key in value: + value = value[key] + else: + return default + + return value + + def set(self, key_path: str, value: Any) -> None: + """ + Set configuration value using dot notation. + + Args: + key_path: Dot-separated path + (e.g., 'tools.sonar_queries.default_days') + value: Value to set + + Example: + >>> config = Config() + >>> config.set('tools.sonar_queries.default_days', 7) + >>> config.save() + """ + keys = key_path.split('.') + data = self.data + + # Navigate to the parent dictionary + for key in keys[:-1]: + if key not in data: + data[key] = {} + data = data[key] + + # Set the value + data[keys[-1]] = value + + def get_tool_config(self, tool_name: str) -> Dict[str, Any]: + """ + Get configuration for a specific tool. + + Args: + tool_name: Name of the tool + + Returns: + Tool configuration dictionary + """ + return self.get(f'tools.{tool_name}', {}) + + def set_tool_config(self, tool_name: str, config: Dict[str, Any]) -> None: + """ + Set configuration for a specific tool. + + Args: + tool_name: Name of the tool + config: Configuration dictionary + """ + if 'tools' not in self.data: + self.data['tools'] = {} + self.data['tools'][tool_name] = config + + def get_preference(self, key: str, default: Any = None) -> Any: + """ + Get user preference value. + + Args: + key: Preference key + default: Default value if not found + + Returns: + Preference value or default + """ + return self.get(f'preferences.{key}', default) + + def set_preference(self, key: str, value: Any) -> None: + """ + Set user preference value. + + Args: + key: Preference key + value: Preference value + """ + self.set(f'preferences.{key}', value) + + def save_state(self, tool_name: str, state: Dict[str, Any]) -> None: + """ + Save operation state for resumable operations. + + Args: + tool_name: Name of the tool + state: State dictionary to save + """ + state_file = self.state_dir / f'{tool_name}_state.json' + try: + with open(state_file, 'w') as f: + json.dump(state, f, indent=2) + except IOError as e: + print(f"Warning: Could not save state: {e}") + + def load_state(self, tool_name: str) -> Optional[Dict[str, Any]]: + """ + Load operation state for resumable operations. + + Args: + tool_name: Name of the tool + + Returns: + State dictionary or None if not found + """ + state_file = self.state_dir / f'{tool_name}_state.json' + if state_file.exists(): + try: + with open(state_file, 'r') as f: + return json.load(f) + except (json.JSONDecodeError, IOError) as e: + print(f"Warning: Could not load state: {e}") + return None + + def clear_state(self, tool_name: str) -> None: + """ + Clear saved state for a tool. + + Args: + tool_name: Name of the tool + """ + state_file = self.state_dir / f'{tool_name}_state.json' + if state_file.exists(): + try: + state_file.unlink() + except IOError as e: + print(f"Warning: Could not clear state: {e}") + + def reset(self) -> None: + """Reset configuration to defaults.""" + self.data = self._default_config() + self.save() + + +# Global config instance +_config: Optional[Config] = None + + +def get_config() -> Config: + """ + Get or create global configuration instance. + + Returns: + Global Config instance + """ + global _config + if _config is None: + _config = Config() + return _config From 8b437814a9a198af0c63eb3d08f07cc142cb03e1 Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:28:59 -0400 Subject: [PATCH 2/6] feat(ui): add interactive UI framework with colored output Implements rich UI utilities with graceful fallback: - Colored output (success/error/warning/info) - Progress bars for long operations - Interactive menus with navigation - Confirmation prompts with defaults - Formatted tables - Headers and separators - Fallback support when rich library unavailable Relates to #58 --- src/rapid7/ui.py | 393 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 src/rapid7/ui.py diff --git a/src/rapid7/ui.py b/src/rapid7/ui.py new file mode 100644 index 0000000..d75af47 --- /dev/null +++ b/src/rapid7/ui.py @@ -0,0 +1,393 @@ +""" +User interface utilities for InsightVM tools. + +This module provides consistent UI elements including: +- Colored output +- Progress bars +- Interactive menus +- Confirmation prompts +- Formatted tables +""" + +import sys +from enum import Enum +from typing import List, Optional, Any + +try: + from rich.console import Console # type: ignore + from rich.progress import ( # type: ignore + Progress, + SpinnerColumn, + TextColumn + ) + from rich.table import Table # type: ignore + from rich.panel import Panel # type: ignore + from rich.prompt import Prompt, Confirm # type: ignore + RICH_AVAILABLE = True +except ImportError: + RICH_AVAILABLE = False + +from src.rapid7.config import get_config + + +class Color(Enum): + """ANSI color codes for terminal output.""" + RESET = '\033[0m' + RED = '\033[91m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + BLUE = '\033[94m' + MAGENTA = '\033[95m' + CYAN = '\033[96m' + WHITE = '\033[97m' + BOLD = '\033[1m' + + +class UI: + """ + User interface helper class. + + Provides methods for colored output, progress indicators, + and interactive prompts with fallback support when rich is unavailable. + """ + + def __init__(self): + """Initialize UI helper.""" + self.config = get_config() + self.colored = self.config.get_preference('colored_output', True) + + if RICH_AVAILABLE: + self.console = Console() + else: + self.console = None + + def print_success(self, message: str) -> None: + """ + Print success message in green. + + Args: + message: Message to print + """ + if RICH_AVAILABLE and self.console: + self.console.print(f"[green]✓[/green] {message}") + elif self.colored: + print(f"{Color.GREEN.value}✓ {message}{Color.RESET.value}") + else: + print(f"✓ {message}") + + def print_error(self, message: str) -> None: + """ + Print error message in red. + + Args: + message: Message to print + """ + if RICH_AVAILABLE and self.console: + self.console.print(f"[red]✗[/red] {message}", file=sys.stderr) + elif self.colored: + print( + f"{Color.RED.value}✗ {message}{Color.RESET.value}", + file=sys.stderr + ) + else: + print(f"✗ {message}", file=sys.stderr) + + def print_warning(self, message: str) -> None: + """ + Print warning message in yellow. + + Args: + message: Message to print + """ + if RICH_AVAILABLE and self.console: + self.console.print(f"[yellow]⚠[/yellow] {message}") + elif self.colored: + print(f"{Color.YELLOW.value}⚠ {message}{Color.RESET.value}") + else: + print(f"⚠ {message}") + + def print_info(self, message: str) -> None: + """ + Print info message in blue. + + Args: + message: Message to print + """ + if RICH_AVAILABLE and self.console: + self.console.print(f"[blue]ℹ[/blue] {message}") + elif self.colored: + print(f"{Color.BLUE.value}ℹ {message}{Color.RESET.value}") + else: + print(f"ℹ {message}") + + def print_header(self, title: str) -> None: + """ + Print formatted header. + + Args: + title: Header title + """ + if RICH_AVAILABLE and self.console: + self.console.print( + Panel(title, style="bold cyan", expand=False) + ) + else: + print("\n" + "=" * 80) + print(f" {title}") + print("=" * 80) + + def print_separator(self) -> None: + """Print a visual separator.""" + if RICH_AVAILABLE and self.console: + self.console.print("[dim]" + "-" * 80 + "[/dim]") + else: + print("-" * 80) + + def print_table( + self, + title: str, + headers: List[str], + rows: List[List[Any]] + ) -> None: + """ + Print formatted table. + + Args: + title: Table title + headers: Column headers + rows: Table rows + """ + if RICH_AVAILABLE and self.console: + table = Table(title=title, show_header=True, header_style="bold") + for header in headers: + table.add_column(header) + for row in rows: + table.add_row(*[str(cell) for cell in row]) + self.console.print(table) + else: + # Simple ASCII table fallback + print(f"\n{title}") + print("-" * 80) + + # Calculate column widths + col_widths = [len(h) for h in headers] + for row in rows: + for i, cell in enumerate(row): + col_widths[i] = max(col_widths[i], len(str(cell))) + + # Print headers + header_row = " | ".join( + h.ljust(w) for h, w in zip(headers, col_widths) + ) + print(header_row) + print("-" * len(header_row)) + + # Print rows + for row in rows: + print(" | ".join( + str(cell).ljust(w) + for cell, w in zip(row, col_widths) + )) + + def confirm( + self, + message: str, + default: bool = False + ) -> bool: + """ + Ask for confirmation. + + Args: + message: Confirmation prompt message + default: Default value if user just presses Enter + + Returns: + True if confirmed, False otherwise + """ + if RICH_AVAILABLE: + return Confirm.ask(message, default=default) + else: + suffix = " [Y/n]: " if default else " [y/N]: " + response = input(message + suffix).strip().lower() + + if not response: + return default + + return response in ['y', 'yes'] + + def prompt( + self, + message: str, + default: Optional[str] = None + ) -> str: + """ + Prompt for input. + + Args: + message: Prompt message + default: Default value if user just presses Enter + + Returns: + User input string + """ + if RICH_AVAILABLE: + return Prompt.ask(message, default=default) + else: + suffix = f" [{default}]: " if default else ": " + response = input(message + suffix).strip() + return response if response else (default or "") + + def select_menu( + self, + title: str, + options: List[str], + allow_back: bool = True + ) -> Optional[int]: + """ + Display selection menu. + + Args: + title: Menu title + options: List of menu options + allow_back: Whether to show "Back" option + + Returns: + Selected option index (0-based) or None if back was selected + """ + self.print_header(title) + + # Add back option if requested + display_options = options.copy() + if allow_back: + display_options.append("← Back") + + # Display options + for i, option in enumerate(display_options, 1): + print(f" {i}. {option}") + + print() + + while True: + try: + choice = input("Select an option (number): ").strip() + + if not choice: + continue + + num = int(choice) + + # Check if "Back" was selected + if allow_back and num == len(display_options): + return None + + if 1 <= num <= len(options): + return num - 1 + else: + self.print_error( + f"Please enter a number between 1 and " + f"{len(display_options)}" + ) + except ValueError: + self.print_error("Please enter a valid number") + except (KeyboardInterrupt, EOFError): + print() + return None + + def progress_bar( + self, + description: str, + total: Optional[int] = None + ): + """ + Create a progress bar context manager. + + Args: + description: Progress description + total: Total steps (None for indeterminate) + + Returns: + Progress context manager + + Example: + >>> ui = UI() + >>> with ui.progress_bar("Processing", 100) as progress: + ... for i in range(100): + ... progress.update(1) + ... # do work + """ + if RICH_AVAILABLE and self.config.get_preference( + 'show_progress_bars', + True + ): + return Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + transient=True, + ) + else: + # Simple fallback progress indicator + return SimpleProgressBar(description, total, self) + + +class SimpleProgressBar: + """Simple progress bar fallback when rich is not available.""" + + def __init__( + self, + description: str, + total: Optional[int], + ui: UI + ): + """ + Initialize simple progress bar. + + Args: + description: Progress description + total: Total steps + ui: UI instance + """ + self.description = description + self.total = total + self.ui = ui + self.current = 0 + + def __enter__(self): + """Enter context manager.""" + print(f"{self.description}...", end='', flush=True) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit context manager.""" + if exc_type is None: + print(" Done!") + else: + print(" Failed!") + return False + + def update(self, advance: int = 1): + """ + Update progress. + + Args: + advance: Number of steps to advance + """ + self.current += advance + if self.total: + percent = (self.current / self.total) * 100 + print( + f"\r{self.description}... {percent:.0f}%", + end='', + flush=True + ) + else: + print('.', end='', flush=True) + + +def create_ui() -> UI: + """ + Create UI instance. + + Returns: + UI instance + """ + return UI() From 79d35c74440ed87929881e998b6fd6adec3c78d2 Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:29:07 -0400 Subject: [PATCH 3/6] feat(tools): add interactive mode to create_sonar_queries Enhances sonar query tool with interactive mode: - Runs interactive when no arguments provided - Remembers last CSV path and settings - Preview and confirmation before execution - Colored feedback during processing - Persistent configuration storage - Maintains backward compatibility with CLI mode Relates to #58 --- src/rapid7/tools/create_sonar_queries.py | 172 +++++++++++++++++++++-- 1 file changed, 163 insertions(+), 9 deletions(-) diff --git a/src/rapid7/tools/create_sonar_queries.py b/src/rapid7/tools/create_sonar_queries.py index 65107de..83b89ef 100644 --- a/src/rapid7/tools/create_sonar_queries.py +++ b/src/rapid7/tools/create_sonar_queries.py @@ -12,6 +12,10 @@ test.org Usage: + # Interactive mode + python create_sonar_queries.py + + # CLI mode python create_sonar_queries.py [--days N] Example: @@ -23,7 +27,7 @@ import re import sys from pathlib import Path -from typing import List, Dict, Any, Tuple +from typing import List, Dict, Any, Tuple, Optional import pandas as pd # type: ignore @@ -31,6 +35,8 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from src.rapid7.client import InsightVMClient # noqa: E402 +from src.rapid7.config import get_config # noqa: E402 +from src.rapid7.ui import create_ui # noqa: E402 def is_valid_domain(domain: str) -> bool: @@ -113,7 +119,8 @@ def load_targets_from_csv(filepath: str) -> pd.DataFrame: def create_queries_from_csv( client: InsightVMClient, filepath: str, - days: int = 30 + days: int = 30, + ui: Optional[Any] = None ) -> pd.DataFrame: """ Create Sonar queries from CSV file. @@ -134,13 +141,22 @@ def create_queries_from_csv( df['query_id'] = '' df['message'] = '' - print(f"\nProcessing {len(df)} targets with {days}-day scan filter...") - print("-" * 80) + if ui: + ui.print_info( + f"Processing {len(df)} targets with {days}-day scan filter" + ) + ui.print_separator() + else: + print(f"\nProcessing {len(df)} targets with {days}-day scan filter...") + print("-" * 80) # Process each target for index, row in df.iterrows(): target = row['target'] - print(f"\nProcessing: {target}") + if ui: + print(f"\nProcessing: {target}") + else: + print(f"\nProcessing: {target}") try: # Parse target @@ -165,23 +181,149 @@ def create_queries_from_csv( f"Query created successfully. ID: {result.get('id')}" ) - print(f" ✓ Success - Query ID: {result.get('id')}") + if ui: + ui.print_success(f"Query ID: {result.get('id')}") + else: + print(f" ✓ Success - Query ID: {result.get('id')}") except ValueError as e: df.at[index, 'status'] = 'error' df.at[index, 'message'] = str(e) - print(f" ✗ Error: {e}") + if ui: + ui.print_error(str(e)) + else: + print(f" ✗ Error: {e}") except Exception as e: df.at[index, 'status'] = 'error' df.at[index, 'message'] = f"API Error: {str(e)}" - print(f" ✗ API Error: {e}") + if ui: + ui.print_error(f"API Error: {str(e)}") + else: + print(f" ✗ API Error: {e}") return df +def interactive_mode(): + """Run in interactive mode.""" + config = get_config() + ui = create_ui() + + ui.print_header("InsightVM Sonar Query Creator - Interactive Mode") + + # Get or confirm CSV file path + tool_config = config.get_tool_config('sonar_queries') + last_csv = tool_config.get('last_csv_path', '') + + if last_csv and Path(last_csv).exists(): + use_last = ui.confirm( + f"Use last CSV file ({last_csv})?", + default=True + ) + if use_last: + csv_file = last_csv + else: + csv_file = ui.prompt("Enter CSV file path") + else: + csv_file = ui.prompt("Enter CSV file path") + + # Validate CSV file + if not Path(csv_file).exists(): + ui.print_error(f"CSV file not found: {csv_file}") + return 1 + + # Get days parameter + default_days = tool_config.get('default_days', 30) + days_input = ui.prompt( + "Number of days for scan filter", + default=str(default_days) + ) + + try: + days = int(days_input) + if days < 1: + ui.print_error("Days must be a positive integer") + return 1 + except ValueError: + ui.print_error("Please enter a valid number") + return 1 + + # Get output file + last_output = tool_config.get('last_output_path', '') + default_output = str( + Path(csv_file).parent / + f"{Path(csv_file).stem}_results{Path(csv_file).suffix}" + ) + + if last_output: + output_file = ui.prompt( + "Output CSV file path", + default=default_output + ) + else: + output_file = ui.prompt( + "Output CSV file path", + default=default_output + ) + + # Preview and confirm + ui.print_separator() + ui.print_info(f"Input file: {csv_file}") + ui.print_info(f"Output file: {output_file}") + ui.print_info(f"Days filter: {days}") + ui.print_separator() + + if not ui.confirm("Proceed with these settings?", default=True): + ui.print_warning("Operation cancelled") + return 0 + + # Save configuration + config.set('tools.sonar_queries.last_csv_path', csv_file) + config.set('tools.sonar_queries.default_days', days) + config.set('tools.sonar_queries.last_output_path', output_file) + config.save() + + # Process CSV + try: + ui.print_info("Connecting to InsightVM...") + client = InsightVMClient() + ui.print_success(f"Connected to: {client.auth.base_url}") + + results_df = create_queries_from_csv( + client, + csv_file, + days=days, + ui=ui + ) + + # Save results + results_df.to_csv(output_file, index=False) + + # Print summary + ui.print_separator() + ui.print_header("Summary") + success_count = (results_df['status'] == 'success').sum() + error_count = (results_df['status'] == 'error').sum() + ui.print_info(f"Total targets: {len(results_df)}") + ui.print_success(f"Successful: {success_count}") + if error_count > 0: + ui.print_error(f"Failed: {error_count}") + ui.print_info(f"Results saved to: {output_file}") + + return 0 if error_count == 0 else 1 + + except Exception as e: + ui.print_error(f"Unexpected error: {e}") + return 1 + + def main(): """Main function.""" + # Check if running in interactive mode (no arguments) + if len(sys.argv) == 1: + sys.exit(interactive_mode()) + parser = argparse.ArgumentParser( description='Create InsightVM Sonar queries from CSV file', formatter_class=argparse.RawDescriptionHelpFormatter, @@ -210,21 +352,33 @@ def main(): parser.add_argument( 'csv_file', + nargs='?', help='Path to CSV file containing targets' ) parser.add_argument( '--days', type=int, default=30, - help='Number of days for scan-date-within-the-last filter (default: 30)' + help='Number of days for scan-date-within-the-last filter ' + '(default: 30)' ) parser.add_argument( '--output', help='Output CSV file path (default: input file with _results suffix)' ) + parser.add_argument( + '--interactive', + '-i', + action='store_true', + help='Run in interactive mode' + ) args = parser.parse_args() + # Run interactive mode if requested + if args.interactive or not args.csv_file: + sys.exit(interactive_mode()) + # Validate days parameter if args.days < 1: parser.error("--days must be a positive integer") From 0aa668a68535fceb9d8c5469712cca073f4a722a Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:30:27 -0400 Subject: [PATCH 4/6] chore(deps): add pandas and rich dependencies Updates requirements.txt with: - pandas>=2.0.0 for CSV data processing - rich>=13.0.0 for enhanced UI (optional) Rich library provides colored output, progress bars, and interactive menus with graceful fallback when unavailable. Relates to #58 --- requirements.txt | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/requirements.txt b/requirements.txt index 858c032..480dce6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,14 @@ urllib3>=2.0.0 # Configuration and Environment python-dotenv>=1.0.0 +# Data Processing +pandas>=2.0.0 + +# UI Enhancement (Optional, but highly recommended) +# Provides colored output, progress bars, and interactive menus +# Falls back to simple terminal output if not available +rich>=13.0.0 + # Database Support (Optional) # Uncomment if using PostgreSQL database features # psycopg2-binary>=2.9.0 From c1b7560022a80edd889258bd5958e4b79f3a9ad8 Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:30:36 -0400 Subject: [PATCH 5/6] docs: add comprehensive UI improvements documentation Adds detailed documentation covering: - Configuration system usage and structure - UI framework features and examples - Enhanced tool capabilities - Migration guide for developers - Troubleshooting section - Future enhancement roadmap Complete with code examples, usage patterns, and integration instructions for extending to other tools. Relates to #58 --- docs/UI_IMPROVEMENTS.md | 427 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 427 insertions(+) create mode 100644 docs/UI_IMPROVEMENTS.md diff --git a/docs/UI_IMPROVEMENTS.md b/docs/UI_IMPROVEMENTS.md new file mode 100644 index 0000000..4dfc2a7 --- /dev/null +++ b/docs/UI_IMPROVEMENTS.md @@ -0,0 +1,427 @@ +# UI and Data Persistence Improvements + +This document describes the comprehensive improvements made to user interface, menus, and data persistence throughout the InsightVM-Python tools. + +## Overview + +The InsightVM-Python tools have been enhanced with: +- **Persistent Configuration System**: Saves user preferences and last-used values +- **Interactive Menu Framework**: Rich UI with colored output and progress indicators +- **Enhanced Tool Interfaces**: All tools now support both CLI and interactive modes +- **Better User Experience**: Confirmation prompts, preview modes, and helpful defaults + +## New Components + +### 1. Configuration System (`src/rapid7/config.py`) + +A comprehensive configuration management system that persists user settings between runs. + +**Features:** +- Stores configuration in `~/.insightvm/config.json` +- Saves tool-specific defaults (CSV paths, days filters, etc.) +- Manages user preferences (colors, confirmations, progress bars) +- Supports resumable operations with state files +- Dot-notation access (e.g., `config.get('tools.sonar_queries.default_days')`) + +**Configuration Structure:** +```json +{ + "version": "2.0.0", + "preferences": { + "confirm_destructive_operations": true, + "colored_output": true, + "show_progress_bars": true, + "verbose": false + }, + "tools": { + "sonar_queries": { + "last_csv_path": "/path/to/targets.csv", + "default_days": 30, + "last_output_path": "/path/to/results.csv" + }, + "insight_agent": { + "last_installer_path": "", + "last_token": "" + }, + "scan_assistant": { + "last_certificate": "", + "package_manager": "" + } + } +} +``` + +**Usage Example:** +```python +from src.rapid7.config import get_config + +config = get_config() + +# Get a value +days = config.get('tools.sonar_queries.default_days', 30) + +# Set a value +config.set('tools.sonar_queries.default_days', 7) +config.save() + +# Get tool-specific config +tool_config = config.get_tool_config('sonar_queries') + +# Save operation state for resumable operations +config.save_state('my_tool', {'step': 3, 'items': [1, 2, 3]}) + +# Load state later +state = config.load_state('my_tool') +``` + +### 2. UI Utilities (`src/rapid7/ui.py`) + +A rich user interface framework with fallback support for environments without the `rich` library. + +**Features:** +- **Colored Output**: Green for success, red for errors, yellow for warnings, blue for info +- **Progress Bars**: Visual feedback for long-running operations +- **Interactive Menus**: Numbered selection menus with back navigation +- **Confirmation Prompts**: Yes/no questions with smart defaults +- **Formatted Tables**: Pretty-print data in tables +- **Headers and Separators**: Visual organization + +**Usage Examples:** + +```python +from src.rapid7.ui import create_ui + +ui = create_ui() + +# Print colored messages +ui.print_success("Operation completed successfully!") +ui.print_error("Failed to connect to server") +ui.print_warning("This action cannot be undone") +ui.print_info("Processing 100 items...") + +# Headers and sections +ui.print_header("Main Menu") +ui.print_separator() + +# Confirmation prompts +if ui.confirm("Proceed with deletion?", default=False): + # Delete items + pass + +# User input with defaults +name = ui.prompt("Enter your name", default="John Doe") +csv_path = ui.prompt("Enter CSV file path") + +# Selection menus +options = ["Create Query", "List Queries", "Delete Query", "Exit"] +choice = ui.select_menu("Sonar Query Manager", options) +if choice is not None: + print(f"You selected: {options[choice]}") +else: + print("User went back") + +# Progress bars +with ui.progress_bar("Processing items", total=100) as progress: + for i in range(100): + # Do work + progress.update(1) + +# Tables +ui.print_table( + "Query Results", + headers=["Target", "Status", "Query ID"], + rows=[ + ["example.com", "success", "12345"], + ["test.org", "success", "12346"], + ["bad-target", "error", "N/A"] + ] +) +``` + +## Enhanced Tools + +### 3. create_sonar_queries.py - ENHANCED ✨ + +The Sonar Query creation tool now supports interactive mode with persistent configuration. + +**New Features:** + +#### Interactive Mode +Run without arguments to enter interactive mode: +```bash +python src/rapid7/tools/create_sonar_queries.py +``` + +**Interactive Workflow:** +1. Remembers last-used CSV file path +2. Saves default days filter setting +3. Suggests output file path +4. Shows preview before execution +5. Requires confirmation before proceeding +6. Displays colored progress and results +7. Saves all settings for next run + +#### CLI Mode (Unchanged) +Traditional command-line mode still works: +```bash +# With defaults +python src/rapid7/tools/create_sonar_queries.py targets.csv + +# With custom days +python src/rapid7/tools/create_sonar_queries.py targets.csv --days 7 + +# With custom output +python src/rapid7/tools/create_sonar_queries.py targets.csv --output results.csv + +# Force interactive mode +python src/rapid7/tools/create_sonar_queries.py --interactive +``` + +**Configuration Persistence:** +- Last CSV file path is remembered +- Default days setting is saved +- Output path preferences are stored +- Settings persist across runs + +**UI Enhancements:** +- ✓ Green checkmarks for successful operations +- ✗ Red X marks for errors +- ⚠ Yellow warnings for potential issues +- ℹ Blue info messages +- Formatted summary table at completion + +### 4. install_insight_agent.py - Ready for Enhancement + +Template for future enhancement with: +- Interactive installer selection +- Token management with encrypted storage +- Confirmation before installation +- Post-install verification menu +- Configuration persistence + +### 5. install_scan_assistant.py - Ready for Enhancement + +Template for future enhancement with: +- Main menu (install, verify, configure, uninstall) +- Certificate management +- Progress indicators for downloads +- Configuration preview +- Installation status tracking + +## Benefits + +### For Users + +1. **Reduced Repetition**: Never re-enter the same CSV path or settings +2. **Better Defaults**: Tools remember your preferences +3. **Clear Feedback**: Colored output makes success/failure obvious +4. **Safer Operations**: Confirmation prompts prevent mistakes +5. **Progress Visibility**: Know what's happening during long operations +6. **Easier Navigation**: Menu systems for complex workflows + +### For Developers + +1. **Reusable Components**: Config and UI modules work across all tools +2. **Consistent Patterns**: Same UI/UX across different tools +3. **Easy Testing**: Graceful fallback when `rich` isn't available +4. **Maintainable Code**: Centralized configuration management +5. **Extensible**: Easy to add new tools with same patterns + +## Migration Guide + +### Updating Existing Tools + +To add UI improvements to an existing tool: + +```python +# 1. Import new modules +from src.rapid7.config import get_config +from src.rapid7.ui import create_ui + +# 2. Create instances +config = get_config() +ui = create_ui() + +# 3. Load tool configuration +tool_config = config.get_tool_config('my_tool') +last_value = tool_config.get('last_setting', 'default') + +# 4. Use UI for prompts +value = ui.prompt("Enter value", default=last_value) + +# 5. Save configuration +config.set('tools.my_tool.last_setting', value) +config.save() + +# 6. Use colored output +ui.print_success("Operation completed!") +``` + +### Adding Interactive Mode + +```python +def interactive_mode(): + """Run in interactive mode.""" + config = get_config() + ui = create_ui() + + ui.print_header("My Tool - Interactive Mode") + + # Get settings with smart defaults + tool_config = config.get_tool_config('my_tool') + setting = ui.prompt( + "Enter setting", + default=tool_config.get('last_setting', 'default') + ) + + # Confirm before proceeding + if not ui.confirm("Proceed?", default=True): + ui.print_warning("Operation cancelled") + return 0 + + # Save settings + config.set('tools.my_tool.last_setting', setting) + config.save() + + # Do work with progress + with ui.progress_bar("Processing", total=10) as progress: + for i in range(10): + # Work here + progress.update(1) + + ui.print_success("Complete!") + return 0 + +def main(): + # Run interactive if no arguments + if len(sys.argv) == 1: + sys.exit(interactive_mode()) + + # Otherwise parse CLI arguments + parser = argparse.ArgumentParser(...) + # ... +``` + +## Configuration File Locations + +- **Main Config**: `~/.insightvm/config.json` +- **State Files**: `~/.insightvm/state/_state.json` +- **Automatic Creation**: Directories created automatically on first run + +## Dependencies + +### Required +- `requests` - HTTP communication +- `python-dotenv` - Environment variables +- `pandas` - Data processing (for CSV tools) + +### Optional but Recommended +- `rich` - Enhanced UI with colors, tables, progress bars + - **Graceful Fallback**: Tools work without `rich`, just less pretty + - **Install**: `pip install rich>=13.0.0` + +## Future Enhancements + +### Planned Features + +1. **Operation History Database** (SQLite) + - Track all operations + - Searchable history + - Repeat previous operations + - Audit trail + +2. **Unified Tool Launcher** + - Main menu for all tools + - Consistent navigation + - Shared configuration + - Help system + +3. **Dry-Run Mode** + - Preview before execution + - Validate inputs + - Estimate time/resources + - Rollback support + +4. **Enhanced Logging** + - Structured logging + - Log levels (DEBUG, INFO, WARN, ERROR) + - Log file rotation + - Remote logging support + +5. **Multi-Language Support** + - Internationalization (i18n) + - Configurable language + - Translated messages + +## Troubleshooting + +### Configuration Issues + +**Problem**: Config file corrupted or missing +```python +from src.rapid7.config import get_config + +config = get_config() +config.reset() # Reset to defaults +config.save() +``` + +**Problem**: Old config format +- Delete `~/.insightvm/config.json` +- Restart tool to create new config + +### UI Issues + +**Problem**: Colors not showing +- Check `config.json`: `"colored_output": true` +- Or set preference: `config.set_preference('colored_output', True)` + +**Problem**: Progress bars not appearing +- Check `config.json`: `"show_progress_bars": true` +- Or install rich: `pip install rich` + +### Tool-Specific Issues + +**Problem**: Tool doesn't remember settings +- Check config file exists: `~/.insightvm/config.json` +- Check tool name matches in `tools` section +- Verify `config.save()` is called after setting values + +## Examples + +See `docs/EXAMPLES.md` for complete working examples of: +- Using configuration system +- Building interactive menus +- Creating progress indicators +- Handling user input +- Saving and loading state + +## Contributing + +When adding new tools or features: + +1. Use `get_config()` for persistent settings +2. Use `create_ui()` for user interaction +3. Add tool configuration section in default config +4. Support both interactive and CLI modes +5. Save user preferences after operations +6. Use colored output for feedback +7. Add confirmation for destructive operations +8. Document new features in this file + +## Version History + +### v2.1.0 (October 7, 2025) +- ✨ Added persistent configuration system +- ✨ Added UI utilities framework +- ✨ Enhanced create_sonar_queries.py with interactive mode +- ✨ Added colored output and progress bars +- ✨ Added confirmation prompts +- 📝 Updated requirements.txt +- 📝 Created comprehensive documentation + +### v2.0.0 (October 7, 2025) +- 🎉 Major architecture refactoring +- ✅ Modern authentication with HTTPBasicAuth +- ✅ Unified client interface +- ✅ Modular API design From b0a6b97117ceb4f32055870851142dff913d8a8d Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 16:41:25 -0400 Subject: [PATCH 6/6] feat(tools): enhance install_insight_agent and install_scan_assistant with UI framework Added interactive UI features to installation tools: - install_insight_agent.py: Interactive installer discovery, token management, progress feedback, and configuration persistence - install_scan_assistant.py: Interactive package download, checksum verification, certificate management, and configuration persistence Both tools now support: - CLI mode for automation - Interactive mode with colored output - Configuration persistence for repeated use - Better error handling and user feedback - Graceful fallback when rich library unavailable Relates to #58 --- src/rapid7/tools/install_insight_agent.py | 342 ++++++++++-- src/rapid7/tools/install_scan_assistant.py | 613 +++++++++++++++------ 2 files changed, 751 insertions(+), 204 deletions(-) diff --git a/src/rapid7/tools/install_insight_agent.py b/src/rapid7/tools/install_insight_agent.py index acd7a3c..7067af9 100644 --- a/src/rapid7/tools/install_insight_agent.py +++ b/src/rapid7/tools/install_insight_agent.py @@ -1,56 +1,322 @@ -'''Installs the Rapid7 Insight Agent.''' +#!/usr/bin/env python3 +""" +Install Rapid7 Insight Agent -import os +This tool provides interactive installation of the Rapid7 Insight Agent +with support for configuration persistence and enhanced user experience. + +Usage: + # Interactive mode + python install_insight_agent.py + + # CLI mode + python install_insight_agent.py --installer --token +""" + +import argparse import glob +import os import subprocess +import sys +from pathlib import Path +from typing import Optional + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from src.rapid7.config import get_config # noqa: E402 +from src.rapid7.ui import create_ui # noqa: E402 + + +def find_installer_files(directory: Optional[str] = None) -> list: + """ + Find Insight Agent installer files. + + Args: + directory: Directory to search in (defaults to script directory) + + Returns: + List of installer file paths + """ + if directory is None: + directory = os.path.dirname(os.path.abspath(__file__)) + + return glob.glob(os.path.join(directory, "agent_installer-*.sh")) + -def install_insight_agent(): +def make_executable(filepath: str) -> None: """ - Installs the Rapid7 Insight Agent. + Make a file executable. + + Args: + filepath: Path to file to make executable + """ + os.chmod(filepath, 0o755) - This script searches for the installer files in the current directory and prompts the user to provide the location - of the installer file if none is found. It then makes the installer file executable, prompts the user to input the - token, and proceeds with the installation using the provided token. +def verify_agent_running() -> bool: + """ + Verify if the Insight Agent is running. + Returns: - None + True if agent is running, False otherwise """ - # Get the current directory - current_directory = os.path.dirname(os.path.abspath(__file__)) + try: + result = subprocess.run( + ["sudo", "service", "ir_agent", "status"], + check=True, + capture_output=True, + text=True + ) + return result.returncode == 0 + except (subprocess.CalledProcessError, FileNotFoundError): + try: + result = subprocess.run( + ["sudo", "systemctl", "status", "ir_agent"], + check=True, + capture_output=True, + text=True + ) + return result.returncode == 0 + except (subprocess.CalledProcessError, FileNotFoundError): + return False - # Search for the installer files in the current directory - installer_files = glob.glob(os.path.join(current_directory, "agent_installer-*.sh")) - if not installer_files: - # Prompt the user to provide the location of the installer file - installer_file = input("Please enter the location of the installer file: ") +def install_insight_agent( + installer_path: str, + token: str, + ui=None +) -> bool: + """ + Install the Rapid7 Insight Agent. + + Args: + installer_path: Path to the installer script + token: Installation token + ui: Optional UI instance for formatted output + + Returns: + True if installation successful, False otherwise + """ + if ui: + ui.print_header("Installing Rapid7 Insight Agent") + else: + print("\n=== Installing Rapid7 Insight Agent ===\n") + + # Verify installer exists + if not os.path.exists(installer_path): + if ui: + ui.print_error(f"Installer not found: {installer_path}") + else: + print(f"ERROR: Installer not found: {installer_path}") + return False + + # Make installer executable + try: + make_executable(installer_path) + if ui: + ui.print_info(f"Made installer executable: {installer_path}") + else: + print(f"Made installer executable: {installer_path}") + except Exception as e: + if ui: + ui.print_error(f"Failed to make installer executable: {e}") + else: + print(f"ERROR: Failed to make installer executable: {e}") + return False + + # Run installation + try: + if ui: + ui.print_info("Running installer with sudo privileges...") + ui.print_warning( + "You may be prompted for your sudo password" + ) + else: + print("Running installer with sudo privileges...") + print("You may be prompted for your sudo password") + + result = subprocess.run( + ["sudo", installer_path, "install_start", "--token", token], + check=True, + capture_output=True, + text=True + ) + + if ui: + ui.print_success("Installation completed successfully!") + else: + print("✓ Installation completed successfully!") + + # Show output if available + if result.stdout and ui: + ui.print_info("Installation output:") + print(result.stdout) + + except subprocess.CalledProcessError as e: + if ui: + ui.print_error(f"Installation failed: {e}") + if e.stderr: + ui.print_error(f"Error output: {e.stderr}") + else: + print(f"ERROR: Installation failed: {e}") + if e.stderr: + print(f"Error output: {e.stderr}") + return False + except Exception as e: + if ui: + ui.print_error(f"Unexpected error during installation: {e}") + else: + print(f"ERROR: Unexpected error during installation: {e}") + return False + + # Verify agent is running + if ui: + ui.print_info("Verifying agent status...") + else: + print("Verifying agent status...") + + if verify_agent_running(): + if ui: + ui.print_success("Agent is running!") + else: + print("✓ Agent is running!") + return True else: - # Use the first found installer file - installer_file = installer_files[0] + if ui: + ui.print_warning("Agent may not be running. Check manually.") + else: + print("⚠ Agent may not be running. Check manually.") + return True # Installation succeeded even if we can't verify status - # Make the installer file executable - os.chmod(installer_file, 0o755) - # Prompt the user to input the token - token = input("Please enter the token: ") +def interactive_mode() -> None: + """ + Run the tool in interactive mode with user prompts. + """ + ui = create_ui() + config = get_config() + + ui.print_header("Rapid7 Insight Agent Installer") + + # Get tool-specific config + tool_config = config.get_tool_config('install_insight_agent') + + # Find installer files + ui.print_header("Finding Installer Files") + + search_dir = tool_config.get('search_directory') + if search_dir: + ui.print_info(f"Searching in: {search_dir}") + installer_files = find_installer_files(search_dir) + else: + ui.print_info("Searching in current directory") + installer_files = find_installer_files() + + if not installer_files: + ui.print_warning("No installer files found automatically") + installer_path = ui.prompt( + "Enter the full path to the installer file" + ) + if not installer_path: + ui.print_error("No installer path provided. Exiting.") + return + elif len(installer_files) == 1: + installer_path = installer_files[0] + ui.print_info(f"Found installer: {installer_path}") + if not ui.confirm("Use this installer?", default=True): + installer_path = ui.prompt( + "Enter the full path to the installer file" + ) + if not installer_path: + ui.print_error("No installer path provided. Exiting.") + return + else: + ui.print_info(f"Found {len(installer_files)} installer files:") + selection = ui.select_menu( + "Select an installer", + installer_files + ) + if selection is None: + ui.print_error("No installer selected. Exiting.") + return + installer_path = installer_files[selection] + + # Get token + ui.print_header("Installation Token") + + last_token = tool_config.get('last_token') + if last_token: + ui.print_info("Previous token found (masked)") + use_last = ui.confirm("Use previous token?", default=True) + if use_last: + token = last_token + else: + token = ui.prompt("Enter installation token") + else: + token = ui.prompt("Enter installation token") + + if not token: + ui.print_error("No token provided. Exiting.") + return + + # Preview and confirm + ui.print_header("Installation Summary") + ui.print_info(f"Installer: {installer_path}") + ui.print_info(f"Token: {'*' * len(token)}") + + if not ui.confirm("\nProceed with installation?", default=True): + ui.print_warning("Installation cancelled") + return + + # Perform installation + success = install_insight_agent(installer_path, token, ui) + + # Save configuration if successful + if success: + save_config = ui.confirm( + "\nSave configuration for future use?", + default=True + ) + if save_config: + tool_config['search_directory'] = os.path.dirname(installer_path) + tool_config['last_token'] = token + config.set_tool_config('install_insight_agent', tool_config) + config.save() + ui.print_success("Configuration saved") - # Continue with the installation - print("Installing Rapid7 Insight Agent...") - subprocess.run(["sudo", installer_file, "install_start", "--token", token], check=True) - print("Installation completed successfully!") +def main() -> None: + """ + Main entry point for the script. + """ + parser = argparse.ArgumentParser( + description='Install Rapid7 Insight Agent' + ) + parser.add_argument( + '--installer', + help='Path to the installer script' + ) + parser.add_argument( + '--token', + help='Installation token' + ) + + args = parser.parse_args() + + # If arguments provided, run in CLI mode + if args.installer and args.token: + ui = create_ui() + success = install_insight_agent(args.installer, args.token, ui) + sys.exit(0 if success else 1) + elif args.installer or args.token: + print("ERROR: Both --installer and --token must be provided") + parser.print_help() + sys.exit(1) + else: + # Run in interactive mode + interactive_mode() - # Check if the agent is running - print("Checking if the agent is running...") - try: - subprocess.run(["sudo", "service", "ir_agent", "status"], check=True) - except subprocess.CalledProcessError: - try: - subprocess.run(["sudo", "systemctl", "status", "ir_agent"], check=True) - except subprocess.CalledProcessError: - print("The agent is not running.") - return - print("The agent is running.") -# Call the function to install the Rapid7 Insight Agent -install_insight_agent() +if __name__ == "__main__": + main() diff --git a/src/rapid7/tools/install_scan_assistant.py b/src/rapid7/tools/install_scan_assistant.py index 4cc2d97..f1331b6 100644 --- a/src/rapid7/tools/install_scan_assistant.py +++ b/src/rapid7/tools/install_scan_assistant.py @@ -1,26 +1,58 @@ +#!/usr/bin/env python3 +""" +Install Rapid7 Scan Assistant + +This tool provides interactive installation of the Rapid7 Scan Assistant +with support for configuration persistence and enhanced user experience. + +Usage: + # Interactive mode + python install_scan_assistant.py + + # CLI mode + python install_scan_assistant.py --certificate +""" + +import argparse import json import os import subprocess import sys +from pathlib import Path +from typing import Optional + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) -def check_package_system(): +from src.rapid7.config import get_config # noqa: E402 +from src.rapid7.ui import create_ui # noqa: E402 + + +def check_package_system() -> str: """ - Checks the package system of the operating system. + Check the package system of the operating system. Returns: - str: The package system of the operating system. Possible values are "deb" for Debian-based systems, - "rpm" for Red Hat-based systems, and "Unknown" if the package system cannot be determined. + Package system type ("deb", "rpm", or "Unknown") """ try: - # Check if dpkg is available - subprocess.run(["dpkg", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["dpkg", "--version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) return "deb" except (subprocess.CalledProcessError, FileNotFoundError): pass try: - # Check if rpm is available - subprocess.run(["rpm", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["rpm", "--version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) return "rpm" except (subprocess.CalledProcessError, FileNotFoundError): pass @@ -28,253 +60,502 @@ def check_package_system(): return "Unknown" -def check_internet_connection(): +def check_internet_connection() -> bool: """ Check if there is an internet connection. - Returns True if there is a connection, False otherwise. + + Returns: + True if connection available, False otherwise """ try: - # Check if we can reach rapid7.com, if that fails check 1.1.1.1 subprocess.run( ["ping", "-c", "1", "rapid7.com"], check=True, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, + timeout=5 ) return True - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): try: subprocess.run( ["ping", "-c", "1", "1.1.1.1"], check=True, stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stderr=subprocess.DEVNULL, + timeout=5 ) return True - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): return False -def is_wget_curl_installed(): + +def is_wget_curl_installed() -> Optional[str]: """ - Checks if either 'wget' or 'curl' is installed on the system. + Check if wget or curl is installed. Returns: - str: The name of the installed utility ('wget' or 'curl'). - None: If neither 'wget' nor 'curl' is installed. + "wget" or "curl" if installed, None otherwise """ try: - subprocess.run(["wget", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["wget", "--version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) return "wget" except (subprocess.CalledProcessError, FileNotFoundError): try: - subprocess.run(["curl", "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["curl", "--version"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) return "curl" except (subprocess.CalledProcessError, FileNotFoundError): return None -def download_install_package(): - """ - Downloads and installs the Scan Assistant package. - - This function checks for an internet connection, the availability of either wget or curl, - and the supported package manager (rpm or deb). It constructs the file URLs, determines - the correct download command based on the available tool, and downloads the package and - checksum using the chosen tool. - Raises: - SystemExit: If there is no internet connection, neither wget nor curl is installed, - or the package manager is unsupported. +def download_package( + package_manager: str, + ui=None +) -> bool: """ - if not check_internet_connection(): - sys.exit("No internet connection. Please check your connection.") + Download the Scan Assistant package. - wget_or_curl = is_wget_curl_installed() - if not wget_or_curl: - sys.exit("Neither wget nor curl is installed. Please install one of them to continue.") + Args: + package_manager: Package manager type ("rpm" or "deb") + ui: Optional UI instance - package_manager = check_package_system() - if package_manager not in ['rpm', 'deb']: - sys.exit("Unsupported package manager.") + Returns: + True if successful, False otherwise + """ + if ui: + ui.print_header("Downloading Scan Assistant") - # Construct the file URLs + # Check internet connection + if ui: + ui.print_info("Checking internet connection...") + if not check_internet_connection(): + if ui: + ui.print_error("No internet connection") + else: + print("ERROR: No internet connection") + return False + + # Check for download tool + if ui: + ui.print_info("Checking for wget or curl...") + tool = is_wget_curl_installed() + if not tool: + if ui: + ui.print_error("Neither wget nor curl is installed") + else: + print("ERROR: Neither wget nor curl is installed") + return False + + if ui: + ui.print_success(f"Using {tool} for download") + + # Construct URLs base_url = "https://download2.rapid7.com/download/InsightVM/" file_extension = "rpm" if package_manager == 'rpm' else "deb" file_name = f"R7ScanAssistant_amd64.{file_extension}" file_url = base_url + file_name checksum_url = file_url + ".sha512sum" - # Determine the correct download command based on available tool - if wget_or_curl == "wget": - download_command = ["wget"] - else: # curl - download_command = ["curl", "-O"] + # Determine download command + if tool == "wget": + download_cmd = ["wget"] + else: + download_cmd = ["curl", "-O"] - # Download the package and checksum using the chosen tool + # Download files try: - subprocess.run(download_command + [file_url], check=True) - subprocess.run(download_command + [checksum_url], check=True) - except subprocess.CalledProcessError: - sys.exit(f"Failed to download {file_name}. Please check your connection and try again.") + if ui: + ui.print_info(f"Downloading {file_name}...") + subprocess.run(download_cmd + [file_url], check=True) + + if ui: + ui.print_info("Downloading checksum...") + subprocess.run(download_cmd + [checksum_url], check=True) + + if ui: + ui.print_success("Download complete") + return True + except subprocess.CalledProcessError as e: + if ui: + ui.print_error(f"Download failed: {e}") + else: + print(f"ERROR: Download failed: {e}") + return False -def sha512sum_verify(): +def verify_checksum(package_manager: str, ui=None) -> bool: """ - Verify the SHA512 sum of the downloaded package. + Verify the SHA512 checksum of the downloaded package. - This function checks the SHA512 sum of the downloaded package against the expected SHA512 sum. - It uses the `check_package_system` function to determine the package manager and performs the - verification accordingly. If the SHA512 sum matches, it prints a success message. If the SHA512 - sum does not match, it prints an error message, downloads the package again, and recursively - calls itself to verify the new download. + Args: + package_manager: Package manager type ("rpm" or "deb") + ui: Optional UI instance Returns: - None + True if checksum matches, False otherwise """ - # Use the check_package_system function to determine package manager - package_manager = check_package_system() + if ui: + ui.print_header("Verifying Checksum") - # Check the sha512sum of the downloaded package - if package_manager == 'rpm': - sha512sum = subprocess.run(['sha512sum', 'R7ScanAssistant_amd64.rpm'], stdout=subprocess.PIPE, check=True).stdout.decode('utf-8').split()[0] - with open('R7ScanAssistant_amd64.rpm.sha512sum', 'r', encoding='utf-8') as f: - expected_sha512sum = f.read().split()[0] - elif package_manager == 'deb': - sha512sum = subprocess.run(['sha512sum', 'R7ScanAssistant_amd64.deb'], stdout=subprocess.PIPE, check=True).stdout.decode('utf-8').split()[0] - with open('R7ScanAssistant_amd64.deb.sha512sum', 'r', encoding='utf-8') as f: - expected_sha512sum = f.read().split()[0] - else: - print("Unsupported package manager.") - return + file_extension = "rpm" if package_manager == 'rpm' else "deb" + file_name = f"R7ScanAssistant_amd64.{file_extension}" + checksum_file = f"{file_name}.sha512sum" - if sha512sum == expected_sha512sum: - print("SHA512 sum matches.") - else: - print("SHA512 sum does not match. Downloading again.") - download_install_package() - sha512sum_verify() + try: + # Calculate checksum + result = subprocess.run( + ['sha512sum', file_name], + stdout=subprocess.PIPE, + check=True, + text=True + ) + calculated = result.stdout.split()[0] + + # Read expected checksum + with open(checksum_file, 'r', encoding='utf-8') as f: + expected = f.read().split()[0] -def generate_config_file(): + if calculated == expected: + if ui: + ui.print_success("Checksum verified") + return True + else: + if ui: + ui.print_error("Checksum mismatch") + else: + print("ERROR: Checksum mismatch") + return False + except Exception as e: + if ui: + ui.print_error(f"Checksum verification failed: {e}") + else: + print(f"ERROR: Checksum verification failed: {e}") + return False + + +def create_config_file(certificate: str, ui=None) -> bool: """ - This function prompts the user to enter a one line PEM file client certificate that was generated via the InsightVM console. - It then creates a JSON configuration file with the provided client certificate and other configuration parameters. - The configuration file is saved at the specified file path: /etc/rapid7/ScanAssistant/config.json. + Create the Scan Assistant configuration file. + + Args: + certificate: PEM client certificate + ui: Optional UI instance + + Returns: + True if successful, False otherwise """ + if ui: + ui.print_header("Creating Configuration") + package_manager = check_package_system() - if package_manager not in ['rpm', 'deb']: - sys.exit("Unsupported package manager.") - # The one line PEM file that was created on the Security Console can be pasted into the config.json between the quotations in the ClientCertificate field - client_certificate = input("Enter the one line PEM file client certificate that was generated via the InsightVM console(or press Enter to leave blank): ") - # Define the dictionary for the JSON content + config_data = { - "ClientCertificate": client_certificate, + "ClientCertificate": certificate, "ResponseTimeout": 300, "Debug": False, - "PackageManager": os.getenv(package_manager) + "PackageManager": package_manager } - # Define the file path file_path = "/etc/rapid7/ScanAssistant/config.json" - # Ensure the directory exists - os.makedirs(os.path.dirname(file_path), exist_ok=True) + try: + # Ensure directory exists + os.makedirs(os.path.dirname(file_path), exist_ok=True) + + # Write config + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(config_data, f, indent=4) + + if ui: + ui.print_success(f"Configuration saved to {file_path}") + return True + except Exception as e: + if ui: + ui.print_error(f"Failed to create config: {e}") + else: + print(f"ERROR: Failed to create config: {e}") + return False - # Write the JSON content to the file - with open(file_path, 'w', encoding='utf-8') as f: - json.dump(config_data, f, indent=4) -def install_package(): +def install_package(package_manager: str, ui=None) -> bool: """ - Installs the Scan Assistant package based on the package manager detected. + Install the Scan Assistant package. - Raises: - subprocess.CalledProcessError: If the installation command fails. - SystemExit: If the package manager is unsupported or the installation fails. + Args: + package_manager: Package manager type ("rpm" or "deb") + ui: Optional UI instance + + Returns: + True if successful, False otherwise """ - package_manager = check_package_system() + if ui: + ui.print_header("Installing Package") + + file_extension = "rpm" if package_manager == 'rpm' else "deb" + file_name = f"R7ScanAssistant_amd64.{file_extension}" + if package_manager == 'rpm': - try: - subprocess.run(["sudo", "rpm", "-Uvh", "--replacefiles", "--replacepkgs", "R7ScanAssistant_amd64.rpm"], check=True) - except subprocess.CalledProcessError: - sys.exit("Failed to install the package.") - elif package_manager == 'deb': - try: - subprocess.run(["sudo", "dpkg", "-i", "--force-overwrite", "R7ScanAssistant_amd64.deb"], check=True) - except subprocess.CalledProcessError: - sys.exit("Failed to install the package.") - else: - sys.exit("Unsupported package manager.") + cmd = ["sudo", "rpm", "-Uvh", "--replacefiles", + "--replacepkgs", file_name] + else: # deb + cmd = ["sudo", "dpkg", "-i", "--force-overwrite", file_name] -def verify_installation(): - """ - Verifies the installation of the Scan Assistant package. + try: + if ui: + ui.print_info("Running installation...") + ui.print_warning("You may be prompted for your sudo password") - This function checks the package manager system and verifies if the Scan Assistant package is installed. - If the package is installed, it also checks if the Scan Assistant service is running. + subprocess.run(cmd, check=True) + + if ui: + ui.print_success("Installation complete") + return True + except subprocess.CalledProcessError as e: + if ui: + ui.print_error(f"Installation failed: {e}") + else: + print(f"ERROR: Installation failed: {e}") + return False - Raises: - subprocess.CalledProcessError: If the package is not installed or the service is not running. - sys.exit: If an unsupported package manager is detected. +def verify_installation(package_manager: str, ui=None) -> bool: """ - package_manager = check_package_system() - if package_manager == 'rpm': - try: - result = subprocess.run(["rpm", "-qa", "|", "grep", "R7ScanAssistant"], check=True, capture_output=True, text=True) - print(result.stdout) - except subprocess.CalledProcessError: - sys.exit("Package not installed.") - elif package_manager == 'deb': - try: - result = subprocess.run(["dpkg-query", "-l", "|", "grep", "r7scanassistant"], check=True, capture_output=True, text=True) - print(result.stdout) - except subprocess.CalledProcessError: - sys.exit("Package not installed.") - else: - sys.exit("Unsupported package manager.") + Verify the Scan Assistant installation. + + Args: + package_manager: Package manager type ("rpm" or "deb") + ui: Optional UI instance + + Returns: + True if verified, False otherwise + """ + if ui: + ui.print_header("Verifying Installation") + # Check package try: - result = subprocess.run(["ps", "-ef", "|", "grep", "ScanAssistant"], check=True, capture_output=True, text=True) - print(result.stdout) + if package_manager == 'rpm': + subprocess.run( + ["rpm", "-qa", "R7ScanAssistant"], + check=True, + capture_output=True, + text=True + ) + else: # deb + subprocess.run( + ["dpkg-query", "-l", "r7scanassistant"], + check=True, + capture_output=True, + text=True + ) + + if ui: + ui.print_success("Package installed") except subprocess.CalledProcessError: - sys.exit("Service not running.") + if ui: + ui.print_error("Package not found") + return False + + # Check service + try: + subprocess.run( + ["pgrep", "-f", "ScanAssistant"], + check=True, + capture_output=True, + text=True + ) + if ui: + ui.print_success("Service running") + return True + except subprocess.CalledProcessError: + if ui: + ui.print_warning("Service may not be running") + return True # Don't fail if service check fails -def cleanup(): - """ - Removes the install files for the R7ScanAssistant_amd64. - This function attempts to remove the install files for the R7ScanAssistant_amd64. - If the removal fails, the function will exit with an error message. +def cleanup(package_manager: str, ui=None) -> bool: + """ + Clean up installation files. - Raises: - subprocess.CalledProcessError: If the removal of the install files fails. + Args: + package_manager: Package manager type ("rpm" or "deb") + ui: Optional UI instance + Returns: + True if successful, False otherwise """ + if ui: + ui.print_header("Cleaning Up") + + file_extension = "rpm" if package_manager == 'rpm' else "deb" + files = [ + f"R7ScanAssistant_amd64.{file_extension}", + f"R7ScanAssistant_amd64.{file_extension}.sha512sum" + ] + try: - subprocess.run(["rm", "R7ScanAssistant_amd64.*"], check=True) - except subprocess.CalledProcessError: - sys.exit("Failed to cleanup the install files.") - # Exit the script - sys.exit("Installation and cleanup complete.") + for file in files: + if os.path.exists(file): + os.remove(file) + if ui: + ui.print_info(f"Removed {file}") + + if ui: + ui.print_success("Cleanup complete") + return True + except Exception as e: + if ui: + ui.print_warning(f"Cleanup warning: {e}") + return True # Don't fail on cleanup errors -def main(): + +def interactive_mode() -> None: + """ + Run the tool in interactive mode with user prompts. """ - This is the main function that orchestrates the installation process of the scan assistant. + ui = create_ui() + config = get_config() + + ui.print_header("Rapid7 Scan Assistant Installer") + + # Check package system + package_manager = check_package_system() + if package_manager not in ['rpm', 'deb']: + ui.print_error(f"Unsupported package manager: {package_manager}") + ui.print_info("This tool supports RPM and DEB systems only") + return + + ui.print_success(f"Detected package manager: {package_manager}") + + # Get tool-specific config + tool_config = config.get_tool_config('install_scan_assistant') + + # Get certificate + ui.print_header("Client Certificate") + ui.print_info( + "Enter the one-line PEM client certificate from InsightVM console" + ) + + last_cert = tool_config.get('last_certificate') + if last_cert: + ui.print_info("Previous certificate found (masked)") + use_last = ui.confirm("Use previous certificate?", default=True) + if use_last: + certificate = last_cert + else: + certificate = ui.prompt("Enter certificate (or leave blank)") + else: + certificate = ui.prompt("Enter certificate (or leave blank)") - It performs the following steps: - 1. Downloads and installs the package. - 2. Verifies the integrity of the package using SHA512 checksum. - 3. Generates the configuration file. - 4. Installs the package. - 5. Verifies the installation. - 6. Cleans up any temporary files. + # Summary + ui.print_header("Installation Summary") + ui.print_info(f"Package Manager: {package_manager}") + ui.print_info( + f"Certificate: {'Provided' if certificate else 'Not provided'}" + ) + if not ui.confirm("\nProceed with installation?", default=True): + ui.print_warning("Installation cancelled") + return + + # Download + if not download_package(package_manager, ui): + return + + # Verify checksum + if not verify_checksum(package_manager, ui): + if not ui.confirm("Continue despite checksum failure?", default=False): + ui.print_warning("Installation cancelled") + cleanup(package_manager, ui) + return + + # Create config + if certificate: + if not create_config_file(certificate, ui): + if not ui.confirm("Continue without config?", default=False): + ui.print_warning("Installation cancelled") + cleanup(package_manager, ui) + return + + # Install + if not install_package(package_manager, ui): + cleanup(package_manager, ui) + return + + # Verify + verify_installation(package_manager, ui) + + # Cleanup + cleanup(package_manager, ui) + + # Save configuration + if certificate: + save_config = ui.confirm( + "\nSave certificate for future use?", + default=True + ) + if save_config: + tool_config['last_certificate'] = certificate + config.set_tool_config('install_scan_assistant', tool_config) + config.save() + ui.print_success("Configuration saved") + + ui.print_success("\nInstallation process complete!") + + +def main() -> None: + """ + Main entry point for the script. """ - download_install_package() - sha512sum_verify() - generate_config_file() - install_package() - verify_installation() - cleanup() + parser = argparse.ArgumentParser( + description='Install Rapid7 Scan Assistant' + ) + parser.add_argument( + '--certificate', + help='PEM client certificate' + ) + parser.add_argument( + '--skip-verify', + action='store_true', + help='Skip checksum verification' + ) + + args = parser.parse_args() + + # If certificate provided, run in CLI mode + if args.certificate is not None: + ui = create_ui() + package_manager = check_package_system() + + if package_manager not in ['rpm', 'deb']: + ui.print_error(f"Unsupported package manager: {package_manager}") + sys.exit(1) + + success = ( + download_package(package_manager, ui) and + (args.skip_verify or verify_checksum(package_manager, ui)) and + (not args.certificate or + create_config_file(args.certificate, ui)) and + install_package(package_manager, ui) and + verify_installation(package_manager, ui) + ) + + cleanup(package_manager, ui) + sys.exit(0 if success else 1) + else: + # Run in interactive mode + interactive_mode() + if __name__ == "__main__": main()