Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build-test-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ jobs:
"splunk_app_req",
"splunk_app_req_broken",
"splunk_cim_model",
"splunk_app_fiction_with_uuid",
"splunk_app_req_with_uuid",
]
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ services:
SPLUNK_APP_ID: ${SPLUNK_APP_ID}
SPLUNK_APP_PACKAGE: ${SPLUNK_APP_PACKAGE}
SPLUNK_VERSION: ${SPLUNK_VERSION}
platform: linux/amd64
ports:
- "8000"
- "8088"
Expand All @@ -91,6 +92,7 @@ services:
SPLUNK_APP_PACKAGE: ${SPLUNK_APP_PACKAGE}
SPLUNK_VERSION: ${SPLUNK_VERSION}
hostname: uf
platform: linux/amd64
ports:
- "9997"
- "8089"
Expand Down
13 changes: 13 additions & 0 deletions docs/how_to_use.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,19 @@ The following optional arguments are available to modify the default settings in

- Select false to disable test execution, default value is true

```console
--splunk-ep
```

- Enable Splunk Edge Processor (EP) mode when your events are transformed during ingestion.
- **Why needed**: Edge Processor modifies event content (transformations, parsing, enrichment), which breaks tests that search for the literal event content.
- When enabled, the following tests match events by a UUID indexed field instead of searching on the escaped `_raw` event content:
- `test_cim_fields_recommended` (CIM compliance tests)
- `test_requirement_fields` (requirement field tests)
- `test_datamodels` (datamodel mapping tests)
- **Limitation**: These tests are only generated for samples using HEC Event ingestor (`modinput`, `windows_input`) because other ingestors don't support UUID indexed fields.
- **Other test types**: Field extraction, tags, eventtypes, savedsearches, etc. are generated for ALL samples and work normally with EP transformations.

## Extending pytest-splunk-addon

**1. Test cases taking too long to execute**
Expand Down
86 changes: 42 additions & 44 deletions pytest_splunk_addon/app_test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,23 @@ class AppTestGenerator(object):
def __init__(self, pytest_config):
self.pytest_config = pytest_config
self.seen_tests = set()
self.splunk_ep = self.pytest_config.getoption("splunk_ep")
self.config_path = self.pytest_config.getoption("splunk_data_generator")
self.store_events = self.pytest_config.getoption("store_events")

store_events = self.pytest_config.getoption("store_events")
config_path = self.pytest_config.getoption("splunk_data_generator")
sample_generator = SampleXdistGenerator(
self.pytest_config.getoption("splunk_app"), config_path
self.pytest_config.getoption("splunk_app"),
self.splunk_ep,
self.config_path,
)
store_sample = sample_generator.get_samples(store_events)
store_sample = sample_generator.get_samples(self.store_events)
self.tokenized_events = store_sample.get("tokenized_events")
LOGGER.debug("Initializing FieldTestGenerator to generate the test cases")
self.fieldtest_generator = FieldTestGenerator(
self.pytest_config.getoption("splunk_app"),
self.tokenized_events,
field_bank=self.pytest_config.getoption("field_bank", False),
splunk_ep=self.splunk_ep,
)

data_model_path = os.path.join(
Expand All @@ -68,9 +72,42 @@ def __init__(self, pytest_config):
self.pytest_config.getoption("splunk_app"),
self.pytest_config.getoption("splunk_dm_path") or data_model_path,
self.tokenized_events,
splunk_ep=self.splunk_ep,
)
self.indextime_test_generator = IndexTimeTestGenerator()

def _generate_indextime_tests(self, fixture):
    """
    Build the index-time test parameters that match the given fixture name.

    Args:
        fixture (str): Fixture name; must contain one of "key_fields",
            "_time" or "line_breaker" to select the index-time test type.

    Returns:
        list: pytest parameters yielded by ``IndexTimeTestGenerator`` for the
        selected test type, or an empty list when the fixture matches none.
    """
    addon_path = self.pytest_config.getoption("splunk_app")
    psa_data_path = self.pytest_config.getoption("splunk_data_generator")

    # Resolve the test type from the fixture name. Marker order mirrors the
    # original if/elif chain ("key_fields" is checked before "_time").
    selected_type = None
    for marker in ("key_fields", "_time", "line_breaker"):
        if marker in fixture:
            selected_type = marker
            break

    if selected_type is None:
        # Unknown fixture: nothing to generate.
        return []

    generated = self.indextime_test_generator.generate_tests(
        self.store_events,
        app_path=addon_path,
        config_path=psa_data_path,
        test_type=selected_type,
        splunk_ep=self.splunk_ep,
    )
    return list(generated)

