From 3ba4c26f22952f4f872d6fef6016cbfb59345870 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89milio=20Gonzalez?= Date: Thu, 5 Feb 2026 16:25:58 -0500 Subject: [PATCH] Add a `--group` argument to `sigma analyze fields` to group results by log source. Displays the output in a prettytable. --- sigma/analyze/fields.py | 22 +++++++++++++++++----- sigma/cli/analyze.py | 25 ++++++++++++++++++++----- tests/test_analyze.py | 8 ++++++++ 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/sigma/analyze/fields.py b/sigma/analyze/fields.py index e08951e..3ea044a 100644 --- a/sigma/analyze/fields.py +++ b/sigma/analyze/fields.py @@ -2,7 +2,7 @@ from __future__ import annotations from operator import add -from typing import List, Set, Tuple +from typing import List, Set, Tuple, Dict, Union from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem from sigma.collection import SigmaCollection from sigma.correlations import SigmaCorrelationRule @@ -145,19 +145,22 @@ def noop(field: str) -> str: def extract_fields_from_collection( collection: SigmaCollection, backend, + group = False, collect_errors: bool = True, -) -> Tuple[Set[str], List[SigmaError]]: +) -> Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: """Extract all unique field names from a Sigma collection. Args: collection: A SigmaCollection to extract fields from backend: A Backend instance used to escape and quote field names + group: Whether to group fields by logsource. Defaults to False. collect_errors: Whether to collect errors. Defaults to True. 
Returns: - Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found + Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: A set of unique field names (or a dict of sets if grouped) and any errors found """ all_fields: Set[str] = set() + grouped_fields: Dict[str, Set[str]] = {} all_errors: List[SigmaError] = [] for rule in collection: @@ -202,6 +205,15 @@ def extract_fields_from_collection( fields, errors = get_fields(backend, rule, collect_errors) all_fields.update(fields) all_errors.extend(errors) - - return all_fields, all_errors + if group: + if isinstance(rule, SigmaRule): # Correlations not supported; they don't have a logsource + logsource = f"{rule.logsource.category or ''}|{rule.logsource.product or ''}|{rule.logsource.service or ''}" + if logsource not in grouped_fields: + grouped_fields[logsource] = set() + grouped_fields[logsource].update(fields) + + if group: + return grouped_fields, all_errors + else: + return all_fields, all_errors diff --git a/sigma/cli/analyze.py b/sigma/cli/analyze.py index c0f4292..361f537 100644 --- a/sigma/cli/analyze.py +++ b/sigma/cli/analyze.py @@ -1,6 +1,7 @@ import json import pathlib import click +from prettytable import PrettyTable from sigma.processing.resolver import SigmaPipelineNotFoundError from sigma.cli.convert import pipeline_resolver @@ -245,13 +246,18 @@ def analyze_logsource( default=True, help="Verify if a pipeline is used that is intended for another backend.", ) +@click.option( + "--group/--no-group", + default=False, + help="Group fields by logsource.", +) @click.argument( "input", nargs=-1, required=True, type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path), ) -def analyze_fields(file_pattern, target, pipeline, pipeline_check, input): +def analyze_fields(file_pattern, target, pipeline, pipeline_check, group, input): """Extract field names from Sigma rule sets. 
This command extracts and outputs all unique field names present in the given @@ -301,13 +307,22 @@ def analyze_fields(file_pattern, target, pipeline, pipeline_check, input): raise click.ClickException(f"Failed to initialize backend '{target}': {str(e)}") # Extract fields - all_fields, errors = extract_fields_from_collection(rules, backend) + all_fields, errors = extract_fields_from_collection(rules, backend, group) # Handle errors if errors: click.echo("Warnings during field extraction:", err=True) for error in errors: click.echo(f"* {error}", err=True) - - # Output fields sorted - click.echo("\n".join(sorted(all_fields))) + + if group: + table = PrettyTable() + table.field_names = ["Logsource", "Fields"] + table.align["Logsource"] = "r" + table.align["Fields"] = "l" + for logsource, fields in sorted(all_fields.items()): + table.add_row([logsource, "\n".join(sorted(fields))]) + click.echo(table) + else: + # Output fields sorted + click.echo("\n".join(sorted(all_fields))) diff --git a/tests/test_analyze.py b/tests/test_analyze.py index db0da6c..65106db 100644 --- a/tests/test_analyze.py +++ b/tests/test_analyze.py @@ -245,3 +245,11 @@ def test_fields_invalid_rule(): result = cli.invoke(analyze_fields, ["-t", "text_query_test", "-", "tests/files/sigma_rule_without_condition.yml"]) assert result.exit_code != 0 assert "at least one condition" in result.stderr + +def test_fields_grouped_extract(): + cli = CliRunner() + result = cli.invoke(analyze_fields, ["-t", "text_query_test", "--group", "-", "tests/files/valid"]) + assert result.exit_code == 0 + # Should have extracted at least some fields + assert len(result.stdout.split()) > 0 + assert "+----------" in result.stdout # Check for table format \ No newline at end of file