Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from semver import Version

from . import beats, ecs, endgame, utils
from .config import load_current_package_version, parse_rules_config
from .config import CUSTOM_RULES_DIR, load_current_package_version, parse_rules_config
from .esql import get_esql_query_event_dataset_integrations
from .esql_errors import EsqlSemanticError
from .integrations import (
Expand Down Expand Up @@ -750,6 +750,8 @@ def index_or_dataview(self) -> list[str]:
@cached_property
def validator(self) -> QueryValidator | None:
if self.language == "kuery":
if not self.query.strip() and self.filters and CUSTOM_RULES_DIR:
return None
return KQLValidator(self.query)
if self.language == "eql":
return EQLValidator(self.query)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.42"
version = "1.6.43"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
87 changes: 87 additions & 0 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Any

import eql
import kql
import pytest
from marshmallow import ValidationError
from semver import Version
Expand Down Expand Up @@ -305,6 +306,92 @@ def build_rule(response_actions: list[dict[str, Any]]) -> TOMLRuleContents:
contents = build_rule(response_actions)
self.assertEqual(contents.to_api_format()["response_actions"], response_actions)

@unittest.mock.patch.dict(os.environ, {"CUSTOM_RULES_DIR": "custom-rules-dir"})
@unittest.mock.patch("detection_rules.rule.CUSTOM_RULES_DIR", "custom-rules-dir")
def test_empty_kuery_with_filters_is_valid_for_custom_rules(self) -> None:
"""Test that filter-only KQL rules exported from Kibana can be loaded."""
metadata = {
"creation_date": "1970/01/01",
"updated_date": "1970/01/01",
"min_stack_version": load_current_package_version(),
}
rule = {
"author": ["Elastic"],
"description": "test description",
"index": ["logs-*"],
"language": "kuery",
"license": "Elastic License v2",
"name": "filter only rule",
"query": "",
"filters": [
{
"meta": {
"disabled": False,
"negate": False,
"alias": None,
"index": "logs-*",
"key": "message",
"field": "message",
"params": {"query": "expected phrase"},
"type": "phrase",
},
"query": {"match_phrase": {"message": "expected phrase"}},
"$state": {"store": "appState"},
}
],
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "query",
}

contents = TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule})
self.assertEqual(contents.data.query, "")

rule_without_filters = dict(rule, rule_id=str(uuid.uuid4()), filters=[])
with self.assertRaises(kql.KqlParseError):
TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule_without_filters})

def test_empty_kuery_with_filters_is_invalid_for_prebuilt_rules(self) -> None:
"""Test that filter-only KQL remains invalid outside custom rule exports."""
metadata = {
"creation_date": "1970/01/01",
"updated_date": "1970/01/01",
"min_stack_version": load_current_package_version(),
}
rule = {
"author": ["Elastic"],
"description": "test description",
"index": ["logs-*"],
"language": "kuery",
"license": "Elastic License v2",
"name": "filter only rule",
"query": "",
"filters": [
{
"meta": {
"disabled": False,
"negate": False,
"alias": None,
"index": "logs-*",
"key": "message",
"field": "message",
"params": {"query": "expected phrase"},
"type": "phrase",
},
"query": {"match_phrase": {"message": "expected phrase"}},
"$state": {"store": "appState"},
}
],
"risk_score": 21,
"rule_id": str(uuid.uuid4()),
"severity": "low",
"type": "query",
}

with self.assertRaises(kql.KqlParseError):
TOMLRuleContents.from_dict({"metadata": metadata, "rule": rule})


class TestVersionLockSchema(unittest.TestCase):
"""Test that the version lock has proper entries."""
Expand Down