def generate_tests(self, fixture):
"""
Generate the test cases based on the fixture provided
Expand Down Expand Up @@ -100,47 +137,8 @@ def generate_tests(self, fixture):
self.cim_test_generator.generate_tests(fixture),
fixture,
)

elif fixture.startswith("splunk_indextime"):
# TODO: What should be the id of the test case?
# Sourcetype + Host + Key field + _count

pytest_params = None

store_events = self.pytest_config.getoption("store_events")
app_path = self.pytest_config.getoption("splunk_app")
config_path = self.pytest_config.getoption("splunk_data_generator")

if "key_fields" in fixture:
pytest_params = list(
self.indextime_test_generator.generate_tests(
store_events,
app_path=app_path,
config_path=config_path,
test_type="key_fields",
)
)

elif "_time" in fixture:
pytest_params = list(
self.indextime_test_generator.generate_tests(
store_events,
app_path=app_path,
config_path=config_path,
test_type="_time",
)
)

elif "line_breaker" in fixture:
pytest_params = list(
self.indextime_test_generator.generate_tests(
store_events,
app_path=app_path,
config_path=config_path,
test_type="line_breaker",
)
)

pytest_params = self._generate_indextime_tests(fixture)
yield from sorted(pytest_params, key=lambda param: param.id)

def dedup_tests(self, test_list, fixture):
Expand Down
20 changes: 20 additions & 0 deletions pytest_splunk_addon/cim_tests/test_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from . import DataModelHandler
from ..addon_parser import AddonParser
from ..addon_parser import Field
from ..utils import EP_COMPATIBLE_INPUT_TYPES

LOGGER = logging.getLogger("pytest-splunk-addon")

Expand Down Expand Up @@ -52,6 +53,7 @@ def __init__(
tokenized_events,
test_field_type=["required", "conditional"],
common_fields_path=None,
splunk_ep=False,
):

self.data_model_handler = DataModelHandler(data_model_path)
Expand All @@ -61,6 +63,7 @@ def __init__(
self.common_fields_path = common_fields_path or op.join(
op.dirname(op.abspath(__file__)), self.COMMON_FIELDS_PATH
)
self.splunk_ep = splunk_ep

def generate_tests(self, fixture):
"""
Expand Down Expand Up @@ -267,12 +270,29 @@ def generate_recommended_fields_tests(self):
2. combine the fields list with the defined exceptions
3. yield object with datamodel, dataset, cim_version and list of fields
"""
skipped_samples = set()

# Get EP-compatible input types once before the loop if EP mode is enabled
ep_compatible_types = EP_COMPATIBLE_INPUT_TYPES if self.splunk_ep else None

for event in self.tokenized_events:
if (
not event.requirement_test_data
or event.requirement_test_data.keys() == {"other_fields"}
):
continue

# Skip incompatible samples when Splunk EP mode is enabled
if self.splunk_ep:
input_type = event.metadata.get("input_type", "default")
if input_type not in ep_compatible_types:
if event.sample_name not in skipped_samples:
LOGGER.info(
f"Splunk EP mode: Skipping CIM recommended fields tests for sample '{event.sample_name}' "
f"(input_type: {input_type}) as it's not ingested by HECEventIngestor"
)
skipped_samples.add(event.sample_name)
continue
for _, datamodels in event.requirement_test_data["datamodels"].items():
if type(datamodels) is not list:
datamodels = [datamodels]
Expand Down
5 changes: 4 additions & 1 deletion pytest_splunk_addon/docker_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ def start(self, *services):

:param services: the names of the services as defined in compose file
"""
self._docker_compose.execute("up", "--build", "--wait", *services)

self._docker_compose.execute(
"up", "--build", "--wait", "--no-recreate", "-d", *services
)

def stop(self, *services):
"""Ensures that the given services are stopped via docker compose.
Expand Down
2 changes: 2 additions & 0 deletions pytest_splunk_addon/event_ingestors/hec_event_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def ingest(self, events, thread_count):
"event": event.event,
"index": event.metadata.get("index", "main"),
}
if event.metadata.get("splunk_ep"):
event_dict["fields"] = {"unique_identifier": event.unique_identifier}

if event.metadata.get("host_type") in ("plugin", None):
host = event.metadata.get("host")
Expand Down
31 changes: 17 additions & 14 deletions pytest_splunk_addon/event_ingestors/ingestor_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ class IngestorHelper(object):
Module for helper methods for ingestors.
"""

# Mapping of input types to their corresponding ingestor classes
# Note: This is used by utils.get_ep_compatible_input_types() to determine EP-compatible types
INGEST_METHODS = {
"modinput": HECEventIngestor,
"windows_input": HECEventIngestor,
"file_monitor": HECRawEventIngestor,
"uf_file_monitor": FileMonitorEventIngestor,
"scripted_input": HECRawEventIngestor,
"hec_metric": HECMetricEventIngestor,
"syslog_tcp": SC4SEventIngestor,
"syslog_udp": None, # TBD
"default": HECRawEventIngestor,
}

@classmethod
def get_event_ingestor(cls, input_type, ingest_meta_data):
"""
Expand All @@ -40,19 +54,7 @@ def get_event_ingestor(cls, input_type, ingest_meta_data):
input_type (str): input_type defined in pytest-splunk-addon-data.conf
ingest_meta_data (dict): Dictionary of required meta_data.
"""
ingest_methods = {
"modinput": HECEventIngestor,
"windows_input": HECEventIngestor,
"file_monitor": HECRawEventIngestor,
"uf_file_monitor": FileMonitorEventIngestor,
"scripted_input": HECRawEventIngestor,
"hec_metric": HECMetricEventIngestor,
"syslog_tcp": SC4SEventIngestor,
"syslog_udp": None, # TBD
"default": HECRawEventIngestor,
}

ingestor = ingest_methods.get(input_type)(ingest_meta_data)
ingestor = cls.INGEST_METHODS.get(input_type)(ingest_meta_data)
LOGGER.debug("Using the following HEC ingestor: {}".format(str(ingestor)))
return ingestor

Expand Down Expand Up @@ -95,7 +97,8 @@ def ingest_events(
thread_count (int): number of threads to use for ingestion
store_events (bool): Boolean param for generating json files with tokenised events
"""
sample_generator = SampleXdistGenerator(addon_path, config_path)
splunk_ep = ingest_meta_data.get("splunk_ep", False)
sample_generator = SampleXdistGenerator(addon_path, splunk_ep, config_path)
store_sample = sample_generator.get_samples(store_events)
tokenized_events = store_sample.get("tokenized_events")
ingestor_dict = cls.get_consolidated_events(tokenized_events)
Expand Down
Loading