diff --git a/detection_rules/rule.py b/detection_rules/rule.py index 28de81b9bb1..54a00c024ba 100644 --- a/detection_rules/rule.py +++ b/detection_rules/rule.py @@ -28,7 +28,7 @@ from semver import Version from . import beats, ecs, endgame, utils -from .config import load_current_package_version, parse_rules_config +from .config import CUSTOM_RULES_DIR, load_current_package_version, parse_rules_config from .esql import get_esql_query_event_dataset_integrations from .esql_errors import EsqlSemanticError from .integrations import ( @@ -750,6 +750,8 @@ def index_or_dataview(self) -> list[str]: @cached_property def validator(self) -> QueryValidator | None: if self.language == "kuery": + if not self.query.strip() and self.filters and CUSTOM_RULES_DIR: + return None return KQLValidator(self.query) if self.language == "eql": return EQLValidator(self.query) diff --git a/pyproject.toml b/pyproject.toml index c7d46bb03ec..9fcdff583ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "1.6.42" +version = "1.6.43" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12" diff --git a/tests/test_schemas.py b/tests/test_schemas.py index 52eac327bd3..56dee89d1b3 100644 --- a/tests/test_schemas.py +++ b/tests/test_schemas.py @@ -14,6 +14,7 @@ from typing import Any import eql +import kql import pytest from marshmallow import ValidationError from semver import Version @@ -305,6 +306,92 @@ def build_rule(response_actions: list[dict[str, Any]]) -> TOMLRuleContents: contents = build_rule(response_actions) self.assertEqual(contents.to_api_format()["response_actions"], response_actions) + @unittest.mock.patch.dict(os.environ, {"CUSTOM_RULES_DIR": "custom-rules-dir"}) + @unittest.mock.patch("detection_rules.rule.CUSTOM_RULES_DIR", "custom-rules-dir") + def test_empty_kuery_with_filters_is_valid_for_custom_rules(self) -> None: + """Test that filter-only KQL rules exported from Kibana can be loaded.""" + metadata = { + "creation_date": "1970/01/01", + "updated_date": "1970/01/01", + "min_stack_version": load_current_package_version(), + } + rule = { + "author": ["Elastic"], + "description": "test description", + "index": ["logs-*"], + "language": "kuery", + "license": "Elastic License v2", + "name": "filter only rule", + "query": "", + "filters": [ + { + "meta": { + "disabled": False, + "negate": False, + "alias": None, + "index": "logs-*", + "key": "message", + "field": "message", + "params": {"query": "expected phrase"}, + "type": "phrase", + }, + "query": {"match_phrase": {"message": "expected phrase"}}, + "$state": {"store": "appState"}, + } + ], + "risk_score": 21, + "rule_id": str(uuid.uuid4()), + "severity": "low", + "type": "query", + } + + contents = TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule}) + self.assertEqual(contents.data.query, "") + + rule_without_filters = dict(rule, rule_id=str(uuid.uuid4()), filters=[]) + with self.assertRaises(kql.KqlParseError): + TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule_without_filters}) + + def test_empty_kuery_with_filters_is_invalid_for_prebuilt_rules(self) -> None: + """Test that filter-only KQL remains invalid outside custom rule exports.""" + metadata = { + "creation_date": "1970/01/01", + "updated_date": "1970/01/01", + "min_stack_version": load_current_package_version(), + } + rule = { + "author": ["Elastic"], + "description": "test description", + "index": ["logs-*"], + "language": "kuery", + "license": "Elastic License v2", + "name": "filter only rule", + "query": "", + "filters": [ + { + "meta": { + "disabled": False, + "negate": False, + "alias": None, + "index": "logs-*", + "key": "message", + "field": "message", + "params": {"query": "expected phrase"}, + "type": "phrase", + }, + "query": {"match_phrase": {"message": "expected phrase"}}, + "$state": {"store": "appState"}, + } + ], + "risk_score": 21, + "rule_id": str(uuid.uuid4()), + "severity": "low", + "type": "query", + } + + with self.assertRaises(kql.KqlParseError): + TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule}) + class TestVersionLockSchema(unittest.TestCase): """Test that the version lock has proper entries."""