diff --git a/sigma/cli/check.py b/sigma/cli/check.py index 33cc5fb..b2565cd 100644 --- a/sigma/cli/check.py +++ b/sigma/cli/check.py @@ -14,11 +14,91 @@ from sigma.validation import SigmaValidator from sigma.rule import SigmaRule -plugins = InstalledSigmaPlugins.autodiscover() -validators = plugins.validators - severity_color = {"low": "green", "medium": "yellow", "high": "red"} +# ========================================== +# Data Processing & Extraction Functions +# ========================================== +def setup_validator(validation_config, exclude): + + plugins = InstalledSigmaPlugins.autodiscover() + validators = plugins.validators + + if ( + validation_config is None + ): # no validation config provided, use basic config with all validators + exclude_lower = [excluded.lower() for excluded in exclude] + exclude_invalid = [ + excluded for excluded in exclude_lower if excluded not in validators.keys() + ] + exclude_valid = [ + excluded for excluded in exclude_lower if excluded not in exclude_invalid + ] + + report_config_warnings(exclude_invalid, exclude_valid) + + validators_filtered = [ + validator + for name, validator in validators.items() + if name.lower() not in exclude_valid + ] + rule_validator = SigmaValidator(validators_filtered) + else: + if exclude: + click.echo( + f"A configuration file and the `--exclude` parameter was set, ignoring the `--exclude` parameter." + ) + rule_validator = SigmaValidator.from_yaml(validation_config.read(), validators) + return rule_validator + +# ========================================== +# Reporting & Output Functions +# ========================================== + +def report_config_warnings(exclude_invalid, exclude_valid): + """Prints warnings regarding invalid or excluded validators.""" + if len(exclude_invalid) > 0: + click.echo(f"Invalid validators name : {exclude_invalid} use 'sigma list validators'") + if len(exclude_valid) > 0: + click.echo(f"Ignoring these validators : {exclude_valid}'") + +def validate_loaded_rules(check_rules, rule_validator): + with click.progressbar( + check_rules, label="Checking Sigma rules", file=stderr + ) as rules: + issues = rule_validator.validate_rules(rules) + return issues + + +def load_and_check_rules(input, file_pattern, rule_errors, cond_errors): + rule_collection = load_rules(input, file_pattern) + check_rules = list() + first_error = True + for rule in rule_collection.rules: + if ( + len(rule.errors) > 0 + ): # rule has errors: print errors and skip further checking of rule + if first_error: + click.echo("=== Sigma Rule Errors ===") + first_error = False + + for error in rule.errors: + click.echo(error) + rule_errors.update((error.__class__.__name__,)) + elif isinstance(rule, SigmaRule): # rule has no errors, parse condition + try: + for condition in rule.detection.parsed_condition: + condition.parse() + check_rules.append(rule) + except SigmaConditionError as e: # Error in condition + error = str(e) + click.echo( + f"Condition error in { str(condition.source) }:{ error }" + ) + cond_errors.update((error,)) + else: + check_rules.append(rule) + return check_rules @click.command() @click.option( @@ -66,76 +146,19 @@ def check( input, validation_config, file_pattern, fail_on_error, fail_on_issues, exclude ): """Check Sigma rules for validity and best practices (not yet implemented).""" - if ( - validation_config is None - ): # no validation config provided, use basic config with all validators - exclude_lower = [excluded.lower() for excluded in exclude] - exclude_invalid = [ - excluded for excluded in exclude_lower if excluded not in validators.keys() - ] - exclude_valid = [ - excluded for excluded in exclude_lower if excluded not in exclude_invalid - ] - - if len(exclude_invalid) > 0: - click.echo( - f"Invalid validators name : {exclude_invalid} use 'sigma list validators'" - ) - if len(exclude_valid) > 0: - click.echo(f"Ignoring these validators : {exclude_valid}'") - - validators_filtered = [ - validator - for name, validator in validators.items() - if name.lower() not in exclude_valid - ] - rule_validator = SigmaValidator(validators_filtered) - else: - if exclude: - click.echo( - f"A configuration file and the `--exclude` parameter was set, ignoring the `--exclude` parameter." - ) - rule_validator = SigmaValidator.from_yaml(validation_config.read(), validators) + + rule_validator = setup_validator(validation_config, exclude) try: - rule_collection = load_rules(input, file_pattern) rule_errors = Counter() cond_errors = Counter() - check_rules = list() - first_error = True - for rule in rule_collection.rules: - if ( - len(rule.errors) > 0 - ): # rule has errors: print errors and skip further checking of rule - if first_error: - click.echo("=== Sigma Rule Errors ===") - first_error = False - - for error in rule.errors: - click.echo(error) - rule_errors.update((error.__class__.__name__,)) - elif isinstance(rule, SigmaRule): # rule has no errors, parse condition - try: - for condition in rule.detection.parsed_condition: - condition.parse() - check_rules.append(rule) - except SigmaConditionError as e: # Error in condition - error = str(e) - click.echo( - f"Condition error in { str(condition.source) }:{ error }" - ) - cond_errors.update((error,)) - else: - check_rules.append(rule) + check_rules = load_and_check_rules(input, file_pattern, rule_errors, cond_errors) # TODO: From Python 3.10 the commented line below can be used. rule_error_count = sum(rule_errors.values()) # rule_error_count = rule_errors.total() - with click.progressbar( - check_rules, label="Checking Sigma rules", file=stderr - ) as rules: - issues = rule_validator.validate_rules(rules) + issues = validate_loaded_rules(check_rules, rule_validator) issue_count = len(issues) issue_counter = Counter() @@ -143,12 +166,12 @@ def check( click.echo("=== Issues ===") for issue in issues: # Need to split SigmaValidationIssue __str__ - rules = ", ".join( + formatted_rules_output = ", ".join( [ - str(rule.source) - if rule.source is not None - else str(rule.id) or rule.title - for rule in issue.rules + str(rule_with_issue.source) + if rule_with_issue.source is not None + else str(rule_with_issue.id) or rule_with_issue.title + for rule_with_issue in issue.rules ] ) additional_fields = " ".join( @@ -171,7 +194,7 @@ def check( + " description=" + click.style(issue.description, bold=True, fg="blue") + " rule=" - + click.style(rules, bold=True, fg="blue") + + click.style(formatted_rules_output, bold=True, fg="blue") + f" {additional_fields}" ) issue_counter.update((issue.__class__,))