diff --git a/sigma/cli/main.py b/sigma/cli/main.py index f8466b9..a9001ae 100644 --- a/sigma/cli/main.py +++ b/sigma/cli/main.py @@ -34,7 +34,7 @@ from .check import check from .plugin import plugin_group from .analyze import analyze_group -from .pysigma import check_pysigma_command +from .pysigma import pysigma_group CONTEXT_SETTINGS={ @@ -73,10 +73,10 @@ def version(): def main(): cli.add_command(analyze_group) cli.add_command(plugin_group) + cli.add_command(pysigma_group) cli.add_command(list_group) cli.add_command(convert) cli.add_command(check) - cli.add_command(check_pysigma_command) cli.add_command(version) cli() diff --git a/sigma/cli/pysigma.py b/sigma/cli/pysigma.py index 67e01b2..35c3554 100644 --- a/sigma/cli/pysigma.py +++ b/sigma/cli/pysigma.py @@ -1,8 +1,11 @@ import importlib.metadata import subprocess import sys +import os +from datetime import datetime import click from packaging.specifiers import SpecifierSet +from prettytable import PrettyTable def get_pysigma_requirement(): requires = importlib.metadata.requires("sigma-cli") @@ -18,8 +21,15 @@ def check_pysigma_version(): version_specifier = SpecifierSet(requires_pysgima.split(" ")[1][1:-1]) return importlib.metadata.version("pysigma") in version_specifier -@click.command( - name="check-pysigma", +@click.group( + name="pysigma", + help="pySigma library management commands." +) +def pysigma_group(): + pass + +@pysigma_group.command( + name="check-version", help="Check if the installed version of pysigma is compatible with the version required by sigma-cli." ) @click.option( @@ -28,7 +38,7 @@ def check_pysigma_version(): default=False, help="Suppress output if check passes.", ) -def check_pysigma_command(quiet): +def check_version_command(quiet): check_pysigma(quiet) def check_pysigma(quiet=False): @@ -61,4 +71,232 @@ def check_pysigma(quiet=False): ) click.echo("pySigma successfully reinstalled") else: - click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma check-pysigma", fg="green")) \ No newline at end of file + click.echo("Incompatible pySigma version was keeped. You can rerun the check with: " + click.style("sigma pysigma check-version", fg="green")) + + +@pysigma_group.command( + name="list-cache", + help="List cached data versions and timestamps." +) +def list_cache_command(): + """List the cached versions of pySigma data and their timestamps.""" + try: + from sigma.data import mitre_attack, mitre_d3fend + + # Configuration for datasets to check + datasets = [ + { + 'name': 'MITRE ATT&CK', + 'module': mitre_attack, + 'cache_key': 'mitre_attack_data_default', + 'version_key': 'mitre_attack_version' + }, + { + 'name': 'MITRE D3FEND', + 'module': mitre_d3fend, + 'cache_key': 'mitre_d3fend_data_default', + 'version_key': 'mitre_d3fend_version' + } + ] + + table = PrettyTable() + table.field_names = ["Dataset", "Version", "Cached Date"] + table.align = "l" + + for dataset in datasets: + cache = dataset['module']._get_cache() + + # Check if cache directory exists and has the key + if not os.path.exists(cache.directory) or dataset['cache_key'] not in cache: + table.add_row([dataset['name'], "Not cached", "-"]) + else: + # Get cached data without triggering download + data = cache.get(dataset['cache_key'], read=True) + version = data.get(dataset['version_key'], 'Unknown') + + # Get timestamp from cache files + cache_files = [f for f in os.listdir(cache.directory) if not f.startswith('.')] + if cache_files: + newest_mtime = max(os.path.getmtime(os.path.join(cache.directory, f)) for f in cache_files) + timestamp = datetime.fromtimestamp(newest_mtime).strftime("%Y-%m-%d %H:%M:%S") + else: + timestamp = "Unknown" + + table.add_row([dataset['name'], version, timestamp]) + + click.echo(table) + + except ImportError: + click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red")) + click.echo("Make sure pySigma is installed correctly.") + except Exception as e: + click.echo(click.style(f"Error accessing cache: {str(e)}", fg="red")) + + +@pysigma_group.command( + name="clear-cache", + help="Delete all cached data." +) +@click.option( + "--yes", + "-y", + is_flag=True, + help="Skip confirmation prompt.", +) +def clear_cache_command(yes): + """Delete the cached data for all datasets.""" + try: + from sigma.data import mitre_attack, mitre_d3fend + + datasets = [ + {'name': 'MITRE ATT&CK', 'module': mitre_attack}, + {'name': 'MITRE D3FEND', 'module': mitre_d3fend} + ] + + # Check what's cached + cached_datasets = [] + total_size = 0 + total_entries = 0 + + for dataset in datasets: + cache = dataset['module']._get_cache() + if os.path.exists(cache.directory): + keys = list(cache.iterkeys()) + if keys: + size = cache.volume() + cached_datasets.append({ + 'name': dataset['name'], + 'entries': len(keys), + 'size': size + }) + total_entries += len(keys) + total_size += size + + if not cached_datasets: + click.echo(click.style("No cached data found. Nothing to clear.", fg="yellow")) + return + + # Confirm deletion + if not yes: + for cached in cached_datasets: + click.echo(f"{cached['name']}: {cached['entries']} entries, {cached['size']} bytes") + click.echo(f"Total: {total_entries} entries, {total_size} bytes") + if not click.confirm(click.style("Are you sure you want to clear all cached data?", fg="yellow")): + click.echo("Cache clearing cancelled.") + return + + # Clear all caches + cleared_count = 0 + for dataset in datasets: + cache = dataset['module']._get_cache() + if os.path.exists(cache.directory): + keys = list(cache.iterkeys()) + if keys: + dataset['module'].clear_cache() + cleared_count += 1 + + click.echo(click.style(f"✓ Cache cleared successfully for {cleared_count} dataset(s).", fg="green")) + click.echo(f"Removed {total_entries} cache entries ({total_size} bytes)") + + except ImportError: + click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red")) + click.echo("Make sure pySigma is installed correctly.") + except Exception as e: + click.echo(click.style(f"Error clearing cache: {str(e)}", fg="red")) + + +@pysigma_group.command( + name="update-cache", + help="Update cache by clearing and re-caching data." +) +@click.option( + "--yes", + "-y", + is_flag=True, + help="Skip confirmation prompt.", +) +def update_cache_command(yes): + """Update the cache by deleting it and re-caching data for all datasets.""" + try: + from sigma.data import mitre_attack, mitre_d3fend + + datasets = [ + { + 'name': 'MITRE ATT&CK', + 'module': mitre_attack, + 'trigger_attr': 'mitre_attack_techniques_tactics_mapping' + }, + { + 'name': 'MITRE D3FEND', + 'module': mitre_d3fend, + 'trigger_attr': 'mitre_d3fend_techniques' + } + ] + + # Get current cache info + cached_datasets = [] + total_size = 0 + total_entries = 0 + + for dataset in datasets: + cache = dataset['module']._get_cache() + if os.path.exists(cache.directory): + keys = list(cache.iterkeys()) + if keys: + size = cache.volume() + cached_datasets.append({ + 'name': dataset['name'], + 'entries': len(keys), + 'size': size + }) + total_entries += len(keys) + total_size += size + + # Confirm update + if not yes: + if cached_datasets: + click.echo("Current cache:") + for cached in cached_datasets: + click.echo(f" {cached['name']}: {cached['entries']} entries, {cached['size']} bytes") + click.echo(f"Total: {total_entries} entries, {total_size} bytes") + else: + click.echo("No cached data found (will download fresh data)") + + if not click.confirm(click.style("Update cache by clearing and re-downloading data?", fg="yellow")): + click.echo("Cache update cancelled.") + return + + # Clear and update each dataset + updated_count = 0 + new_total_size = 0 + new_total_entries = 0 + + for dataset in datasets: + click.echo(f"Updating {dataset['name']}...") + + # Clear cache + dataset['module'].clear_cache() + + # Trigger re-caching by accessing data + _ = getattr(dataset['module'], dataset['trigger_attr']) + + # Get new cache info + cache = dataset['module']._get_cache() + new_keys = list(cache.iterkeys()) + new_size = cache.volume() + + click.echo(click.style(f" ✓ {dataset['name']} cached: {len(new_keys)} entries, {new_size} bytes", fg="green")) + + updated_count += 1 + new_total_entries += len(new_keys) + new_total_size += new_size + + click.echo() + click.echo(click.style(f"✓ Cache updated successfully for {updated_count} dataset(s).", fg="green")) + click.echo(f"Total: {new_total_entries} entries, {new_total_size} bytes") + + except ImportError: + click.echo(click.style("Error: Unable to import pySigma data modules.", fg="red")) + click.echo("Make sure pySigma is installed correctly.") + except Exception as e: + click.echo(click.style(f"Error updating cache: {str(e)}", fg="red")) \ No newline at end of file diff --git a/tests/test_pysigma.py b/tests/test_pysigma.py index 01d19d5..eeac0ab 100644 --- a/tests/test_pysigma.py +++ b/tests/test_pysigma.py @@ -1,6 +1,6 @@ import importlib import re -from sigma.cli.pysigma import check_pysigma_command, check_pysigma_version +from sigma.cli.pysigma import pysigma_group, check_pysigma_version from click.testing import CliRunner import pytest @@ -30,13 +30,67 @@ def test_check_pysigma_version_incompatible(monkeypatch, pysigma_expected_versio ) def test_check_pysigma(): cli = CliRunner() - result = cli.invoke(check_pysigma_command) + result = cli.invoke(pysigma_group, ["check-version"]) assert "pySigma version is compatible with sigma-cli" in result.output @pytest.mark.skip(reason="This test is not working") def test_check_pysigma_incompatible(monkeypatch): monkeypatch.setattr('importlib.metadata.version', lambda x: "0.0.1") cli = CliRunner() - result = cli.invoke(check_pysigma_command, input="y\n") + result = cli.invoke(pysigma_group, ["check-version"], input="y\n") assert "pySigma version is not compatible" in result.output - assert "pySigma successfully reinstalled" in result.output \ No newline at end of file + assert "pySigma successfully reinstalled" in result.output + + +def test_list_cache(): + """Test list-cache command shows cache information.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["list-cache"]) + assert result.exit_code == 0 + # Check that the output contains the expected table headers and dataset names + assert "Dataset" in result.output + assert "Version" in result.output + assert "Cached Date" in result.output + assert ("MITRE ATT&CK" in result.output or "Not cached" in result.output) + + +def test_clear_cache_help(): + """Test clear-cache command help.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["clear-cache", "--help"]) + assert result.exit_code == 0 + assert "Delete all cached data" in result.output + assert "--yes" in result.output or "-y" in result.output + + +def test_clear_cache_with_confirmation_cancel(): + """Test clear-cache command cancellation.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["clear-cache"], input="n\n") + assert result.exit_code == 0 + assert "cancelled" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output + + +def test_clear_cache_with_yes_flag(): + """Test clear-cache command with -y flag.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["clear-cache", "-y"]) + assert result.exit_code == 0 + assert "cleared" in result.output.lower() or "empty" in result.output.lower() or "No cache directory found" in result.output + + +def test_update_cache_help(): + """Test update-cache command help.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["update-cache", "--help"]) + assert result.exit_code == 0 + assert "Update cache" in result.output + assert "--yes" in result.output or "-y" in result.output + + +def test_update_cache_with_confirmation_cancel(): + """Test update-cache command cancellation.""" + cli = CliRunner() + result = cli.invoke(pysigma_group, ["update-cache"], input="n\n") + assert result.exit_code == 0 + assert "cancelled" in result.output.lower() \ No newline at end of file