Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 93 additions & 70 deletions sigma/cli/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,91 @@
from sigma.validation import SigmaValidator
from sigma.rule import SigmaRule

plugins = InstalledSigmaPlugins.autodiscover()
validators = plugins.validators

severity_color = {"low": "green", "medium": "yellow", "high": "red"}

# ==========================================
# Data Processing & Extraction Functions
# ==========================================
def setup_validator(validation_config, exclude):

plugins = InstalledSigmaPlugins.autodiscover()
validators = plugins.validators

if (
validation_config is None
): # no validation config provided, use basic config with all validators
exclude_lower = [excluded.lower() for excluded in exclude]
exclude_invalid = [
excluded for excluded in exclude_lower if excluded not in validators.keys()
]
exclude_valid = [
excluded for excluded in exclude_lower if excluded not in exclude_invalid
]

report_config_warnings(exclude_invalid, exclude_valid)

validators_filtered = [
validator
for name, validator in validators.items()
if name.lower() not in exclude_valid
]
rule_validator = SigmaValidator(validators_filtered)
else:
if exclude:
click.echo(
f"A configuration file and the `--exclude` parameter was set, ignoring the `--exclude` parameter."
)
rule_validator = SigmaValidator.from_yaml(validation_config.read(), validators)
return rule_validator

# ==========================================
# Reporting & Output Functions
# ==========================================

def report_config_warnings(exclude_invalid, exclude_valid):
"""Prints warnings regarding invalid or excluded validators."""
if len(exclude_invalid) > 0:
click.echo(f"Invalid validators name : {exclude_invalid} use 'sigma list validators'")
if len(exclude_valid) > 0:
click.echo(f"Ignoring these validators : {exclude_valid}'")

def validate_loaded_rules(check_rules, rule_validator):
with click.progressbar(
check_rules, label="Checking Sigma rules", file=stderr
) as rules:
issues = rule_validator.validate_rules(rules)
return issues


def load_and_check_rules(input, file_pattern, rule_errors, cond_errors):
rule_collection = load_rules(input, file_pattern)
check_rules = list()
first_error = True
for rule in rule_collection.rules:
if (
len(rule.errors) > 0
): # rule has errors: print errors and skip further checking of rule
if first_error:
click.echo("=== Sigma Rule Errors ===")
first_error = False

for error in rule.errors:
click.echo(error)
rule_errors.update((error.__class__.__name__,))
elif isinstance(rule, SigmaRule): # rule has no errors, parse condition
try:
for condition in rule.detection.parsed_condition:
condition.parse()
check_rules.append(rule)
except SigmaConditionError as e: # Error in condition
error = str(e)
click.echo(
f"Condition error in { str(condition.source) }:{ error }"
)
cond_errors.update((error,))
else:
check_rules.append(rule)
return check_rules

@click.command()
@click.option(
Expand Down Expand Up @@ -66,89 +146,32 @@ def check(
input, validation_config, file_pattern, fail_on_error, fail_on_issues, exclude
):
"""Check Sigma rules for validity and best practices (not yet implemented)."""
if (
validation_config is None
): # no validation config provided, use basic config with all validators
exclude_lower = [excluded.lower() for excluded in exclude]
exclude_invalid = [
excluded for excluded in exclude_lower if excluded not in validators.keys()
]
exclude_valid = [
excluded for excluded in exclude_lower if excluded not in exclude_invalid
]

if len(exclude_invalid) > 0:
click.echo(
f"Invalid validators name : {exclude_invalid} use 'sigma list validators'"
)
if len(exclude_valid) > 0:
click.echo(f"Ignoring these validators : {exclude_valid}'")

validators_filtered = [
validator
for name, validator in validators.items()
if name.lower() not in exclude_valid
]
rule_validator = SigmaValidator(validators_filtered)
else:
if exclude:
click.echo(
f"A configuration file and the `--exclude` parameter was set, ignoring the `--exclude` parameter."
)
rule_validator = SigmaValidator.from_yaml(validation_config.read(), validators)

rule_validator = setup_validator(validation_config, exclude)

try:
rule_collection = load_rules(input, file_pattern)
rule_errors = Counter()
cond_errors = Counter()
check_rules = list()
first_error = True
for rule in rule_collection.rules:
if (
len(rule.errors) > 0
): # rule has errors: print errors and skip further checking of rule
if first_error:
click.echo("=== Sigma Rule Errors ===")
first_error = False

for error in rule.errors:
click.echo(error)
rule_errors.update((error.__class__.__name__,))
elif isinstance(rule, SigmaRule): # rule has no errors, parse condition
try:
for condition in rule.detection.parsed_condition:
condition.parse()
check_rules.append(rule)
except SigmaConditionError as e: # Error in condition
error = str(e)
click.echo(
f"Condition error in { str(condition.source) }:{ error }"
)
cond_errors.update((error,))
else:
check_rules.append(rule)
check_rules = load_and_check_rules(input, file_pattern, rule_errors, cond_errors)

# TODO: From Python 3.10 the commented line below can be used.
rule_error_count = sum(rule_errors.values())
# rule_error_count = rule_errors.total()

with click.progressbar(
check_rules, label="Checking Sigma rules", file=stderr
) as rules:
issues = rule_validator.validate_rules(rules)
issues = validate_loaded_rules(check_rules, rule_validator)

issue_count = len(issues)
issue_counter = Counter()
if issue_count > 0:
click.echo("=== Issues ===")
for issue in issues:
# Need to split SigmaValidationIssue __str__
rules = ", ".join(
formatted_rules_output = ", ".join(
[
str(rule.source)
if rule.source is not None
else str(rule.id) or rule.title
for rule in issue.rules
str(rule_with_issue.source)
if rule_with_issue.source is not None
else str(rule_with_issue.id) or rule_with_issue.title
for rule_with_issue in issue.rules
]
)
additional_fields = " ".join(
Expand All @@ -171,7 +194,7 @@ def check(
+ " description="
+ click.style(issue.description, bold=True, fg="blue")
+ " rule="
+ click.style(rules, bold=True, fg="blue")
+ click.style(formatted_rules_output, bold=True, fg="blue")
+ f" {additional_fields}"
)
issue_counter.update((issue.__class__,))
Expand Down
Loading