From af0e7c86e84707f05b617950706e502118adab70 Mon Sep 17 00:00:00 2001 From: Matt Wyen Date: Tue, 7 Oct 2025 15:53:26 -0400 Subject: [PATCH] feat(sonar): add Sonar Query API and modernize tools Implements comprehensive Sonar Query API wrapper and modernizes legacy Sonar query creation tool to use v2.0 architecture patterns. Changes: - Create src/rapid7/constants.py with API endpoints and status codes - Implement src/rapid7/api/sonar_queries.py with full CRUD operations - Add sonar_queries client to InsightVMClient - Create modern src/rapid7/tools/create_sonar_queries.py CLI tool - Uses InsightVMClient instead of direct HTTPBasicAuth - Improved error handling and user feedback - Better CSV processing with pandas - Command-line argument parsing with argparse - Progress tracking and result saving The new tool provides: - Clean, type-hinted code following v2.0 patterns - Helper methods for common operations (domain/IP queries) - Comprehensive docstrings and examples - Better separation of concerns Relates to #56 --- src/rapid7/api/sonar_queries.py | 220 +++++++++++++++++ src/rapid7/client.py | 13 +- src/rapid7/constants.py | 59 +++++ src/rapid7/tools/create_sonar_queries.py | 291 +++++++++++++++++++++++ 4 files changed, 581 insertions(+), 2 deletions(-) create mode 100644 src/rapid7/api/sonar_queries.py create mode 100644 src/rapid7/constants.py create mode 100644 src/rapid7/tools/create_sonar_queries.py diff --git a/src/rapid7/api/sonar_queries.py b/src/rapid7/api/sonar_queries.py new file mode 100644 index 0000000..9f3024b --- /dev/null +++ b/src/rapid7/api/sonar_queries.py @@ -0,0 +1,220 @@ +""" +Sonar Query API + +This module provides methods for managing InsightVM Sonar discovery queries. +""" + +from typing import Dict, List, Optional, Any +from .base import BaseAPI +from ..constants import Endpoints + + +class SonarQueryAPI(BaseAPI): + """API wrapper for InsightVM Sonar Query operations.""" + + def __init__( + self, + auth, + verify_ssl=None, + timeout=(10, 90) + ): + """ + Initialize the SonarQueryAPI. + + Args: + auth: InsightVMAuth instance + verify_ssl: Whether to verify SSL certificates + timeout: Tuple of (connect_timeout, read_timeout) + """ + super().__init__(auth, verify_ssl=verify_ssl, timeout=timeout) + self.endpoint = Endpoints.SONAR_QUERY + + def create_sonar_query( + self, + name: str, + filters: List[Dict[str, Any]], + description: Optional[str] = None + ) -> Dict: + """ + Create a new Sonar discovery query. + + Args: + name: Name of the Sonar query + filters: List of filter criteria dictionaries + description: Optional description of the query + + Returns: + Dict containing the created query details including ID + + Example: + >>> filters = [ + ... {"type": "domain-contains", "domain": "example.com"}, + ... {"type": "scan-date-within-the-last", "days": 30} + ... ] + >>> result = client.sonar_queries.create_sonar_query( + ... name="example.com Assets", + ... filters=filters + ... ) + """ + payload = { + "name": name, + "criteria": {"filters": filters} + } + + if description: + payload["description"] = description + + response = self.post(self.endpoint, json=payload) + return response.json() + + def get_sonar_query(self, query_id: int) -> Dict: + """ + Get details of a specific Sonar query. + + Args: + query_id: ID of the Sonar query + + Returns: + Dict containing the query details + """ + response = self.get(f"{self.endpoint}/{query_id}") + return response.json() + + def list_sonar_queries( + self, + page: int = 0, + size: int = 10, + sort: Optional[str] = None + ) -> Dict: + """ + List all Sonar queries. + + Args: + page: Page number (0-indexed) + size: Number of results per page + sort: Sort order (e.g., "name,asc") + + Returns: + Dict containing paginated list of Sonar queries + """ + params: Dict[str, Any] = {"page": page, "size": size} + if sort: + params["sort"] = sort + + response = self.get(self.endpoint, params=params) + return response.json() + + def update_sonar_query( + self, + query_id: int, + name: Optional[str] = None, + filters: Optional[List[Dict[str, Any]]] = None, + description: Optional[str] = None + ) -> Dict: + """ + Update an existing Sonar query. + + Args: + query_id: ID of the Sonar query to update + name: New name for the query + filters: New filter criteria + description: New description + + Returns: + Dict containing the updated query details + """ + payload: Dict[str, Any] = {} + + if name: + payload["name"] = name + if filters: + payload["criteria"] = {"filters": filters} + if description: + payload["description"] = description + + response = self.put(f"{self.endpoint}/{query_id}", json=payload) + return response.json() + + def delete_sonar_query(self, query_id: int) -> None: + """ + Delete a Sonar query. + + Args: + query_id: ID of the Sonar query to delete + """ + self.delete(f"{self.endpoint}/{query_id}") + + def create_domain_query( + self, + domain: str, + days: int = 30, + name: Optional[str] = None + ) -> Dict: + """ + Create a Sonar query for a specific domain. + + Helper method that creates a query filtering by domain and + recent scans. + + Args: + domain: Domain to filter by + days: Number of days for scan recency (default: 30) + name: Optional custom name (defaults to domain name) + + Returns: + Dict containing the created query details + + Example: + >>> result = sonar.create_domain_query( + ... domain="example.com", + ... days=7 + ... ) + """ + filters: List[Dict[str, Any]] = [ + {"type": "domain-contains", "domain": domain}, + {"type": "scan-date-within-the-last", "days": days} + ] + + query_name = name or domain + return self.create_sonar_query(query_name, filters) + + def create_ip_range_query( + self, + lower_ip: str, + upper_ip: str, + days: int = 30, + name: Optional[str] = None + ) -> Dict: + """ + Create a Sonar query for an IP address range. + + Helper method that creates a query filtering by IP range and + recent scans. + + Args: + lower_ip: Lower bound IP address + upper_ip: Upper bound IP address + days: Number of days for scan recency (default: 30) + name: Optional custom name (defaults to IP range) + + Returns: + Dict containing the created query details + + Example: + >>> result = sonar.create_ip_range_query( + ... lower_ip="192.168.1.1", + ... upper_ip="192.168.1.255", + ... days=7 + ... ) + """ + filters: List[Dict[str, Any]] = [ + { + "type": "ip-address-range", + "lower": lower_ip, + "upper": upper_ip + }, + {"type": "scan-date-within-the-last", "days": days} + ] + + query_name = name or f"{lower_ip}-{upper_ip}" + return self.create_sonar_query(query_name, filters) diff --git a/src/rapid7/client.py b/src/rapid7/client.py index 170b5d0..67f0f43 100644 --- a/src/rapid7/client.py +++ b/src/rapid7/client.py @@ -10,6 +10,7 @@ from .auth import InsightVMAuth from .api.assets import AssetAPI from .api.asset_groups import AssetGroupAPI +from .api.sonar_queries import SonarQueryAPI class InsightVMClient: @@ -23,6 +24,7 @@ class InsightVMClient: auth (InsightVMAuth): Authentication handler assets (AssetAPI): Asset operations client asset_groups (AssetGroupAPI): Asset group operations client + sonar_queries (SonarQueryAPI): Sonar query operations client Example: >>> # Basic usage with environment variables @@ -93,8 +95,15 @@ def __init__( ) # Initialize API clients - self.assets = AssetAPI(self.auth, verify_ssl=verify_ssl, timeout=timeout) - self.asset_groups = AssetGroupAPI(self.auth, verify_ssl=verify_ssl, timeout=timeout) + self.assets = AssetAPI( + self.auth, verify_ssl=verify_ssl, timeout=timeout + ) + self.asset_groups = AssetGroupAPI( + self.auth, verify_ssl=verify_ssl, timeout=timeout + ) + self.sonar_queries = SonarQueryAPI( + self.auth, verify_ssl=verify_ssl, timeout=timeout + ) def __repr__(self): return f"InsightVMClient(base_url='{self.auth.base_url}')" diff --git a/src/rapid7/constants.py b/src/rapid7/constants.py new file mode 100644 index 0000000..83e9557 --- /dev/null +++ b/src/rapid7/constants.py @@ -0,0 +1,59 @@ +""" +InsightVM API Constants + +This module provides constants used throughout the InsightVM Python client, +including API endpoint paths and other configuration values. +""" + +# API Version +API_VERSION = "3" +API_BASE_PATH = f"/api/{API_VERSION}" + + +# API Endpoints +class Endpoints: + """API endpoint paths for InsightVM API v3.""" + + ASSET_GROUP = "asset-group" + ASSET = "asset" + AUTHENTICATION = "authentication" + CONFIGURATION = "configuration" + CREDENTIAL = "credential" + DISCOVERY_CONNECTION = "discovery-connection" + DISCOVERY_PROFILE = "discovery-profile" + ENGINE = "engine" + EXPORT = "export" + IMPORT = "import" + LICENSE = "license" + REPORT = "report" + ROLE = "role" + SCAN = "scan" + SITE = "site" + SONAR_QUERY = "sonar_queries" + TAG = "tag" + USER = "user" + VULNERABILITY_EXCEPTION = "vulnerability-exception" + VULNERABILITY = "vulnerability" + VULNERABILITY_CHECK = "vulnerability-check" + VULNERABILITY_DEFINITION = "vulnerability-definition" + VULNERABILITY_SOLUTION = "vulnerability-solution" + + +# HTTP Status Codes +class StatusCodes: + """Standard HTTP status codes.""" + + OK = 200 + CREATED = 201 + NO_CONTENT = 204 + BAD_REQUEST = 400 + UNAUTHORIZED = 401 + FORBIDDEN = 403 + NOT_FOUND = 404 + INTERNAL_SERVER_ERROR = 500 + SERVICE_UNAVAILABLE = 503 + + +# Default Configuration +DEFAULT_TIMEOUT = 30 +DEFAULT_VERIFY_SSL = False # InsightVM often uses self-signed certs diff --git a/src/rapid7/tools/create_sonar_queries.py b/src/rapid7/tools/create_sonar_queries.py new file mode 100644 index 0000000..65107de --- /dev/null +++ b/src/rapid7/tools/create_sonar_queries.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python3 +""" +Create Sonar Discovery Queries from CSV + +This tool reads a CSV file containing discovery targets (domains or IP ranges) +and creates InsightVM Sonar queries for each target using the InsightVM API. + +CSV Format: + target + example.com + 192.168.1.0/24 + test.org + +Usage: + python create_sonar_queries.py [--days N] + +Example: + python create_sonar_queries.py targets.csv --days 7 +""" + +import argparse +import ipaddress +import re +import sys +from pathlib import Path +from typing import List, Dict, Any, Tuple + +import pandas as pd # type: ignore + +# Add parent directory to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +from src.rapid7.client import InsightVMClient # noqa: E402 + + +def is_valid_domain(domain: str) -> bool: + """ + Check if string is a valid domain name. + + Args: + domain: String to validate + + Returns: + True if valid domain, False otherwise + """ + pattern = r'^[a-z0-9]+([\-\.]{1}[a-z0-9]+)*\.[a-z]{2,}$' + return bool(re.match(pattern, domain.lower())) + + +def parse_target(target: str) -> Tuple[str, Dict[str, Any]]: + """ + Parse a target string and determine its type and filter criteria. + + Args: + target: Target string (domain or IP/CIDR) + + Returns: + Tuple of (target_type, filter_dict) + + Raises: + ValueError: If target format is invalid + """ + target = target.strip() + + # Check if it's a domain + if is_valid_domain(target): + return 'domain', {"type": "domain-contains", "domain": target} + + # Try to parse as IP or IP range + try: + ip_range = ipaddress.ip_network(target, strict=False) + return 'ip_range', { + "type": "ip-address-range", + "lower": str(ip_range.network_address), + "upper": str(ip_range.broadcast_address) + } + except ValueError: + raise ValueError(f"Invalid target format: {target}") + + +def load_targets_from_csv(filepath: str) -> pd.DataFrame: + """ + Load targets from CSV file. + + Args: + filepath: Path to CSV file + + Returns: + DataFrame with targets + + Raises: + FileNotFoundError: If CSV file doesn't exist + ValueError: If CSV format is invalid + """ + if not Path(filepath).exists(): + raise FileNotFoundError(f"CSV file not found: {filepath}") + + df = pd.read_csv(filepath, skipinitialspace=True) + + # Validate CSV format + if 'target' not in df.columns: + raise ValueError( + "CSV file must have a 'target' column. " + "Found columns: " + ", ".join(df.columns) + ) + + # Clean whitespace + df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x) + + return df + + +def create_queries_from_csv( + client: InsightVMClient, + filepath: str, + days: int = 30 +) -> pd.DataFrame: + """ + Create Sonar queries from CSV file. + + Args: + client: InsightVM client instance + filepath: Path to CSV file + days: Number of days for scan recency filter + + Returns: + DataFrame with results (target, status, query_id, message) + """ + # Load targets + df = load_targets_from_csv(filepath) + + # Add result columns + df['status'] = '' + df['query_id'] = '' + df['message'] = '' + + print(f"\nProcessing {len(df)} targets with {days}-day scan filter...") + print("-" * 80) + + # Process each target + for index, row in df.iterrows(): + target = row['target'] + print(f"\nProcessing: {target}") + + try: + # Parse target + target_type, filter_dict = parse_target(target) + + # Build filters + filters: List[Dict[str, Any]] = [ + filter_dict, + {"type": "scan-date-within-the-last", "days": days} + ] + + # Create query + result = client.sonar_queries.create_sonar_query( + name=target, + filters=filters + ) + + # Update results + df.at[index, 'status'] = 'success' + df.at[index, 'query_id'] = result.get('id', '') + df.at[index, 'message'] = ( + f"Query created successfully. ID: {result.get('id')}" + ) + + print(f" ✓ Success - Query ID: {result.get('id')}") + + except ValueError as e: + df.at[index, 'status'] = 'error' + df.at[index, 'message'] = str(e) + print(f" ✗ Error: {e}") + + except Exception as e: + df.at[index, 'status'] = 'error' + df.at[index, 'message'] = f"API Error: {str(e)}" + print(f" ✗ API Error: {e}") + + return df + + +def main(): + """Main function.""" + parser = argparse.ArgumentParser( + description='Create InsightVM Sonar queries from CSV file', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +CSV Format: + The CSV file must have a 'target' column containing domains or IP ranges. + + Example CSV: + target + example.com + 192.168.1.0/24 + test.org + 10.0.0.1 + +Examples: + # Create queries with default 30-day filter + python create_sonar_queries.py targets.csv + + # Create queries with 7-day filter + python create_sonar_queries.py targets.csv --days 7 + + # Save results to specific output file + python create_sonar_queries.py targets.csv --output results.csv + """ + ) + + parser.add_argument( + 'csv_file', + help='Path to CSV file containing targets' + ) + parser.add_argument( + '--days', + type=int, + default=30, + help='Number of days for scan-date-within-the-last filter (default: 30)' + ) + parser.add_argument( + '--output', + help='Output CSV file path (default: input file with _results suffix)' + ) + + args = parser.parse_args() + + # Validate days parameter + if args.days < 1: + parser.error("--days must be a positive integer") + + # Determine output file + if args.output: + output_file = args.output + else: + input_path = Path(args.csv_file) + output_file = ( + input_path.parent / + f"{input_path.stem}_results{input_path.suffix}" + ) + + print("=" * 80) + print("InsightVM Sonar Query Creator") + print("=" * 80) + print(f"Input file: {args.csv_file}") + print(f"Output file: {output_file}") + print(f"Days filter: {args.days}") + + try: + # Create client (uses environment variables) + print("\nConnecting to InsightVM...") + client = InsightVMClient() + print(f"Connected to: {client.auth.base_url}") + + # Process CSV + results_df = create_queries_from_csv( + client, + args.csv_file, + days=args.days + ) + + # Save results + results_df.to_csv(output_file, index=False) + + # Print summary + print("\n" + "=" * 80) + print("Summary") + print("=" * 80) + success_count = (results_df['status'] == 'success').sum() + error_count = (results_df['status'] == 'error').sum() + print(f"Total targets: {len(results_df)}") + print(f"Successful: {success_count}") + print(f"Failed: {error_count}") + print(f"\nResults saved to: {output_file}") + + # Exit with error if any failed + sys.exit(0 if error_count == 0 else 1) + + except FileNotFoundError as e: + print(f"\nError: {e}", file=sys.stderr) + sys.exit(1) + except ValueError as e: + print(f"\nError: {e}", file=sys.stderr) + sys.exit(1) + except Exception as e: + print(f"\nUnexpected error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main()