Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions sigma/analyze/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from __future__ import annotations

from operator import add
from typing import List, Set, Tuple
from typing import List, Set, Tuple, Dict, Union
from sigma.rule import SigmaRule, SigmaDetection, SigmaDetectionItem
from sigma.collection import SigmaCollection
from sigma.correlations import SigmaCorrelationRule
Expand Down Expand Up @@ -145,19 +145,22 @@ def noop(field: str) -> str:
def extract_fields_from_collection(
collection: SigmaCollection,
backend,
group = False,
collect_errors: bool = True,
) -> Tuple[Set[str], List[SigmaError]]:
) -> Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]:
"""Extract all unique field names from a Sigma collection.

Args:
collection: A SigmaCollection to extract fields from
backend: A Backend instance used to escape and quote field names
group: Whether to group fields by logsource. Defaults to False.
collect_errors: Whether to collect errors. Defaults to True.

Returns:
Tuple[Set[str], List[SigmaError]]: A set of unique field names and any errors found
Tuple[Union[Set[str], Dict[str, Set[str]]], List[SigmaError]]: A set of unique field names (or a dict of sets, keyed by logsource, if grouped) and any errors found
"""
all_fields: Set[str] = set()
grouped_fields: Dict[str, Set[str]] = {}
all_errors: List[SigmaError] = []

for rule in collection:
Expand Down Expand Up @@ -202,6 +205,15 @@ def extract_fields_from_collection(
fields, errors = get_fields(backend, rule, collect_errors)
all_fields.update(fields)
all_errors.extend(errors)

return all_fields, all_errors
if group:
if isinstance(rule, SigmaRule): # Correlations not supported, they don't have logsource
logsource = f"{rule.logsource.category or ''}|{rule.logsource.product or ''}|{rule.logsource.service or ''}"
if logsource not in grouped_fields:
grouped_fields[logsource] = set()
grouped_fields[logsource].update(fields)

if group:
return grouped_fields, all_errors
else:
return all_fields, all_errors

25 changes: 20 additions & 5 deletions sigma/cli/analyze.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import pathlib
import click
from prettytable import PrettyTable
from sigma.processing.resolver import SigmaPipelineNotFoundError

from sigma.cli.convert import pipeline_resolver
Expand Down Expand Up @@ -245,13 +246,18 @@ def analyze_logsource(
default=True,
help="Verify if a pipeline is used that is intended for another backend.",
)
@click.option(
"--group/--no-group",
default=False,
help="Group fields by logsource.",
)
@click.argument(
"input",
nargs=-1,
required=True,
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
def analyze_fields(file_pattern, target, pipeline, pipeline_check, group, input):
"""Extract field names from Sigma rule sets.

This command extracts and outputs all unique field names present in the given
Expand Down Expand Up @@ -301,13 +307,22 @@ def analyze_fields(file_pattern, target, pipeline, pipeline_check, input):
raise click.ClickException(f"Failed to initialize backend '{target}': {str(e)}")

# Extract fields
all_fields, errors = extract_fields_from_collection(rules, backend)
all_fields, errors = extract_fields_from_collection(rules, backend, group)

# Handle errors
if errors:
click.echo("Warnings during field extraction:", err=True)
for error in errors:
click.echo(f"* {error}", err=True)

# Output fields sorted
click.echo("\n".join(sorted(all_fields)))

if group:
table = PrettyTable()
table.field_names = ["Logsource", "Fields"]
table.align["Logsource"] = "r"
table.align["Fields"] = "l"
for logsource, fields in sorted(all_fields.items()):
table.add_row([logsource, "\n".join(sorted(fields))])
click.echo(table)
else:
# Output fields sorted
click.echo("\n".join(sorted(all_fields)))
8 changes: 8 additions & 0 deletions tests/test_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,11 @@ def test_fields_invalid_rule():
result = cli.invoke(analyze_fields, ["-t", "text_query_test", "-", "tests/files/sigma_rule_without_condition.yml"])
assert result.exit_code != 0
assert "at least one condition" in result.stderr

def test_fields_grouped_extract():
    """Grouped field extraction (--group) succeeds and renders a table."""
    runner = CliRunner()
    outcome = runner.invoke(
        analyze_fields,
        ["-t", "text_query_test", "--group", "-", "tests/files/valid"],
    )
    assert outcome.exit_code == 0
    # At least one field name must have been extracted and printed.
    assert len(outcome.stdout.split()) > 0
    # A PrettyTable horizontal border confirms the grouped (table) layout.
    assert "+----------" in outcome.stdout