diff --git a/.github/workflows/build-test-release.yml b/.github/workflows/build-test-release.yml index 591e25fba..e5f73bf4e 100644 --- a/.github/workflows/build-test-release.yml +++ b/.github/workflows/build-test-release.yml @@ -148,6 +148,8 @@ jobs: "splunk_app_req", "splunk_app_req_broken", "splunk_cim_model", + "splunk_app_fiction_with_uuid", + "splunk_app_req_with_uuid", ] steps: - uses: actions/checkout@v4 diff --git a/docker-compose.yml b/docker-compose.yml index 65929c961..d7d13d7e0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -70,6 +70,7 @@ services: SPLUNK_APP_ID: ${SPLUNK_APP_ID} SPLUNK_APP_PACKAGE: ${SPLUNK_APP_PACKAGE} SPLUNK_VERSION: ${SPLUNK_VERSION} + platform: linux/amd64 ports: - "8000" - "8088" @@ -91,6 +92,7 @@ services: SPLUNK_APP_PACKAGE: ${SPLUNK_APP_PACKAGE} SPLUNK_VERSION: ${SPLUNK_VERSION} hostname: uf + platform: linux/amd64 ports: - "9997" - "8089" diff --git a/docs/how_to_use.md b/docs/how_to_use.md index 0e27cf9b0..e72083705 100644 --- a/docs/how_to_use.md +++ b/docs/how_to_use.md @@ -355,6 +355,19 @@ The following optional arguments are available to modify the default settings in - Select false to disable test execution, default value is true + ```console + --splunk-ep + ``` + + - Enable Splunk Edge Processor mode when your events are transformed during ingestion. + - **Why needed**: Edge Processor modifies event content (transformations, parsing, enrichment), which breaks tests that search for literal event content. + - When enabled, the following tests use UUID-based matching instead of escaped _raw: + - `test_cim_fields_recommended` (CIM compliance tests) + - `test_requirement_fields` (requirement field tests) + - `test_datamodels` (datamodel mapping tests) + - **Limitation**: These tests are only generated for samples using HEC Event ingestor (`modinput`, `windows_input`) because other ingestors don't support UUID indexed fields. + - **Other test types**: Field extraction, tags, eventtypes, savedsearches, etc. 
are generated for ALL samples and work normally with EP transformations. + ## Extending pytest-splunk-addon **1. Test cases taking too long to execute** diff --git a/pytest_splunk_addon/app_test_generator.py b/pytest_splunk_addon/app_test_generator.py index 2ba62681d..4d6e200c3 100644 --- a/pytest_splunk_addon/app_test_generator.py +++ b/pytest_splunk_addon/app_test_generator.py @@ -45,19 +45,23 @@ class AppTestGenerator(object): def __init__(self, pytest_config): self.pytest_config = pytest_config self.seen_tests = set() + self.splunk_ep = self.pytest_config.getoption("splunk_ep") + self.config_path = self.pytest_config.getoption("splunk_data_generator") + self.store_events = self.pytest_config.getoption("store_events") - store_events = self.pytest_config.getoption("store_events") - config_path = self.pytest_config.getoption("splunk_data_generator") sample_generator = SampleXdistGenerator( - self.pytest_config.getoption("splunk_app"), config_path + self.pytest_config.getoption("splunk_app"), + self.splunk_ep, + self.config_path, ) - store_sample = sample_generator.get_samples(store_events) + store_sample = sample_generator.get_samples(self.store_events) self.tokenized_events = store_sample.get("tokenized_events") LOGGER.debug("Initializing FieldTestGenerator to generate the test cases") self.fieldtest_generator = FieldTestGenerator( self.pytest_config.getoption("splunk_app"), self.tokenized_events, field_bank=self.pytest_config.getoption("field_bank", False), + splunk_ep=self.splunk_ep, ) data_model_path = os.path.join( @@ -68,9 +72,42 @@ def __init__(self, pytest_config): self.pytest_config.getoption("splunk_app"), self.pytest_config.getoption("splunk_dm_path") or data_model_path, self.tokenized_events, + splunk_ep=self.splunk_ep, ) self.indextime_test_generator = IndexTimeTestGenerator() + def _generate_indextime_tests(self, fixture): + """ + Generate index time tests based on the fixture type. 
+ + Args: + fixture (str): The fixture name containing the test type + + Returns: + list: List of pytest parameters for the specified test type + """ + app_path = self.pytest_config.getoption("splunk_app") + config_path = self.pytest_config.getoption("splunk_data_generator") + + if "key_fields" in fixture: + test_type = "key_fields" + elif "_time" in fixture: + test_type = "_time" + elif "line_breaker" in fixture: + test_type = "line_breaker" + else: + return [] + + return list( + self.indextime_test_generator.generate_tests( + self.store_events, + app_path=app_path, + config_path=config_path, + test_type=test_type, + splunk_ep=self.splunk_ep, + ) + ) + def generate_tests(self, fixture): """ Generate the test cases based on the fixture provided @@ -100,47 +137,8 @@ def generate_tests(self, fixture): self.cim_test_generator.generate_tests(fixture), fixture, ) - elif fixture.startswith("splunk_indextime"): - # TODO: What should be the id of the test case? - # Sourcetype + Host + Key field + _count - - pytest_params = None - - store_events = self.pytest_config.getoption("store_events") - app_path = self.pytest_config.getoption("splunk_app") - config_path = self.pytest_config.getoption("splunk_data_generator") - - if "key_fields" in fixture: - pytest_params = list( - self.indextime_test_generator.generate_tests( - store_events, - app_path=app_path, - config_path=config_path, - test_type="key_fields", - ) - ) - - elif "_time" in fixture: - pytest_params = list( - self.indextime_test_generator.generate_tests( - store_events, - app_path=app_path, - config_path=config_path, - test_type="_time", - ) - ) - - elif "line_breaker" in fixture: - pytest_params = list( - self.indextime_test_generator.generate_tests( - store_events, - app_path=app_path, - config_path=config_path, - test_type="line_breaker", - ) - ) - + pytest_params = self._generate_indextime_tests(fixture) yield from sorted(pytest_params, key=lambda param: param.id) def dedup_tests(self, test_list, fixture): diff 
--git a/pytest_splunk_addon/cim_tests/test_generator.py b/pytest_splunk_addon/cim_tests/test_generator.py index 1b9df9b56..5e133cfd4 100644 --- a/pytest_splunk_addon/cim_tests/test_generator.py +++ b/pytest_splunk_addon/cim_tests/test_generator.py @@ -24,6 +24,7 @@ from . import DataModelHandler from ..addon_parser import AddonParser from ..addon_parser import Field +from ..utils import EP_COMPATIBLE_INPUT_TYPES LOGGER = logging.getLogger("pytest-splunk-addon") @@ -52,6 +53,7 @@ def __init__( tokenized_events, test_field_type=["required", "conditional"], common_fields_path=None, + splunk_ep=False, ): self.data_model_handler = DataModelHandler(data_model_path) @@ -61,6 +63,7 @@ def __init__( self.common_fields_path = common_fields_path or op.join( op.dirname(op.abspath(__file__)), self.COMMON_FIELDS_PATH ) + self.splunk_ep = splunk_ep def generate_tests(self, fixture): """ @@ -267,12 +270,29 @@ def generate_recommended_fields_tests(self): 2. combine the fields list with the defined exceptions 3. 
yield object with datamodel, dataset, cim_version and list of fields """ + skipped_samples = set() + + # Get EP-compatible input types once before the loop if EP mode is enabled + ep_compatible_types = EP_COMPATIBLE_INPUT_TYPES if self.splunk_ep else None + for event in self.tokenized_events: if ( not event.requirement_test_data or event.requirement_test_data.keys() == {"other_fields"} ): continue + + # Skip incompatible samples when Splunk EP mode is enabled + if self.splunk_ep: + input_type = event.metadata.get("input_type", "default") + if input_type not in ep_compatible_types: + if event.sample_name not in skipped_samples: + LOGGER.info( + f"Splunk EP mode: Skipping CIM recommended fields tests for sample '{event.sample_name}' " + f"(input_type: {input_type}) as it's not ingested by HECEventIngestor" + ) + skipped_samples.add(event.sample_name) + continue for _, datamodels in event.requirement_test_data["datamodels"].items(): if type(datamodels) is not list: datamodels = [datamodels] diff --git a/pytest_splunk_addon/docker_class.py b/pytest_splunk_addon/docker_class.py index dfaad1ca4..ea8bcbcfa 100644 --- a/pytest_splunk_addon/docker_class.py +++ b/pytest_splunk_addon/docker_class.py @@ -65,7 +65,10 @@ def start(self, *services): :param services: the names of the services as defined in compose file """ - self._docker_compose.execute("up", "--build", "--wait", *services) + + self._docker_compose.execute( + "up", "--build", "--wait", "--no-recreate", "-d", *services + ) def stop(self, *services): """Ensures that the given services are stopped via docker compose. 
diff --git a/pytest_splunk_addon/event_ingestors/hec_event_ingestor.py b/pytest_splunk_addon/event_ingestors/hec_event_ingestor.py index b1e2aba07..cb08f762a 100644 --- a/pytest_splunk_addon/event_ingestors/hec_event_ingestor.py +++ b/pytest_splunk_addon/event_ingestors/hec_event_ingestor.py @@ -93,6 +93,8 @@ def ingest(self, events, thread_count): "event": event.event, "index": event.metadata.get("index", "main"), } + if event.metadata.get("splunk_ep"): + event_dict["fields"] = {"unique_identifier": event.unique_identifier} if event.metadata.get("host_type") in ("plugin", None): host = event.metadata.get("host") diff --git a/pytest_splunk_addon/event_ingestors/ingestor_helper.py b/pytest_splunk_addon/event_ingestors/ingestor_helper.py index 7416a722f..b505fd850 100644 --- a/pytest_splunk_addon/event_ingestors/ingestor_helper.py +++ b/pytest_splunk_addon/event_ingestors/ingestor_helper.py @@ -31,6 +31,20 @@ class IngestorHelper(object): Module for helper methods for ingestors. """ + # Mapping of input types to their corresponding ingestor classes + # Note: This is used by utils.get_ep_compatible_input_types() to determine EP-compatible types + INGEST_METHODS = { + "modinput": HECEventIngestor, + "windows_input": HECEventIngestor, + "file_monitor": HECRawEventIngestor, + "uf_file_monitor": FileMonitorEventIngestor, + "scripted_input": HECRawEventIngestor, + "hec_metric": HECMetricEventIngestor, + "syslog_tcp": SC4SEventIngestor, + "syslog_udp": None, # TBD + "default": HECRawEventIngestor, + } + @classmethod def get_event_ingestor(cls, input_type, ingest_meta_data): """ @@ -40,19 +54,7 @@ def get_event_ingestor(cls, input_type, ingest_meta_data): input_type (str): input_type defined in pytest-splunk-addon-data.conf ingest_meta_data (dict): Dictionary of required meta_data. 
""" - ingest_methods = { - "modinput": HECEventIngestor, - "windows_input": HECEventIngestor, - "file_monitor": HECRawEventIngestor, - "uf_file_monitor": FileMonitorEventIngestor, - "scripted_input": HECRawEventIngestor, - "hec_metric": HECMetricEventIngestor, - "syslog_tcp": SC4SEventIngestor, - "syslog_udp": None, # TBD - "default": HECRawEventIngestor, - } - - ingestor = ingest_methods.get(input_type)(ingest_meta_data) + ingestor = cls.INGEST_METHODS.get(input_type)(ingest_meta_data) LOGGER.debug("Using the following HEC ingestor: {}".format(str(ingestor))) return ingestor @@ -95,7 +97,8 @@ def ingest_events( thread_count (int): number of threads to use for ingestion store_events (bool): Boolean param for generating json files with tokenised events """ - sample_generator = SampleXdistGenerator(addon_path, config_path) + splunk_ep = ingest_meta_data.get("splunk_ep", False) + sample_generator = SampleXdistGenerator(addon_path, splunk_ep, config_path) store_sample = sample_generator.get_samples(store_events) tokenized_events = store_sample.get("tokenized_events") ingestor_dict = cls.get_consolidated_events(tokenized_events) diff --git a/pytest_splunk_addon/fields_tests/test_generator.py b/pytest_splunk_addon/fields_tests/test_generator.py index 8f4716c04..c31f4eab5 100644 --- a/pytest_splunk_addon/fields_tests/test_generator.py +++ b/pytest_splunk_addon/fields_tests/test_generator.py @@ -25,6 +25,7 @@ from ..addon_parser import AddonParser from . 
import FieldBank from ..utilities import xml_event_parser +from ..utils import EP_COMPATIBLE_INPUT_TYPES LOGGER = logging.getLogger("pytest-splunk-addon") @@ -44,12 +45,13 @@ class FieldTestGenerator(object): field_bank (str): Path of the fields Json file """ - def __init__(self, app_path, tokenized_events, field_bank=None): + def __init__(self, app_path, tokenized_events, field_bank=None, splunk_ep=False): LOGGER.debug("initializing AddonParser to parse the app") self.app_path = app_path self.addon_parser = AddonParser(self.app_path) self.tokenized_events = tokenized_events self.field_bank = field_bank + self.splunk_ep = splunk_ep def generate_tests(self, fixture): """ @@ -159,12 +161,30 @@ def generate_requirements_datamodels_tests(self): Yields: pytest.params for the test templates """ + skipped_samples = set() + + # Get EP-compatible input types once before the loop if EP mode is enabled + ep_compatible_types = EP_COMPATIBLE_INPUT_TYPES if self.splunk_ep else None + for event in self.tokenized_events: if ( not event.requirement_test_data or event.requirement_test_data.keys() == {"other_fields"} ): continue + + # Skip incompatible samples when Splunk EP mode is enabled + if self.splunk_ep: + input_type = event.metadata.get("input_type", "default") + if input_type not in ep_compatible_types: + if event.sample_name not in skipped_samples: + LOGGER.info( + f"Splunk EP mode: Skipping datamodel tests for sample '{event.sample_name}' " + f"(input_type: {input_type}) as it's not ingested by HECEventIngestor" + ) + skipped_samples.add(event.sample_name) + continue + if event.metadata.get("input_type", "").startswith("syslog"): stripped_event = xml_event_parser.strip_syslog_header(event.event) if stripped_event is None: @@ -190,11 +210,14 @@ def generate_requirements_datamodels_tests(self): datamodel.replace(" ", "_").replace(":", "_") for datamodel in datamodels ] + sample_event = { + "datamodels": datamodels, + "stanza": escaped_event, + } + if self.splunk_ep and 
getattr(event, "unique_identifier", None): + sample_event["unique_identifier"] = event.unique_identifier yield pytest.param( - { - "datamodels": datamodels, - "stanza": escaped_event, - }, + sample_event, id=f"{'-'.join(datamodels)}::sample_name::{event.sample_name}::host::{event.metadata.get('host')}", ) @@ -231,9 +254,27 @@ def generate_requirements_tests(self): Yields: pytest.params for the test templates """ + skipped_samples = set() + + # Get EP-compatible input types once before the loop if EP mode is enabled + ep_compatible_types = EP_COMPATIBLE_INPUT_TYPES if self.splunk_ep else None + for event in self.tokenized_events: if not event.requirement_test_data: continue + + # Skip incompatible samples when Splunk EP mode is enabled + if self.splunk_ep: + input_type = event.metadata.get("input_type", "default") + if input_type not in ep_compatible_types: + if event.sample_name not in skipped_samples: + LOGGER.info( + f"Splunk EP mode: Skipping requirement tests for sample '{event.sample_name}' " + f"(input_type: {input_type}). Only 'modinput' and 'windows_input' are supported." 
+ ) + skipped_samples.add(event.sample_name) + continue + if event.metadata.get("input_type", "").startswith("syslog"): stripped_event = xml_event_parser.strip_syslog_header(event.event) if stripped_event is None: @@ -246,9 +287,8 @@ def generate_requirements_tests(self): escaped_event = xml_event_parser.escape_char_event(stripped_event) exceptions = event.requirement_test_data.get("exceptions", {}) - metadata = event.metadata modinput_params = { - "sourcetype": metadata.get("sourcetype_to_search"), + "sourcetype": event.metadata.get("sourcetype_to_search"), } cim_fields = event.requirement_test_data.get("cim_fields", {}) @@ -261,12 +301,17 @@ def generate_requirements_tests(self): for field, value in requirement_fields.items() if field not in exceptions } + sample_event = { + "escaped_event": escaped_event, + "fields": requirement_fields, + "modinput_params": modinput_params, + } + + if self.splunk_ep and getattr(event, "unique_identifier", None): + sample_event["unique_identifier"] = event.unique_identifier + yield pytest.param( - { - "escaped_event": escaped_event, - "fields": requirement_fields, - "modinput_params": modinput_params, - }, + sample_event, id=f"sample_name::{event.sample_name}::host::{event.metadata.get('host')}", ) diff --git a/pytest_splunk_addon/fields_tests/test_templates.py b/pytest_splunk_addon/fields_tests/test_templates.py index 89e1720ac..6aa288f9a 100644 --- a/pytest_splunk_addon/fields_tests/test_templates.py +++ b/pytest_splunk_addon/fields_tests/test_templates.py @@ -162,15 +162,24 @@ def test_requirements_fields( """ # Search Query - record_property( - "stanza_name", splunk_searchtime_fields_requirements["escaped_event"] + unique_identifier = splunk_searchtime_fields_requirements.get( + "unique_identifier" + ) + escaped_event = splunk_searchtime_fields_requirements.get("escaped_event") + + self.logger.info( + f"Testing requirements for event: {unique_identifier or escaped_event}" + ) + self.logger.debug( + 
f"unique_identifier={unique_identifier}, escaped_event={escaped_event}" ) + + record_property("Event_with", unique_identifier or escaped_event) record_property("fields", splunk_searchtime_fields_requirements["fields"]) record_property( "modinput_params", splunk_searchtime_fields_requirements["modinput_params"] ) - escaped_event = splunk_searchtime_fields_requirements["escaped_event"] fields = splunk_searchtime_fields_requirements["fields"] modinput_params = splunk_searchtime_fields_requirements["modinput_params"] @@ -185,7 +194,14 @@ def test_requirements_fields( if param_value is not None: basic_search += f" {param}={param_value}" - search = f"search {index_list} {basic_search} {escaped_event} | fields *" + if unique_identifier: + selector = f'unique_identifier="{unique_identifier}"' + elif escaped_event: + selector = escaped_event + else: + selector = "" + + search = f"search {index_list} {basic_search} {selector} | fields *" self.logger.info(f"Executing the search query: {search}") @@ -200,6 +216,9 @@ def test_requirements_fields( missing_fields = [] wrong_value_fields = {} + self.logger.debug(f"Expected fields: {list(fields.keys())}") + self.logger.debug(f"Fields from Splunk: {list(fields_from_splunk.keys())}") + for field, value in fields.items(): if field not in fields_from_splunk: missing_fields.append(field) diff --git a/pytest_splunk_addon/index_tests/test_generator.py b/pytest_splunk_addon/index_tests/test_generator.py index 7bf8d2195..c3dbcba34 100644 --- a/pytest_splunk_addon/index_tests/test_generator.py +++ b/pytest_splunk_addon/index_tests/test_generator.py @@ -32,7 +32,9 @@ class IndexTimeTestGenerator(object): for the Add-on. 
""" - def generate_tests(self, store_events, app_path, config_path, test_type): + def generate_tests( + self, store_events, app_path, config_path, test_type, splunk_ep=False + ): """ Generates the test cases based on test_type @@ -41,12 +43,13 @@ def generate_tests(self, store_events, app_path, config_path, test_type): app_path (str): Path of the app package config_path (str): Path of package which contains pytest-splunk-addon-data.conf test_type (str): Type of test case + splunk_ep (bool): Whether Splunk EP mode is enabled Yields: pytest.params for the test templates """ - sample_generator = SampleXdistGenerator(app_path, config_path) + sample_generator = SampleXdistGenerator(app_path, splunk_ep, config_path) store_sample = sample_generator.get_samples(store_events) tokenized_events = store_sample.get("tokenized_events") if not store_sample.get("conf_name") == "psa-data-gen": diff --git a/pytest_splunk_addon/sample_generation/sample_generator.py b/pytest_splunk_addon/sample_generation/sample_generator.py index d0d747878..66ee7f6a4 100644 --- a/pytest_splunk_addon/sample_generation/sample_generator.py +++ b/pytest_splunk_addon/sample_generation/sample_generator.py @@ -33,10 +33,11 @@ class SampleGenerator(object): sample_stanzas = [] conf_name = " " - def __init__(self, addon_path, config_path=None, process_count=4): + def __init__(self, addon_path, config_path=None, splunk_ep=False, process_count=4): self.addon_path = addon_path self.process_count = process_count self.config_path = config_path + self.splunk_ep = splunk_ep def get_samples(self): """ @@ -44,10 +45,15 @@ def get_samples(self): """ if not SampleGenerator.sample_stanzas: psa_data_parser = PytestSplunkAddonDataParser( - self.addon_path, config_path=self.config_path + self.addon_path, + config_path=self.config_path, ) sample_stanzas = psa_data_parser.get_sample_stanzas() SampleGenerator.conf_name = psa_data_parser.conf_name + + for stanza in sample_stanzas: + stanza.metadata["splunk_ep"] = self.splunk_ep 
+ with ThreadPoolExecutor(min(20, max(len(sample_stanzas), 1))) as t: t.map(SampleStanza.get_raw_events, sample_stanzas) _ = list( diff --git a/pytest_splunk_addon/sample_generation/sample_stanza.py b/pytest_splunk_addon/sample_generation/sample_stanza.py index 5444e95c2..ff0d2968c 100644 --- a/pytest_splunk_addon/sample_generation/sample_stanza.py +++ b/pytest_splunk_addon/sample_generation/sample_stanza.py @@ -16,6 +16,7 @@ import os import re import copy +import uuid from . import Rule from . import raise_warning from . import SampleEvent @@ -102,6 +103,9 @@ def tokenize(self, conf_name): if each_rule: raw_event[event_counter] = each_rule.apply(raw_event[event_counter]) for event in raw_event[event_counter]: + if event.metadata.get("splunk_ep"): + event.unique_identifier = str(uuid.uuid4()) + host_value = event.metadata.get("host") host = token_value(key=host_value, value=host_value) event.update_requirement_test_field("host", "##host##", host) diff --git a/pytest_splunk_addon/sample_generation/sample_xdist_generator.py b/pytest_splunk_addon/sample_generation/sample_xdist_generator.py index a765a11de..f878265a5 100644 --- a/pytest_splunk_addon/sample_generation/sample_xdist_generator.py +++ b/pytest_splunk_addon/sample_generation/sample_xdist_generator.py @@ -33,10 +33,11 @@ class SampleXdistGenerator: process_count (num): generate {no} process for execution """ - def __init__(self, addon_path, config_path=None, process_count=4): + def __init__(self, addon_path, splunk_ep: bool, config_path=None, process_count=4): self.addon_path = addon_path self.process_count = process_count self.config_path = config_path + self.splunk_ep = splunk_ep def get_samples(self, store_events): """ @@ -67,7 +68,7 @@ def get_samples(self, store_events): store_sample = pickle.load(file_obj) else: sample_generator = SampleGenerator( - self.addon_path, self.config_path + self.addon_path, self.config_path, self.splunk_ep ) tokenized_events = list(sample_generator.get_samples()) 
store_sample = { @@ -79,7 +80,9 @@ def get_samples(self, store_events): with open(file_path, "wb") as file_obj: pickle.dump(store_sample, file_obj) else: - sample_generator = SampleGenerator(self.addon_path, self.config_path) + sample_generator = SampleGenerator( + self.addon_path, self.config_path, self.splunk_ep + ) tokenized_events = list(sample_generator.get_samples()) store_sample = { "conf_name": SampleGenerator.conf_name, @@ -128,24 +131,22 @@ def store_events(self, tokenized_events): "expected_event_count": expected_count, "index": each_event.metadata.get("index", "main"), }, - "events": [ - { - "event": each_event.event, - "key_fields": each_event.key_fields, - "time_values": each_event.time_values, - "requirement_test_data": each_event.requirement_test_data, - } - ], + "events": [], } - else: - tokenized_samples_dict[each_event.sample_name]["events"].append( - { - "event": each_event.event, - "key_fields": each_event.key_fields, - "time_values": each_event.time_values, - "requirement_test_data": each_event.requirement_test_data, - } - ) + + # Create event dict with optional UUID + sample_event = { + "event": each_event.event, + "key_fields": each_event.key_fields, + "time_values": each_event.time_values, + "requirement_test_data": each_event.requirement_test_data, + } + if self.splunk_ep: + sample_event["unique_identifier"] = each_event.unique_identifier + + tokenized_samples_dict[each_event.sample_name]["events"].append( + sample_event + ) for sample_name, tokenized_sample in tokenized_samples_dict.items(): with open( diff --git a/pytest_splunk_addon/splunk.py b/pytest_splunk_addon/splunk.py index 8d81ec756..bcff2cc38 100644 --- a/pytest_splunk_addon/splunk.py +++ b/pytest_splunk_addon/splunk.py @@ -48,6 +48,19 @@ def pytest_addoption(parser): by another process such as a ci/cd pipeline """ group = parser.getgroup("splunk-addon") + group.addoption( + "--splunk-ep", + action="store_true", + dest="splunk_ep", + default=False, + help=( + "Enable Splunk Edge 
Processor mode. When using Edge Processor, events are transformed " + "during ingestion, making literal content matching unreliable. This flag enables UUID-based " + "event matching for CIM compliance tests (test_cim_fields_recommended, test_requirement_fields, " + "test_datamodels). Only samples using HEC Event ingestor (modinput, windows_input) are tested. " + "Other test types remain unaffected." + ), + ) group.addoption( "--splunk-app", @@ -733,6 +746,7 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s, uf, splunk_events_cleanup) "splunk_hec_uri": splunk_hec_uri[1], "sc4s_host": sc4s[0], # for sc4s "sc4s_port": sc4s[1][514], # for sc4s + "splunk_ep": request.config.getoption("splunk_ep"), } thread_count = int(request.config.getoption("thread_count")) store_events = request.config.getoption("store_events") diff --git a/pytest_splunk_addon/utils.py b/pytest_splunk_addon/utils.py index e5da7f5b5..5915f9832 100644 --- a/pytest_splunk_addon/utils.py +++ b/pytest_splunk_addon/utils.py @@ -24,3 +24,9 @@ def check_first_worker() -> bool: "PYTEST_XDIST_WORKER" not in os.environ or os.environ.get("PYTEST_XDIST_WORKER") == "gw0" ) + + +# Input types that use HEC Event ingestor, which supports UUID via indexed fields. +# These are the only input types compatible with Splunk Edge Processor mode. 
+# See: event_ingestors.ingestor_helper.IngestorHelper.INGEST_METHODS +EP_COMPATIBLE_INPUT_TYPES = ("modinput", "windows_input") diff --git a/tests/e2e/test_splunk_addon.py b/tests/e2e/test_splunk_addon.py index 04afafd59..e3ce1bfbd 100644 --- a/tests/e2e/test_splunk_addon.py +++ b/tests/e2e/test_splunk_addon.py @@ -812,3 +812,109 @@ def empty_method(): # Here we are not interested in the failures or errors, # we are basically checking that we get results and test execution does not get stuck assert result.parseoutcomes().get("passed") > 0 + + +@pytest.mark.docker +@pytest.mark.splunk_app_fiction_with_uuid +def test_splunk_app_fiction_with_uuid(testdir, request): +    """Make sure that pytest accepts our fixture.""" + + testdir.makepyfile( + """ + from pytest_splunk_addon.standard_lib.addon_basic import Basic + class Test_App(Basic): + def empty_method(): + pass + + """ + ) + + shutil.copytree( + os.path.join(testdir.request.fspath.dirname, "addons/TA_fiction"), + os.path.join(testdir.tmpdir, "package"), + ) + + setup_test_dir(testdir) + SampleGenerator.clean_samples() + Rule.clean_rules() + + # run pytest with the following cmd args + result = testdir.runpytest( + f"--splunk-version={request.config.getoption('splunk_version')}", + "--splunk-type=docker", + "-v", + "-m splunk_searchtime_fields", + "--search-interval=4", + "--search-retry=4", + "--search-index=*,_internal", + "--splunk-ep", + ) + + logger.info(result.outlines) + result.stdout.fnmatch_lines_random( + constants.TA_FICTION_UUID_PASSED + constants.TA_FICTION_UUID_SKIPPED + ) + result.assert_outcomes( + passed=len(constants.TA_FICTION_UUID_PASSED), + failed=0, + skipped=len(constants.TA_FICTION_UUID_SKIPPED), + ) + + # make sure that we get a '0' exit code for the testsuite + assert result.ret == 0 + + +@pytest.mark.docker +@pytest.mark.splunk_app_req_with_uuid +def test_splunk_app_req_with_uuid(testdir, request): + """Make sure that pytest accepts our fixture.""" + + testdir.makepyfile( + """ + from 
pytest_splunk_addon.standard_lib.addon_basic import Basic + class Test_App(Basic): + def empty_method(): + pass + """ + ) + + shutil.copytree( + os.path.join(testdir.request.fspath.dirname, "addons/TA_transition_from_req"), + os.path.join(testdir.tmpdir, "package"), + ) + + shutil.copytree( + os.path.join(testdir.request.fspath.dirname, "test_data_models"), + os.path.join(testdir.tmpdir, "tests/data_models"), + ) + + setup_test_dir(testdir) + SampleGenerator.clean_samples() + Rule.clean_rules() + + # run pytest with the following cmd args + result = testdir.runpytest( + f"--splunk-version={request.config.getoption('splunk_version')}", + "--splunk-type=docker", + "-v", + "--search-interval=4", + "--search-retry=4", + "--search-index=*", + "--splunk-data-generator=tests/addons/TA_transition_from_req/default", + "--splunk-ep", + ) + logger.info(result.outlines) + + result.stdout.fnmatch_lines_random( + constants.TA_REQ_WITH_UUID_TRANSITION_PASSED + + constants.TA_REQ_WITH_UUID_TRANSITION_FAILED + + constants.TA_REQ_WITH_UUID_TRANSITION_SKIPPED + ) + result.assert_outcomes( + passed=len(constants.TA_REQ_WITH_UUID_TRANSITION_PASSED), + failed=len(constants.TA_REQ_WITH_UUID_TRANSITION_FAILED), + skipped=len(constants.TA_REQ_WITH_UUID_TRANSITION_SKIPPED), + ) + + # make sure that we get a non '0' exit code for the testsuite as it contains failure + assert result.ret != 0, "result not equal to 0" diff --git a/tests/unit/tests_standard_lib/test_app_test_generator.py b/tests/unit/tests_standard_lib/test_app_test_generator.py index a22821e9f..d12b4aa5f 100644 --- a/tests/unit/tests_standard_lib/test_app_test_generator.py +++ b/tests/unit/tests_standard_lib/test_app_test_generator.py @@ -11,6 +11,7 @@ "store_events": True, "splunk_data_generator": "psa.conf", "requirement_test": "fake_requirement_path", + "splunk_ep": False, } pytest_config = namedtuple("Config", ["getoption"]) test_config = pytest_config(getoption=lambda x, *y: config[x]) @@ -50,8 +51,11 @@ def 
test_app_test_generator_instantiation( config["splunk_app"], [], field_bank=config["field_bank"], + splunk_ep=config["splunk_ep"], + ) + atg.cim_test_generator.assert_called_once_with( + config["splunk_app"], path, [], splunk_ep=config["splunk_ep"] ) - atg.cim_test_generator.assert_called_once_with(config["splunk_app"], path, []) atg.indextime_test_generator.assert_called_once_with() @@ -87,7 +91,7 @@ def test_app_test_generator_instantiation( ( "splunk_indextime_key_fields", "indextime_test_generator", - lambda x, app_path, config_path, test_type: ( + lambda x, app_path, config_path, test_type, splunk_ep: ( params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) for i in range(3) ), @@ -96,6 +100,7 @@ def test_app_test_generator_instantiation( "app_path": "fake_app", "config_path": "psa.conf", "test_type": "key_fields", + "splunk_ep": False, }, [ params(values=f"splunk_indextime_key_fields_test_1", id=1), @@ -107,12 +112,17 @@ def test_app_test_generator_instantiation( ( "splunk_indextime_time", "indextime_test_generator", - lambda x, app_path, config_path, test_type: ( + lambda x, app_path, config_path, test_type, splunk_ep: ( params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) for i in range(3) ), [True], - {"app_path": "fake_app", "config_path": "psa.conf", "test_type": "_time"}, + { + "app_path": "fake_app", + "config_path": "psa.conf", + "test_type": "_time", + "splunk_ep": False, + }, [ params(values=f"splunk_indextime__time_test_1", id=1), params(values=f"splunk_indextime__time_test_2", id=2), @@ -123,7 +133,7 @@ def test_app_test_generator_instantiation( ( "splunk_indextime_line_breaker", "indextime_test_generator", - lambda x, app_path, config_path, test_type: ( + lambda x, app_path, config_path, test_type, splunk_ep: ( params(values=f"splunk_indextime_{test_type}_test_{3 - i}", id=3 - i) for i in range(3) ), @@ -132,6 +142,7 @@ def test_app_test_generator_instantiation( "app_path": "fake_app", "config_path": "psa.conf", 
"test_type": "line_breaker", + "splunk_ep": False, }, [ params(values=f"splunk_indextime_line_breaker_test_1", id=1), diff --git a/tests/unit/tests_standard_lib/test_event_ingestors/conftest.py b/tests/unit/tests_standard_lib/test_event_ingestors/conftest.py index 492b6e455..a6ad6574a 100644 --- a/tests/unit/tests_standard_lib/test_event_ingestors/conftest.py +++ b/tests/unit/tests_standard_lib/test_event_ingestors/conftest.py @@ -11,6 +11,10 @@ class SampleEvent: key_fields: dict = None time_values: list = None + def __post_init__(self): + if self.metadata.get("splunk_ep"): + self.unique_identifier = "uuid" + @pytest.fixture() def modinput_events(): @@ -28,6 +32,7 @@ def modinput_events(): "sample_count": "2", "host": "modinput_host_event_time_plugin.samples_1", "expected_event_count": 2, + "splunk_ep": True, }, sample_name="modinput_host_event_time_plugin.samples", ), @@ -44,6 +49,7 @@ def modinput_events(): "sample_count": "2", "host": "modinput_host_event_time_plugin.samples_2", "expected_event_count": 2, + "splunk_ep": False, }, sample_name="modinput_host_event_time_plugin.samples", ), @@ -56,6 +62,7 @@ def modinput_events(): "index": "fake_index", "timestamp_type": "event", "host": "fake host", + "splunk_ep": False, }, sample_name="fake.samples", time_values=[1234.5678, 1234.5679], @@ -73,6 +80,7 @@ def modinput_posts_sent(): '"source": "pytest-splunk-addon:modinput", ' '"event": "test_modinput_1 host=modinput_host_event_time_plugin.samples_1", ' '"index": "main", ' + '"fields": {"unique_identifier": "uuid"}, ' '"host": "modinput_host_event_time_plugin.samples_1"' "}\n{" '"sourcetype": "test:indextime:sourcetype:modinput_host_event_time_plugin", ' @@ -112,6 +120,7 @@ def file_monitor_events(): "sample_count": "2", "host": "file_monitor_host_prefix.sample", "expected_event_count": 1, + "splunk_ep": False, }, sample_name="file_monitor_host_prefix.sample", ), @@ -130,6 +139,7 @@ def file_monitor_events(): "host": "failing-samples-1", "id": "failing.samples_1", 
"expected_event_count": 2, + "splunk_ep": False, }, sample_name="failing.samples", ), @@ -138,6 +148,7 @@ def file_monitor_events(): metadata={ "input_type": "file_monitor", "index": "fake_index", + "splunk_ep": False, }, sample_name="fake.samples", ), @@ -234,6 +245,7 @@ def requirement_events(): "sample_count": "2", "host": "requirement_host_prefix.sample", "expected_event_count": 1, + "splunk_ep": False, }, sample_name="requirement_test", ), @@ -256,6 +268,7 @@ def sc4s_events(): "host": "sc4s-host-plugin-time-sample-31", "id": "sc4s_host_plugin_time.sample_31", "expected_event_count": 2, + "splunk_ep": False, }, sample_name="sc4s_host_plugin_time.sample", ), @@ -272,6 +285,7 @@ def sc4s_events(): "host": "sc4s-host-plugin-time-sample-32", "id": "sc4s-host-plugin-time-sample-32", "expected_event_count": 2, + "splunk_ep": False, }, sample_name="sc4s_host_plugin_time.sample", ), diff --git a/tests/unit/tests_standard_lib/test_event_ingestors/test_ingestor_helper.py b/tests/unit/tests_standard_lib/test_event_ingestors/test_ingestor_helper.py index 2d6702a5b..36d616559 100644 --- a/tests/unit/tests_standard_lib/test_event_ingestors/test_ingestor_helper.py +++ b/tests/unit/tests_standard_lib/test_event_ingestors/test_ingestor_helper.py @@ -125,7 +125,7 @@ def test_events_can_be_ingested( get_ingestor_mock, sample_mock, file_monitor_events, modinput_events ): event_ingestors.ingestor_helper.IngestorHelper.ingest_events( - ingest_meta_data={}, + ingest_meta_data={"splunk_ep": False}, addon_path="fake_path", config_path="tests/unit/event_ingestors", thread_count=20, @@ -133,9 +133,47 @@ def test_events_can_be_ingested( ) assert get_ingestor_mock.call_count == 2 get_ingestor_mock.assert_has_calls( - [call("file_monitor", {}), call("modinput", {})], any_order=True + [ + call("file_monitor", {"splunk_ep": False}), + call("modinput", {"splunk_ep": False}), + ], + any_order=True, ) assert get_ingestor_mock.ingest.call_count == 2 get_ingestor_mock.ingest.assert_has_calls( 
[call(file_monitor_events, 20), call(modinput_events, 20)] ) + + +def test_all_events_ingested_when_splunk_ep_enabled( + get_ingestor_mock, sample_mock, file_monitor_events, modinput_events +): + """Test that ALL events are ingested regardless of splunk_ep flag. + + Filtering for EP-compatible samples happens only during TEST GENERATION, + not during ingestion. This ensures all samples are available for other + test types (field extraction, tags, eventtypes, etc.) + """ + event_ingestors.ingestor_helper.IngestorHelper.ingest_events( + ingest_meta_data={"splunk_ep": True}, + addon_path="fake_path", + config_path="tests/unit/event_ingestors", + thread_count=20, + store_events=False, + ) + + # ALL ingestors should be called - no filtering at ingestion stage + assert get_ingestor_mock.call_count == 2 + get_ingestor_mock.assert_has_calls( + [ + call("file_monitor", {"splunk_ep": True}), + call("modinput", {"splunk_ep": True}), + ], + any_order=True, + ) + + # ALL events should be ingested + assert get_ingestor_mock.ingest.call_count == 2 + get_ingestor_mock.ingest.assert_has_calls( + [call(file_monitor_events, 20), call(modinput_events, 20)] + ) diff --git a/tests/unit/tests_standard_lib/test_fields_tests/test_test_generator.py b/tests/unit/tests_standard_lib/test_fields_tests/test_test_generator.py index e57daefa5..e04854492 100644 --- a/tests/unit/tests_standard_lib/test_fields_tests/test_test_generator.py +++ b/tests/unit/tests_standard_lib/test_fields_tests/test_test_generator.py @@ -24,6 +24,15 @@ def field_3(): field_3.__dict__.update({"name": "field_3"}) +@pytest.fixture +def mock_uuid4(): + with patch( + "uuid.uuid4", + return_value="uuid", + ) as mock_uuid: + yield mock_uuid + + @pytest.fixture() def addon_parser_mock(monkeypatch): ap = MagicMock() @@ -422,6 +431,7 @@ def test_generate_field_tests( "input_type": "modinput", "sourcetype_to_search": "dummy_sourcetype", "host": "dummy_host", + "splunk_ep": False, }, sample_name="file1.xml", requirement_test_data={ 
@@ -445,6 +455,7 @@ def test_generate_field_tests( "input_type": "syslog_tcp", "sourcetype_to_search": "dummy_sourcetype", "host": "dummy_host_syslog", + "splunk_ep": False, }, sample_name="file1.xml", requirement_test_data={}, @@ -455,6 +466,7 @@ def test_generate_field_tests( "input_type": "syslog_tcp", "sourcetype_to_search": "dummy_sourcetype", "host": "dummy_host_syslog", + "splunk_ep": False, }, sample_name="file1.xml", requirement_test_data={ @@ -522,6 +534,63 @@ def test_generate_requirement_tests(tokenised_events, expected_output): assert param_mock.call_count == len(expected_output) +def test_generate_requirement_tests_with_uuid(mock_uuid4): + event = SampleEvent( + event_string="escaped_event", + metadata={ + "input_type": "modinput", + "sourcetype_to_search": "dummy_sourcetype", + "host": "dummy_host", + "splunk_ep": True, + }, + sample_name="file1.xml", + requirement_test_data={ + "cim_fields": { + "severity": "low", + "signature_id": "405001", + "src": "192.168.0.1", + "type": "event", + }, + }, + ) + + # Simulate tokenization UUID assignment + event.unique_identifier = "uuid" + + tokenised_events = [event] + + expected_output = [ + ( + { + "escaped_event": "escaped_event", + "unique_identifier": "uuid", + "fields": { + "severity": "low", + "signature_id": "405001", + "src": "192.168.0.1", + "type": "event", + }, + "modinput_params": {"sourcetype": "dummy_sourcetype"}, + }, + "sample_name::file1.xml::host::dummy_host", + ) + ] + + with patch.object( + xml_event_parser, "escape_char_event", return_value="escaped_event" + ), patch.object(pytest, "param", side_effect=lambda x, id: (x, id)) as param_mock: + out = list( + FieldTestGenerator( + "app_path", + tokenised_events, + "field_bank", + splunk_ep=True, + ).generate_requirements_tests() + ) + assert out == expected_output + assert param_mock.call_count == len(expected_output) + + @pytest.mark.parametrize( "tokenised_events, expected_output", [ @@ -596,3 +665,197 @@ def 
test_generate_requirement_datamodel_tests(tokenised_events, expected_output) ) assert out == expected_output assert param_mock.call_count == len(expected_output) + + +def test_generate_requirement_datamodel_tests_with_uuid(mock_uuid4): + event = SampleEvent( + event_string="escaped_event", + metadata={ + "input_type": "modinput", + "sourcetype_to_search": "dummy_sourcetype", + "host": "dummy_host", + "splunk_ep": True, + }, + sample_name="file1.xml", + requirement_test_data={"datamodels": {"model": "Alerts"}}, + ) + + # Simulate tokenization UUID assignment + event.unique_identifier = "uuid" + + tokenised_events = [event] + + expected_output = [ + ( + { + "datamodels": ["Alerts"], + "stanza": "escaped_event", + "unique_identifier": "uuid", + }, + "Alerts::sample_name::file1.xml::host::dummy_host", + ) + ] + + with patch.object( + xml_event_parser, "escape_char_event", return_value="escaped_event" + ), patch.object(pytest, "param", side_effect=lambda x, id: (x, id)) as param_mock: + out = list( + FieldTestGenerator( + "app_path", + tokenised_events, + "field_bank", + splunk_ep=True, + ).generate_requirements_datamodels_tests() + ) + assert out == expected_output + assert param_mock.call_count == len(expected_output) + + +def test_generate_requirements_tests_filters_incompatible_input_types_when_uuid_enabled(): + """Test that incompatible input types are filtered when splunk_ep=True.""" + modinput_event = SampleEvent( + event_string="modinput_event", + metadata={ + "input_type": "modinput", + "sourcetype_to_search": "dummy_sourcetype", + "host": "modinput_host", + "splunk_ep": True, + }, + sample_name="modinput_sample.xml", + requirement_test_data={ + "cim_fields": {"src": "192.168.0.1"}, + }, + ) + modinput_event.unique_identifier = "uuid-modinput" + + file_monitor_event = SampleEvent( + event_string="file_monitor_event", + metadata={ + "input_type": "file_monitor", + "sourcetype_to_search": "dummy_sourcetype", + "host": "file_monitor_host", + "splunk_ep": True, + 
}, + sample_name="file_monitor_sample.xml", + requirement_test_data={ + "cim_fields": {"dest": "10.0.0.1"}, + }, + ) + file_monitor_event.unique_identifier = "uuid-file-monitor" + + tokenised_events = [modinput_event, file_monitor_event] + + with patch.object( + xml_event_parser, "escape_char_event", side_effect=lambda x: x + ), patch.object(pytest, "param", side_effect=lambda x, id: (x, id)) as param_mock: + # With splunk_ep=True, only modinput event should be included + out = list( + FieldTestGenerator( + "app_path", + tokenised_events, + "field_bank", + splunk_ep=True, + ).generate_requirements_tests() + ) + assert len(out) == 1 + assert out[0][0]["unique_identifier"] == "uuid-modinput" + assert param_mock.call_count == 1 + + +def test_generate_requirements_tests_includes_all_input_types_when_uuid_disabled(): + """Test that all input types are included when splunk_ep=False.""" + modinput_event = SampleEvent( + event_string="modinput_event", + metadata={ + "input_type": "modinput", + "sourcetype_to_search": "dummy_sourcetype", + "host": "modinput_host", + "splunk_ep": False, + }, + sample_name="modinput_sample.xml", + requirement_test_data={ + "cim_fields": {"src": "192.168.0.1"}, + }, + ) + + file_monitor_event = SampleEvent( + event_string="file_monitor_event", + metadata={ + "input_type": "file_monitor", + "sourcetype_to_search": "dummy_sourcetype", + "host": "file_monitor_host", + "splunk_ep": False, + }, + sample_name="file_monitor_sample.xml", + requirement_test_data={ + "cim_fields": {"dest": "10.0.0.1"}, + }, + ) + + tokenised_events = [modinput_event, file_monitor_event] + + with patch.object( + xml_event_parser, "escape_char_event", side_effect=lambda x: x + ), patch.object(pytest, "param", side_effect=lambda x, id: (x, id)) as param_mock: + # With splunk_ep=False, both events should be included + out = list( + FieldTestGenerator( + "app_path", + tokenised_events, + "field_bank", + splunk_ep=False, + ).generate_requirements_tests() + ) + assert len(out) 
== 2 + assert param_mock.call_count == 2 + + +def test_generate_requirements_datamodels_tests_filters_incompatible_input_types_when_uuid_enabled(): + """Test that incompatible input types are filtered from datamodel tests when UUID enabled.""" + windows_input_event = SampleEvent( + event_string="windows_input_event", + metadata={ + "input_type": "windows_input", + "sourcetype_to_search": "dummy_sourcetype", + "host": "windows_host", + "splunk_ep": True, + }, + sample_name="windows_sample.xml", + requirement_test_data={"datamodels": {"model": "Alerts"}}, + ) + windows_input_event.unique_identifier = "uuid-windows" + + syslog_event = SampleEvent( + event_string="syslog_event", + metadata={ + "input_type": "syslog_tcp", + "sourcetype_to_search": "dummy_sourcetype", + "host": "syslog_host", + "splunk_ep": True, + }, + sample_name="syslog_sample.xml", + requirement_test_data={"datamodels": {"model": "Network"}}, + ) + syslog_event.unique_identifier = "uuid-syslog" + + tokenised_events = [windows_input_event, syslog_event] + + with patch.object( + xml_event_parser, "strip_syslog_header", return_value="stripped_event" + ), patch.object( + xml_event_parser, "escape_char_event", side_effect=lambda x: x + ), patch.object( + pytest, "param", side_effect=lambda x, id: (x, id) + ) as param_mock: + # With splunk_ep=True, only windows_input event should be included + out = list( + FieldTestGenerator( + "app_path", + tokenised_events, + "field_bank", + splunk_ep=True, + ).generate_requirements_datamodels_tests() + ) + assert len(out) == 1 + assert out[0][0]["unique_identifier"] == "uuid-windows" + assert param_mock.call_count == 1 diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_event.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_event.py index c2fa51566..9647980e0 100644 --- a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_event.py +++ 
b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_event.py
@@ -8,7 +8,7 @@
 EVENT_STRING = "Event_string dad ad dfd ddas Value_5."
 UPDATED_STRING = "Updated_string"
 SAMPLE_NAME = "Sample_name"
-METADATA = {"Metadata": "metadata"}
+METADATA = {"Metadata": "metadata", "splunk_ep": False}
 RULE = "Rule"
 SAMPLE_HOST = "sample_host"
 FAKE_IPV4 = "222.222.222.222"
@@ -33,6 +33,29 @@ def samp_eve():
     )
 
 
+def test_sample_event_generates_uuid():
+    metadata = {**METADATA, "splunk_ep": True}
+    event = pytest_splunk_addon.sample_generation.sample_event.SampleEvent(
+        event_string=EVENT_STRING,
+        metadata=metadata,
+        sample_name=SAMPLE_NAME,
+    )
+
+    # UUID should not be generated during SampleEvent creation anymore
+    assert not hasattr(
+        event, "unique_identifier"
+    ), "UUID should not be assigned during SampleEvent creation"
+
+    # Simulate tokenization where UUID is assigned
+    with patch("uuid.uuid4", return_value="uuid") as mock_uuid:
+        if event.metadata.get("splunk_ep"):
+            event.unique_identifier = str(mock_uuid())
+
+    mock_uuid.assert_called_once()  # Ensures uuid4 was called during tokenization simulation
+    assert hasattr(event, "unique_identifier")  # The field was set
+    assert event.unique_identifier == "uuid"
+
+
 def check_host_count(value):
     assert pytest_splunk_addon.sample_generation.sample_event.host_count == value
 
diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_generator.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_generator.py
index 87c87b5b7..10c3e0b7d 100644
--- a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_generator.py
+++ b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_generator.py
@@ -15,10 +15,12 @@ def test_init(self):
         assert sg.addon_path == ADDON_PATH
         assert sg.config_path == CONFIG_PATH
         assert sg.process_count == 4
-        sg = SampleGenerator(ADDON_PATH, CONFIG_PATH, 2)
+        assert sg.splunk_ep is False
+        sg = SampleGenerator(ADDON_PATH, CONFIG_PATH, splunk_ep=True, 
process_count=2) assert sg.addon_path == ADDON_PATH assert sg.config_path == CONFIG_PATH assert sg.process_count == 2 + assert sg.splunk_ep is True def test_get_samples(self): tks_1 = "tokenized_sample_1" diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_stanza.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_stanza.py index 92da36e37..2fae07755 100644 --- a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_stanza.py +++ b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_stanza.py @@ -27,7 +27,10 @@ def get_params_for_get_raw_sample(): }, [ "sample_raw", - {"input_type": input_type, "host": "path_to.file_1"}, + { + "input_type": input_type, + "host": "path_to.file_1", + }, "path_to.file", ], ) @@ -47,7 +50,10 @@ def get_params_for_get_raw_sample(): }, [ "sample_raw", - {"input_type": input_type, "host": "path_to.file"}, + { + "input_type": input_type, + "host": "path_to.file", + }, "path_to.file", ], ) @@ -186,6 +192,7 @@ def test_parse_meta( "expected_event_count": "hh", "count": "ll", "index": 1, + "splunk_ep": False, "input_type": input_type, "tokens": { "token_1": {"replacementType": "all"}, @@ -199,6 +206,7 @@ def test_parse_meta( "host": host, "host_type": "plugin", "index": 1, + "splunk_ep": False, "input_type": input_type_expected, "sample_count": "1", "timestamp_type": "plugin", diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_xdist_generator.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_xdist_generator.py index 0b7f5236c..aa3bb2806 100644 --- a/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_xdist_generator.py +++ b/tests/unit/tests_standard_lib/tests_sample_generation/test_sample_xdist_generator.py @@ -12,6 +12,7 @@ "sample_name", "metadata", "event", + "unique_identifier", "key_fields", "time_values", "requirement_test_data", @@ -30,6 +31,7 @@ "input_type": "modinput", }, "event_field", + None, # No 
UUID when splunk_ep is false "key_fields_field", "time_values_field", "requirement_test_data", @@ -46,6 +48,7 @@ "sample_count": 4, }, "event_field", + None, # No UUID when splunk_ep is false "key_fields_field", "time_values_field", "requirement_test_data", @@ -62,6 +65,7 @@ "sample_count": 4, }, "event_field", + None, # No UUID when splunk_ep is false "key_fields_field_3", "time_values_field_3", "requirement_test_data", @@ -71,10 +75,11 @@ class TestSampleXdistGenerator: def test_init(self): - sample_xdist_generator = SampleXdistGenerator("path", "config_path", 5) + sample_xdist_generator = SampleXdistGenerator("path", False, "config_path", 5) assert sample_xdist_generator.addon_path == "path" assert sample_xdist_generator.config_path == "config_path" assert sample_xdist_generator.process_count == 5 + assert sample_xdist_generator.splunk_ep is False @patch( "pytest_splunk_addon.sample_generation.sample_xdist_generator.FileLock", @@ -104,7 +109,7 @@ def test_init(self): ) def test_get_samples(self, pickle_mock, exists_value, environ, expected): pickle_mock.load.return_value = "pickle_loaded" - sample_xdist_generator = SampleXdistGenerator("path") + sample_xdist_generator = SampleXdistGenerator("path", False) sample_xdist_generator.store_events = MagicMock() with patch("os.path.exists", MagicMock(return_value=exists_value)), patch( "os.environ", @@ -118,16 +123,20 @@ def test_get_samples(self, pickle_mock, exists_value, environ, expected): assert sample_xdist_generator.get_samples(True) == expected @pytest.mark.parametrize( - "exists_value, makedirs_calls", - [(True, []), (False, [call("/path/to/cwd/.tokenized_events")])], + "exists_value, makedirs_calls, splunk_ep", + [ + (True, [], False), + (False, [call("/path/to/cwd/.tokenized_events")], False), + (False, [call("/path/to/cwd/.tokenized_events")], False), + ], ) - def test_store_events(self, exists_value, makedirs_calls): + def test_store_events(self, exists_value, makedirs_calls, splunk_ep): with 
patch("os.path.exists", MagicMock(return_value=exists_value)), patch( "os.getcwd", MagicMock(return_value="/path/to/cwd") ), patch("os.makedirs", MagicMock()) as mock_makedirs, patch( "builtins.open", mock_open() ) as open_mock: - sample_xdist_generator = SampleXdistGenerator("path") + sample_xdist_generator = SampleXdistGenerator("path", splunk_ep) sample_xdist_generator.store_events(tokenized_events) mock_makedirs.assert_has_calls(makedirs_calls) open_mock.assert_has_calls( @@ -147,3 +156,74 @@ def test_store_events(self, exists_value, makedirs_calls): ), ] ) + + def test_store_events_with_uuid(self): + tokenized_event = namedtuple( + "tokenized_event", + [ + "sample_name", + "metadata", + "event", + "unique_identifier", + "key_fields", + "time_values", + "requirement_test_data", + ], + ) + + tokenized_events = [ + tokenized_event( + "sample_with_uuid", + { + "expected_event_count": 1, + "host": "host_1", + "source": "source_1", + "sourcetype": "sourcetype_1", + "timestamp_type": "timestamp_type_1", + "input_type": "modinput", + }, + "event_field", + "uuid", + "key_fields_field", + "time_values_field", + "requirement_test_data", + ), + tokenized_event( + "sample_with_uuid", + { + "expected_event_count": 2, + "host": "host_2", + "source": "source_2", + "sourcetype": "sourcetype_2", + "timestamp_type": "timestamp_type_2", + "input_type": "input_else", + "sample_count": 4, + }, + "event_field", + "uuid", + "key_fields_field_3", + "time_values_field_3", + "requirement_test_data", + ), + ] + with patch("os.path.exists", MagicMock(return_value=False)), patch( + "os.getcwd", MagicMock(return_value="/path/to/cwd") + ), patch("os.makedirs", MagicMock()) as mock_makedirs, patch( + "builtins.open", mock_open() + ) as open_mock: + sample_xdist_generator = SampleXdistGenerator("path", True) + sample_xdist_generator.store_events(tokenized_events) + mock_makedirs.assert_has_calls([call("/path/to/cwd/.tokenized_events")]) + open_mock.assert_has_calls( + [ + 
call("/path/to/cwd/.tokenized_events/sample_with_uuid.json", "w"), + ], + any_order=True, + ) + open_mock().write.assert_has_calls( + [ + call( + '{\n\t"sample_with_uuid": {\n\t\t"metadata": {\n\t\t\t"host": "host_1",\n\t\t\t"source": "source_1",\n\t\t\t"sourcetype": "sourcetype_1",\n\t\t\t"timestamp_type": "timestamp_type_1",\n\t\t\t"input_type": "modinput",\n\t\t\t"expected_event_count": 1,\n\t\t\t"index": "main"\n\t\t},\n\t\t"events": [\n\t\t\t{\n\t\t\t\t"event": "event_field",\n\t\t\t\t"key_fields": "key_fields_field",\n\t\t\t\t"time_values": "time_values_field",\n\t\t\t\t"requirement_test_data": "requirement_test_data",\n\t\t\t\t"unique_identifier": "uuid"\n\t\t\t},\n\t\t\t{\n\t\t\t\t"event": "event_field",\n\t\t\t\t"key_fields": "key_fields_field_3",\n\t\t\t\t"time_values": "time_values_field_3",\n\t\t\t\t"requirement_test_data": "requirement_test_data",\n\t\t\t\t"unique_identifier": "uuid"\n\t\t\t}\n\t\t]\n\t}\n}' + ), + ] + ) diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_e2e_flow.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_e2e_flow.py new file mode 100644 index 000000000..24fe93857 --- /dev/null +++ b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_e2e_flow.py @@ -0,0 +1,759 @@ +# -*- coding: utf-8 -*- +""" +End-to-End tests for UUID flow through the entire pytest-splunk-addon pipeline. 
+Tests Priority #2: End-to-End UUID Flow Tests +""" +import pytest +import json +import tempfile +import os +import uuid +from unittest.mock import patch, MagicMock, mock_open +from collections import namedtuple + +from pytest_splunk_addon.sample_generation import SampleGenerator, SampleXdistGenerator +from pytest_splunk_addon.sample_generation.sample_event import SampleEvent +from pytest_splunk_addon.sample_generation.sample_stanza import SampleStanza +from pytest_splunk_addon.event_ingestors.hec_event_ingestor import HECEventIngestor +from pytest_splunk_addon.fields_tests.test_generator import FieldTestGenerator + + +def _simulate_tokenization_uuid_assignment(event): + """Helper to simulate UUID assignment that happens during tokenization""" + if event.metadata.get("splunk_ep"): + event.unique_identifier = str(uuid.uuid4()) + + +class TestUUIDFlowThroughPipeline: + """Test UUID propagation through the entire sample generation pipeline""" + + def test_uuid_propagates_from_stanza_to_event(self): + """Verify UUID flag propagates from SampleStanza to SampleEvent""" + with tempfile.TemporaryDirectory() as tmpdir: + sample_path = os.path.join(tmpdir, "test.sample") + with open(sample_path, "w") as f: + f.write("test event line 1\ntest event line 2") + + psa_data_params = { + "sourcetype": "test:sourcetype", + "input_type": "modinput", + "tokens": {}, # Add tokens to avoid KeyError + } + + # Test with UUID enabled (mimics how SampleGenerator sets splunk_ep) + stanza = SampleStanza(sample_path, psa_data_params) + stanza.metadata["splunk_ep"] = True + assert stanza.metadata["splunk_ep"] == True + + # Test with UUID disabled + stanza_no_uuid = SampleStanza(sample_path, psa_data_params) + stanza_no_uuid.metadata["splunk_ep"] = False + assert stanza_no_uuid.metadata["splunk_ep"] == False + + def test_uuid_in_sample_generator_initialization(self): + """Verify SampleGenerator properly initializes with UUID flag""" + addon_path = "/fake/addon/path" + config_path = 
"/fake/config/path" + + # Test with UUID enabled + generator = SampleGenerator(addon_path, splunk_ep=True, config_path=config_path) + assert generator.splunk_ep == True + + # Test with UUID disabled + generator_no_uuid = SampleGenerator( + addon_path, splunk_ep=False, config_path=config_path + ) + assert generator_no_uuid.splunk_ep == False + + def test_uuid_in_sample_xdist_generator_initialization(self): + """Verify SampleXdistGenerator properly initializes with UUID flag""" + addon_path = "/fake/addon/path" + config_path = "/fake/config/path" + + # Test with UUID enabled + xdist_generator = SampleXdistGenerator( + addon_path, splunk_ep=True, config_path=config_path + ) + assert xdist_generator.splunk_ep == True + + # Test with UUID disabled + xdist_generator_no_uuid = SampleXdistGenerator( + addon_path, splunk_ep=False, config_path=config_path + ) + assert xdist_generator_no_uuid.splunk_ep == False + + +class TestUUIDInHECPayload: + """Test UUID inclusion in HEC event payloads""" + + def test_uuid_added_to_hec_payload_when_enabled(self): + """Verify UUID is added to HEC payload when flag is enabled""" + # Create a sample event with UUID + metadata = { + "splunk_ep": True, + "sourcetype": "test:sourcetype", + "source": "/var/log/test.log", + "index": "main", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "plugin", + } + + event = SampleEvent( + event_string="test event for HEC", + metadata=metadata, + sample_name="test.sample", + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify event has UUID + assert hasattr(event, "unique_identifier") + uuid_value = event.unique_identifier + + # Create HEC ingestor and mock the actual HTTP POST + required_configs = { + "splunk_hec_uri": "https://splunk:8088", + "session_headers": {"Authorization": "Splunk test-token"}, + } + ingestor = HECEventIngestor(required_configs) + + # Mock the requests.post to capture what gets sent + with patch( + 
"pytest_splunk_addon.event_ingestors.hec_event_ingestor.requests.post" + ) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + + # Call the actual ingest method + ingestor.ingest([event], thread_count=1) + + # Verify post was called + assert mock_post.called, "requests.post should have been called" + + # Get the actual data that was sent + call_args = mock_post.call_args + sent_data = call_args[1].get("data") if call_args else None + + assert sent_data is not None, "Data should have been sent to HEC" + + # Parse the JSON that was sent + hec_events = [ + json.loads(line) for line in sent_data.strip().split("\n") if line + ] + + # Verify UUID is in the actual HEC payload + assert len(hec_events) > 0, "Should have at least one HEC event" + hec_event = hec_events[0] + + # This is what we're really testing - did the ingestor add the UUID field? + assert "fields" in hec_event, "HEC event should have fields" + assert ( + "unique_identifier" in hec_event["fields"] + ), "HEC event fields should contain unique_identifier" + assert ( + hec_event["fields"]["unique_identifier"] == uuid_value + ), "UUID value should match the event's unique_identifier" + + def test_uuid_not_added_to_hec_payload_when_disabled(self): + """Verify UUID is NOT added to HEC payload when flag is disabled""" + metadata = { + "splunk_ep": False, + "sourcetype": "test:sourcetype", + "source": "/var/log/test.log", + "index": "main", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "plugin", + } + + event = SampleEvent( + event_string="test event without UUID", + metadata=metadata, + sample_name="test.sample", + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify event doesn't have UUID + assert not hasattr(event, "unique_identifier") + + # Create HEC ingestor and test actual implementation + required_configs = { + "splunk_hec_uri": "https://splunk:8088", + "session_headers": {"Authorization": "Splunk test-token"}, + 
} + ingestor = HECEventIngestor(required_configs) + + # Mock requests.post to capture what gets sent + with patch( + "pytest_splunk_addon.event_ingestors.hec_event_ingestor.requests.post" + ) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + + # Call the actual ingest method + ingestor.ingest([event], thread_count=1) + + # Get the actual data that was sent + call_args = mock_post.call_args + sent_data = call_args[1].get("data") if call_args else None + + if sent_data: + hec_events = [ + json.loads(line) for line in sent_data.strip().split("\n") if line + ] + hec_event = hec_events[0] + + # Verify fields key is NOT in the actual HEC payload + assert ( + "fields" not in hec_event + ), "HEC event should not have fields when UUID is disabled" + + def test_hec_payload_structure_with_uuid(self): + """Verify complete HEC payload structure includes UUID and other fields correctly""" + metadata = { + "splunk_ep": True, + "sourcetype": "test:sourcetype", + "source": "/var/log/test.log", + "index": "main", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "event", + } + + event = SampleEvent( + event_string="test event with complete metadata", + metadata=metadata, + sample_name="test.sample", + ) + event.time_values = [1234567890.123] + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify event has UUID + assert hasattr(event, "unique_identifier") + uuid_value = event.unique_identifier + + # Create HEC ingestor and test actual implementation + required_configs = { + "splunk_hec_uri": "https://splunk:8088", + "session_headers": {"Authorization": "Splunk test-token"}, + } + ingestor = HECEventIngestor(required_configs) + + # Mock requests.post to capture what gets sent + with patch( + "pytest_splunk_addon.event_ingestors.hec_event_ingestor.requests.post" + ) as mock_post: + mock_post.return_value = MagicMock(status_code=200) + + ingestor.ingest([event], thread_count=1) + + call_args = 
mock_post.call_args + sent_data = call_args[1].get("data") if call_args else None + + assert sent_data is not None + hec_events = [ + json.loads(line) for line in sent_data.strip().split("\n") if line + ] + hec_event = hec_events[0] + + # Verify the actual HEC payload structure + assert "sourcetype" in hec_event + assert "source" in hec_event + assert "event" in hec_event + assert "index" in hec_event + assert "host" in hec_event + assert ( + "time" in hec_event + ), "time should be present when timestamp_type is 'event'" + assert ( + "fields" in hec_event + ), "fields should be present when UUID is enabled" + assert hec_event["fields"]["unique_identifier"] == uuid_value + + +class TestUUIDInTestGeneration: + """Test UUID usage in test parameter generation""" + + def test_uuid_in_requirement_test_params(self): + """Verify UUID is included in requirement test parameters""" + # Create a tokenized event with UUID + metadata = { + "splunk_ep": True, + "input_type": "modinput", + "sourcetype_to_search": "test:sourcetype", + "host": "test-host", + } + + event = SampleEvent( + event_string="test event", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={ + "cim_fields": {"severity": "low", "signature_id": "12345"} + }, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify UUID was generated + assert hasattr(event, "unique_identifier") + assert event.unique_identifier is not None + + # Create field test generator + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=True, + ) + + # Generate requirement tests + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + 
+ # Verify params were generated + assert len(params) > 0 + param_data, param_id = params[0] + + # Verify UUID is in the parameters + assert ( + "unique_identifier" in param_data + ), "unique_identifier should be in test parameters when UUID is enabled" + assert param_data["unique_identifier"] is not None + + def test_uuid_in_datamodel_test_params(self): + """Verify UUID is included in datamodel test parameters""" + metadata = { + "splunk_ep": True, + "input_type": "modinput", + "sourcetype_to_search": "test:sourcetype", + "host": "test-host", + } + + event = SampleEvent( + event_string="test event for datamodel", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={"datamodels": {"model": "Authentication"}}, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify UUID was generated + assert hasattr(event, "unique_identifier") + + # Create field test generator + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=True, + ) + + # Generate datamodel tests + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_datamodels_tests()) + + # Verify params were generated + assert len(params) > 0 + param_data, param_id = params[0] + + # Verify UUID is in the parameters + assert "unique_identifier" in param_data + assert param_data["unique_identifier"] is not None + + def test_uuid_not_in_params_when_disabled(self): + """Verify UUID is NOT in test parameters when flag is disabled""" + metadata = { + "splunk_ep": False, + "input_type": "modinput", + "sourcetype_to_search": "test:sourcetype", + "host": "test-host", + } + + event = SampleEvent( + event_string="test event without 
UUID", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={"cim_fields": {"severity": "low"}}, + ) + + # Verify no UUID was generated + assert not hasattr(event, "unique_identifier") + + # Create field test generator + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=False, + ) + + # Generate requirement tests + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + + if len(params) > 0: + param_data, param_id = params[0] + + # Verify UUID is NOT in the parameters + assert ( + "unique_identifier" not in param_data + ), "unique_identifier should not be in test parameters when UUID is disabled" + + +class TestUUIDInStoredEvents: + """Test UUID persistence in stored tokenized events""" + + def test_uuid_stored_in_tokenized_events_json(self): + """Verify UUID is included in stored tokenized events JSON""" + tokenized_event = namedtuple( + "tokenized_event", + [ + "sample_name", + "metadata", + "event", + "unique_identifier", + "key_fields", + "time_values", + "requirement_test_data", + ], + ) + + events = [ + tokenized_event( + "test_sample", + { + "host": "test-host", + "source": "/var/log/test.log", + "sourcetype": "test:sourcetype", + "timestamp_type": "event", + "input_type": "modinput", + "expected_event_count": 1, + }, + "test event content", + "uuid-12345-67890", + {"field1": "value1"}, + [1234567890], + {"cim_fields": {"severity": "low"}}, + ) + ] + + with patch("os.path.exists", return_value=False), patch( + "os.getcwd", return_value="/fake/path" + ), patch("os.makedirs"), patch("builtins.open", mock_open()) as open_mock: + + xdist_generator = SampleXdistGenerator("/fake/addon", True, "/fake/config") + 
xdist_generator.store_events(events) + + # Get the written JSON + write_calls = [call.args[0] for call in open_mock().write.call_args_list] + assert len(write_calls) > 0 + + # Parse the JSON + json_content = write_calls[0] + stored_data = json.loads(json_content) + + # Verify UUID is in the stored data + assert "test_sample" in stored_data + sample_data = stored_data["test_sample"] + assert "metadata" in sample_data + assert "events" in sample_data + assert len(sample_data["events"]) > 0 + # Verify unique_identifier is stored in events when splunk_ep is True + assert "unique_identifier" in sample_data["events"][0] + assert sample_data["events"][0]["unique_identifier"] == "uuid-12345-67890" + + def test_uuid_not_stored_when_disabled(self): + """Verify UUID is NOT stored when flag is disabled""" + tokenized_event = namedtuple( + "tokenized_event", + [ + "sample_name", + "metadata", + "event", + "unique_identifier", + "key_fields", + "time_values", + "requirement_test_data", + ], + ) + + events = [ + tokenized_event( + "test_sample_no_uuid", + { + "host": "test-host", + "source": "/var/log/test.log", + "sourcetype": "test:sourcetype", + "timestamp_type": "event", + "input_type": "file_monitor", + "expected_event_count": 1, + }, + "test event without UUID", + None, # unique_identifier should be None when disabled + {"field1": "value1"}, + [], + None, + ) + ] + + with patch("os.path.exists", return_value=False), patch( + "os.getcwd", return_value="/fake/path" + ), patch("os.makedirs"), patch("builtins.open", mock_open()) as open_mock: + + xdist_generator = SampleXdistGenerator("/fake/addon", False, "/fake/config") + xdist_generator.store_events(events) + + # Get the written JSON + write_calls = [call.args[0] for call in open_mock().write.call_args_list] + + if write_calls: + json_content = write_calls[0] + stored_data = json.loads(json_content) + + sample_data = stored_data["test_sample_no_uuid"] + assert "metadata" in sample_data + + # Verify unique_identifier is NOT 
in events when splunk_ep is False + for event in sample_data["events"]: + assert "unique_identifier" not in event + + +class TestUUIDSearchQueryGeneration: + """Test UUID usage in search query generation + + Note: These tests verify the logical behavior that UUID changes search approach. + They test decision logic rather than implementation, which is acceptable for + documenting expected behavior. The actual search execution is tested in e2e tests. + """ + + def test_search_params_contain_uuid_when_enabled(self): + """Verify test parameters include UUID when flag is enabled""" + # This tests that the test parameter generation includes UUID + metadata = { + "splunk_ep": True, + "input_type": "modinput", + "sourcetype_to_search": "test:sourcetype", + "host": "test-host", + } + + event = SampleEvent( + event_string="test event", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={"cim_fields": {"severity": "low"}}, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=True, + ) + + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + param_data, _ = params[0] + + # The key assertion: parameters have UUID, not just escaped event + assert "unique_identifier" in param_data + assert param_data["unique_identifier"] is not None + # This means searches will use UUID instead of escaped event + + def test_search_params_lack_uuid_when_disabled(self): + """Verify test parameters do NOT include UUID when flag is disabled""" + metadata = { + "splunk_ep": False, + "input_type": "modinput", + "sourcetype_to_search": 
"test:sourcetype", + "host": "test-host", + } + + event = SampleEvent( + event_string="test event", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={"cim_fields": {"severity": "low"}}, + ) + + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=False, + ) + + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + + if params: + param_data, _ = params[0] + + # The key assertion: parameters don't have UUID, use escaped event approach + assert "unique_identifier" not in param_data + assert "escaped_event" in param_data + # This means searches will use traditional escaped event method + + +class TestUUIDEndToEndIntegration: + """Integration tests for complete UUID flow""" + + def test_complete_uuid_flow_modinput(self): + """Test complete UUID flow for modinput events""" + # This test simulates the complete flow: + # 1. Event creation with UUID + # 2. HEC payload generation + # 3. Test parameter generation + # 4. 
Assert parameters drive UUID-based search (without reimplementing query builder) + + metadata = { + "splunk_ep": True, + "input_type": "modinput", + "sourcetype": "test:sourcetype", + "source": "pytest-splunk-addon:modinput", + "index": "main", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "plugin", + "sourcetype_to_search": "test:sourcetype", + } + + # Step 1: Create event + event = SampleEvent( + event_string="integration test event", + metadata=metadata, + sample_name="integration.sample", + requirement_test_data={"cim_fields": {"severity": "high"}}, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Verify UUID was created + assert hasattr(event, "unique_identifier") + + # Step 2: Verify HEC payload would include UUID + assert event.metadata.get("splunk_ep") == True + + # Step 3: Generate test parameters + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=True, + ) + + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_integration_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + + assert len(params) > 0 + param_data, _ = params[0] + + # Verify UUID is in parameters and escaped_event is still present but not required for UUID path + assert "unique_identifier" in param_data + # Key assertion: when UUID is present, consumers should use it (do not manually build search here) + assert ( + "escaped_event" in param_data + ) # present for backward compatibility + + def test_complete_flow_without_uuid(self): + """Test complete flow works correctly without UUID (backward compatibility)""" + metadata = { + "splunk_ep": False, + "input_type": "modinput", + "sourcetype": "test:sourcetype", + "source": 
"pytest-splunk-addon:modinput", + "index": "main", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "plugin", + "sourcetype_to_search": "test:sourcetype", + } + + # Step 1: Create event without UUID + event = SampleEvent( + event_string="test event without uuid", + metadata=metadata, + sample_name="test.sample", + requirement_test_data={"cim_fields": {"severity": "medium"}}, + ) + + # Verify no UUID was created + assert not hasattr(event, "unique_identifier") + + # Step 2: Generate test parameters + test_generator = FieldTestGenerator( + app_path="fake/path", + tokenized_events=[event], + field_bank=None, + splunk_ep=False, + ) + + with patch( + "pytest_splunk_addon.fields_tests.test_generator.xml_event_parser.escape_char_event" + ) as mock_escape: + mock_escape.return_value = "escaped_test_event" + + with patch("pytest.param") as mock_param: + mock_param.side_effect = lambda x, id: (x, id) + + params = list(test_generator.generate_requirements_tests()) + + if len(params) > 0: + param_data, _ = params[0] + + # Verify UUID is NOT in parameters, and escaped event is present for traditional search path + assert "unique_identifier" not in param_data + assert param_data["escaped_event"] == "escaped_test_event" diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_generation.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_generation.py new file mode 100644 index 000000000..26c2b2ab9 --- /dev/null +++ b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_generation.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- +""" +Unit tests for UUID generation and uniqueness in sample events. 
+Tests Priority #1: UUID Generation & Uniqueness Tests +""" +import pytest +import uuid +from unittest.mock import patch, MagicMock +from pytest_splunk_addon.sample_generation.sample_event import SampleEvent +from pytest_splunk_addon.sample_generation.sample_stanza import SampleStanza + + +def _simulate_tokenization_uuid_assignment(event): + """Helper to simulate UUID assignment that happens during tokenization""" + if event.metadata.get("splunk_ep"): + event.unique_identifier = str(uuid.uuid4()) + + +class TestUUIDGeneration: + """Test suite for UUID generation functionality""" + + def test_uuid_generated_when_flag_enabled(self): + """Verify UUID is generated when splunk_ep is True""" + metadata = {"splunk_ep": True, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr( + event, "unique_identifier" + ), "unique_identifier attribute should exist" + assert ( + event.unique_identifier is not None + ), "unique_identifier should not be None" + assert isinstance( + event.unique_identifier, str + ), "unique_identifier should be a string" + + def test_uuid_not_generated_when_flag_disabled(self): + """Verify UUID is NOT generated when splunk_ep is False""" + metadata = {"splunk_ep": False, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert not hasattr( + event, "unique_identifier" + ), "unique_identifier attribute should not exist when flag is disabled" + + def test_uuid_not_generated_when_flag_missing(self): + """Verify UUID is NOT generated when splunk_ep is not specified""" + metadata = {"index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization 
UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert not hasattr( + event, "unique_identifier" + ), "unique_identifier attribute should not exist when flag is missing" + + def test_uuid_format_is_valid(self): + """Verify generated UUID follows UUID4 format""" + metadata = {"splunk_ep": True, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Validate UUID format by attempting to parse it + try: + uuid_obj = uuid.UUID(event.unique_identifier, version=4) + assert ( + str(uuid_obj) == event.unique_identifier + ), "UUID should be properly formatted" + assert uuid_obj.version == 4, "UUID should be version 4" + except ValueError: + pytest.fail( + f"Generated UUID '{event.unique_identifier}' is not a valid UUID4" + ) + + +class TestUUIDUniqueness: + """Test suite for UUID uniqueness across multiple events""" + + def test_multiple_events_get_unique_uuids(self): + """Verify each event gets a different UUID""" + metadata = {"splunk_ep": True, "index": "main"} + num_events = 100 + + events = [ + SampleEvent( + event_string=f"test event {i}", + metadata=metadata.copy(), + sample_name=f"test.sample{i}", + ) + for i in range(num_events) + ] + + # Simulate tokenization UUID assignment for all events + for event in events: + _simulate_tokenization_uuid_assignment(event) + + uuids = [event.unique_identifier for event in events] + + # Check all UUIDs are unique + assert ( + len(set(uuids)) == num_events + ), f"All {num_events} events should have unique UUIDs, but got {len(set(uuids))} unique values" + + def test_same_event_string_gets_different_uuids(self): + """Verify identical events get different UUIDs""" + metadata = {"splunk_ep": True, "index": "main"} + event_string = "identical test event" + + event1 = SampleEvent( + event_string=event_string, + metadata=metadata.copy(), + 
sample_name="test.sample", + ) + + event2 = SampleEvent( + event_string=event_string, + metadata=metadata.copy(), + sample_name="test.sample", + ) + + # Simulate tokenization UUID assignment for both events + _simulate_tokenization_uuid_assignment(event1) + _simulate_tokenization_uuid_assignment(event2) + + assert ( + event1.unique_identifier != event2.unique_identifier + ), "Identical events should receive different UUIDs" + + def test_uuid_persistence_through_event_lifecycle(self): + """Verify UUID remains constant throughout event lifecycle""" + metadata = {"splunk_ep": True, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + original_uuid = event.unique_identifier + + # Modify event properties + event.update("modified event string") + event.key_fields = {"field1": "value1"} + event.time_values = [1234567890] + + # UUID should remain the same + assert ( + event.unique_identifier == original_uuid + ), "UUID should remain constant after event modifications" + + +# TestUUIDCaseSensitivity class removed - no longer relevant with boolean flags + + +class TestUUIDWithRequirementData: + """Test suite for UUID generation with requirement test data""" + + def test_uuid_with_requirement_test_data(self): + """Verify UUID is generated when requirement_test_data is present""" + metadata = {"splunk_ep": True, "index": "main"} + requirement_data = {"cim_fields": {"severity": "low", "signature_id": "12345"}} + + event = SampleEvent( + event_string="test event", + metadata=metadata, + sample_name="test.sample", + requirement_test_data=requirement_data, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr( + event, "unique_identifier" + ), "UUID should be generated even with requirement_test_data" + assert ( + event.requirement_test_data == requirement_data + 
), "requirement_test_data should be preserved" + + def test_uuid_without_requirement_test_data(self): + """Verify UUID is generated when requirement_test_data is None""" + metadata = {"splunk_ep": True, "index": "main"} + + event = SampleEvent( + event_string="test event", + metadata=metadata, + sample_name="test.sample", + requirement_test_data=None, + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr( + event, "unique_identifier" + ), "UUID should be generated even without requirement_test_data" + assert event.requirement_test_data is None + + +class TestUUIDMocking: + """Test suite for UUID mocking and deterministic testing""" + + def test_uuid_can_be_mocked(self): + """Verify UUID generation can be mocked for deterministic tests""" + expected_uuid = "12345678-1234-5678-1234-567812345678" + + with patch("uuid.uuid4") as mock_uuid: + mock_uuid.return_value = expected_uuid + + metadata = {"splunk_ep": True, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + mock_uuid.assert_called_once() + assert event.unique_identifier == expected_uuid + + def test_uuid_generation_called_once_per_event(self): + """Verify UUID is generated exactly once per event""" + with patch("uuid.uuid4") as mock_uuid: + mock_uuid.return_value = "test-uuid" + + metadata = {"splunk_ep": True, "index": "main"} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + # Access the UUID multiple times + _ = event.unique_identifier + _ = event.unique_identifier + + # uuid4 should only be called once during tokenization + mock_uuid.assert_called_once() + + +class TestUUIDWithDifferentMetadata: + """Test suite for UUID generation with various 
metadata configurations""" + + def test_uuid_with_minimal_metadata(self): + """Verify UUID is generated with minimal metadata""" + metadata = {"splunk_ep": True} + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr(event, "unique_identifier") + + def test_uuid_with_extensive_metadata(self): + """Verify UUID is generated with extensive metadata""" + metadata = { + "splunk_ep": True, + "index": "main", + "sourcetype": "test:sourcetype", + "source": "/var/log/test.log", + "host": "test-host", + "host_type": "plugin", + "timestamp_type": "event", + "input_type": "modinput", + "sample_count": "10", + } + + event = SampleEvent( + event_string="test event with extensive metadata", + metadata=metadata, + sample_name="test.sample", + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr(event, "unique_identifier") + assert event.metadata == metadata + + @pytest.mark.parametrize( + "input_type", ["modinput", "file_monitor", "syslog_tcp", "default", "sc4s"] + ) + def test_uuid_with_different_input_types(self, input_type): + """Verify UUID is generated regardless of input type""" + metadata = { + "splunk_ep": True, + "input_type": input_type, + "index": "main", + } + + event = SampleEvent( + event_string="test event", metadata=metadata, sample_name="test.sample" + ) + + # Simulate tokenization UUID assignment + _simulate_tokenization_uuid_assignment(event) + + assert hasattr( + event, "unique_identifier" + ), f"UUID should be generated for input_type={input_type}" diff --git a/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_tokenization_timing.py b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_tokenization_timing.py new file mode 100644 index 000000000..b30884b90 --- /dev/null +++ 
b/tests/unit/tests_standard_lib/tests_sample_generation/test_uuid_tokenization_timing.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +""" +Test for UUID timing fix - ensures UUIDs are assigned per finalized event after tokenization. +""" +import pytest +import tempfile +import os +from pytest_splunk_addon.sample_generation.sample_stanza import SampleStanza + + +class TestUUIDTokenizationTiming: + """Test UUID assignment timing during tokenization""" + + def test_uuid_assigned_post_tokenization(self): + """Test that UUIDs are assigned after tokenization, ensuring uniqueness per event""" + + with tempfile.TemporaryDirectory() as tmpdir: + sample_path = os.path.join(tmpdir, "test.sample") + with open(sample_path, "w") as f: + # Multiple events to ensure we get multiple UUIDs + f.write("Event 1\nEvent 2\nEvent 3\n") + + psa_data_params = { + "sourcetype": "test:sourcetype", + "input_type": "modinput", # This will create one event per line + "tokens": {}, # No tokens needed for this test + } + + stanza = SampleStanza(sample_path, psa_data_params) + stanza.metadata["splunk_ep"] = True # Enable UUID generation + + # Tokenize to generate events with UUIDs + stanza.tokenize("psa-data-gen") + + events = stanza.tokenized_events + + # Verify events were created + assert ( + len(events) >= 3 + ), f"Should create events for each line, got {len(events)}" + + # Collect UUIDs + uuids = [] + for event in events: + assert hasattr( + event, "unique_identifier" + ), f"Event should have unique_identifier: {event.event}" + assert event.unique_identifier is not None, "UUID should not be None" + assert ( + len(event.unique_identifier) == 36 + ), "UUID should be 36 characters (standard UUID format)" + uuids.append(event.unique_identifier) + + # Verify all UUIDs are unique (this was the core bug we fixed) + unique_uuids = set(uuids) + assert len(unique_uuids) == len( + uuids + ), f"All UUIDs should be unique! 
Got {len(unique_uuids)} unique out of {len(uuids)} total" + + print( + f"✓ Generated {len(events)} events with {len(unique_uuids)} unique UUIDs" + ) + print(f"✓ UUID timing fix successful - no duplication across events") + + def test_uuid_not_assigned_when_disabled(self): + """Test that UUIDs are not assigned when flag is disabled""" + + with tempfile.TemporaryDirectory() as tmpdir: + sample_path = os.path.join(tmpdir, "test.sample") + with open(sample_path, "w") as f: + f.write("Test event without UUID\n") + + psa_data_params = { + "sourcetype": "test:sourcetype", + "input_type": "modinput", + "tokens": {}, + } + + stanza = SampleStanza(sample_path, psa_data_params) + stanza.metadata["splunk_ep"] = False # UUID disabled + + stanza.tokenize("psa-data-gen") + events = stanza.tokenized_events + + for event in events: + assert not hasattr( + event, "unique_identifier" + ), "Event should not have unique_identifier when UUID is disabled"