From 1b74d63c63831b2e2ec3ab89015dc9faebf28645 Mon Sep 17 00:00:00 2001 From: Felipe Montoya Date: Wed, 22 Apr 2026 16:00:22 -0500 Subject: [PATCH 1/2] feat: first pass at direct gzip to vector --- docs/howto/how_to_bulk_transform.rst | 21 ++- .../commands/helpers/queued_sender.py | 21 ++- .../tests/test_transform_tracking_logs.py | 129 +++++++++++++++++- .../commands/transform_tracking_logs.py | 42 +++--- 4 files changed, 190 insertions(+), 23 deletions(-) diff --git a/docs/howto/how_to_bulk_transform.rst b/docs/howto/how_to_bulk_transform.rst index 1164054d..cdf2c993 100644 --- a/docs/howto/how_to_bulk_transform.rst +++ b/docs/howto/how_to_bulk_transform.rst @@ -12,6 +12,8 @@ For most sources and destinations we use `Apache Libcloud Object storage `__) reads those loggers directly and you do not want to configure a dummy LRS. + For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key``, which is the path to a top level directory, a ``container`` which is a single subdirectory name inside the ``key`` directory, and a ``prefix`` (if provided) will be appended to the container to determine the final path. :: @@ -41,7 +43,9 @@ The command can work in a few distinct ways. Additionally all generated statements are written to a Python logger which can be configured to be ignored, save to a file, write standard out, or a log forwarder like `Vector `__ for more statement handling options. The two loggers are named ``xapi_tracking`` and ``caliper_tracking``, and are always running. -**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with loggers mentioned above, you can use Python log forwarding without the additional overhead of storing full files. +**File(s) to logger only** - Use ``--destination_provider LOGGER`` to run the full transformation pipeline and emit all events through the ``xapi_tracking`` / ``caliper_tracking`` Python loggers without sending to any LRS. No ``--lrs-urls`` flag is needed, and no ``RouterConfiguration`` database entry is required. This is the recommended mode when an external log forwarder such as Vector reads the logger output directly. + +**File(s) to logger (dry run)** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with loggers mentioned above, you can use Python log forwarding without the additional overhead of storing full files. .. warning:: Events may be filtered differently in this command than in normal operation. Normally events pass through two layers of filters as described in `getting started `_. @@ -108,6 +112,21 @@ You can also run these commands using a tutor wrapper: :: tutor local run lms python manage.py lms ..... +**Files to Logger (via xapi_tracking logger, no LRS required)** + +:: + + # Transform all events from S3 and emit them via the xapi_tracking logger. + # Vector (or any other log forwarder) can pick up the logger output directly. + # No --lrs-urls, no RouterConfiguration entry required. + python manage.py lms transform_tracking_logs \ + --source_provider S3 \ + --source_config '{"key": "...", "secret": "...", "container": "my-bucket", "prefix": "tracking/"}' \ + --destination_provider LOGGER \ + --transformer_type xapi \ + --batch_size 1000 \ + --sleep_between_batches_secs 3 + **Files to Files** :: diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py index b2b5a045..ae36710e 100644 --- a/event_routing_backends/management/commands/helpers/queued_sender.py +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -76,6 +76,17 @@ def transform_and_queue(self, line): self.queue(event) self.queued_lines += 1 + def _process_for_logger(self): + """ + Run events through the transformation pipeline so the xapi_tracking/caliper_tracking loggers fire. + + No data is dispatched to any external system; the side-effect of calling the processor + (which writes to the Python logger) is the only goal. + """ + print(f"Transforming {len(self.event_queue)} events for logger...") + for event in self.event_queue: + self.engine.processors[0](event) + def queue(self, event): """ Add an event to the queue, try to send if we've reached our batch size. @@ -88,6 +99,8 @@ def queue(self, event): print(f"Max queue size of {self.max_queue_size} reached, sending.") if self.destination == "LRS": self.send() + elif self.destination == "LOGGER": + self._process_for_logger() else: self.store() @@ -97,13 +110,16 @@ def queue(self, event): def send(self): """ - Send to the LRS if we're configured for that, otherwise a no-op. + Send to the LRS if we're configured for that. Events are converted to the output xAPI / Caliper format in the router. + A no-op for LOGGER destination (logging fires through processors instead). """ if self.destination == "LRS": print(f"Sending {len(self.event_queue)} events to LRS...") self.backend.bulk_send(self.event_queue, self.lrs_urls) + elif self.destination == "LOGGER": + pass else: print("Skipping send, we're storing with libcloud instead of an LRS.") @@ -153,6 +169,9 @@ def finalize(self): if self.destination is None or self.destination == "LRS": print("Sending to LRS!") self.send() + elif self.destination == "LOGGER": + print("Processing for logger!") + self._process_for_logger() else: print("Storing via Libcloud!") self.store() diff --git a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py index fb5c6d59..9ec7e42f 100644 --- a/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py +++ b/event_routing_backends/management/commands/tests/test_transform_tracking_logs.py @@ -1,6 +1,7 @@ """ Tests for the transform_tracking_logs management command. """ +import gzip import json import os from unittest.mock import MagicMock, patch @@ -181,6 +182,25 @@ def command_options(): }, "whitelist": ["problem_check"], }, + # LOGGER destination - no LRS or libcloud destination needed + { + "transformer_type": "xapi", + "source_provider": "MINIO", + "source_config": REMOTE_CONFIG, + "destination_provider": "LOGGER", + "sleep_between_batches_secs": 0, + "expected_results": { + "expected_batches_sent": 1, + "log_lines": [ + "Looking for log files in test_bucket/xapi_statements/*", + "Finalizing 2 events to LOGGER", + "Processing for logger!", + "Transforming 2 events for logger...", + "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.", + ], + }, + "registry_mapping": {"problem_check": 1}, + }, ] for option in options: @@ -206,6 +226,15 @@ def _get_raw_log_stream(_, start_bytes, chunk_size): yield current.read() +def _get_gzip_log_stream(_, start_bytes, chunk_size): + """ + Return gzip-compressed event json from current fixtures. + """ + tracking_log_path = _get_tracking_log_file_path() + with open(tracking_log_path, "rb") as current: + yield gzip.compress(current.read()) + + @pytest.mark.parametrize("command_opts", command_options()) @patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration") def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys): @@ -423,8 +452,8 @@ def test_required_dest_libcloud_keys(capsys): captured = capsys.readouterr() print(captured.out) - assert "If not using the 'LRS' destination, the following keys must be defined in destination_config: " \ - "'prefix', 'container'" in captured.out + assert "If not using the 'LRS' or 'LOGGER' destination, the following keys must be defined in " \ + "destination_config: 'prefix', 'container'" in captured.out def test_get_source_config(): @@ -475,6 +504,16 @@ def test_get_dest_config_lrs(): assert prefix is None +def test_get_dest_config_logger(): + """ + Check that a LOGGER destination config returns appropriate None values (no libcloud config needed). + """ + config, container, prefix = get_dest_config_from_options("LOGGER", None) + assert config is None + assert container is None + assert prefix is None + + def test_get_chunks(): """ Tests the retry functionality of the get_chunks function. @@ -500,3 +539,89 @@ def test_get_chunks(): # Make sure we got the correct number of retries assert fake_source_err.download_object_range_as_stream.call_count == 3 + + +@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration") +def test_gzip_file_support(MockRouterConfiguration, mock_common_calls, capsys): + """ + Test that gzip-compressed source files are transparently decompressed and processed. + """ + mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls + + MockRouterConfiguration.objects.filter.return_value.values_list.return_value = [] + + mm = MagicMock() + mock_log_object = MagicMock() + mock_log_object.__str__.return_value = "tracking.log.gz" + mock_log_object.name = "tracking.log.gz" + mock_log_object.size = 100 + + mm.return_value.iterate_container_objects.return_value = [mock_log_object] + mm.return_value.download_object_range_as_stream = _get_gzip_log_stream + mock_libcloud_get_driver.return_value = mm + + mm2 = MagicMock() + mm2.registry.mapping = {"problem_check": 1} + mm2.return_value = {"foo": "bar"} + tracker.backends["event_transformer"].processors = [mm2] + for backend in tracker.backends["event_transformer"].backends.values(): + backend.bulk_send = MagicMock() + + call_command( + 'transform_tracking_logs', + transformer_type="xapi", + source_provider="MINIO", + source_config=REMOTE_CONFIG, + sleep_between_batches_secs=0, + ) + + captured = capsys.readouterr() + assert "Streaming file tracking.log.gz..." in captured.out + # Same events should be parsed as from the uncompressed fixture + assert "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines" in captured.out + + +def test_queued_sender_logger_send_is_noop(mock_common_calls, capsys): + """ + Test that send() produces no output and dispatches nothing for LOGGER destination. + """ + qs = QueuedSender("LOGGER", None, None, "xapi") + qs.event_queue = [{"name": "test_event"}] + qs.send() + + captured = capsys.readouterr() + assert captured.out == "" + + +def test_queued_sender_logger_process_for_logger(mock_common_calls, capsys): + """ + Test that _process_for_logger calls the first processor for each queued event. + """ + qs = QueuedSender("LOGGER", None, None, "xapi") + mock_processor = MagicMock(return_value={"foo": "bar"}) + qs.engine.processors = [mock_processor] + qs.event_queue = [{"name": "event1"}, {"name": "event2"}] + + qs._process_for_logger() + + assert mock_processor.call_count == 2 + captured = capsys.readouterr() + assert "Transforming 2 events for logger..." in captured.out + + +def test_queued_sender_logger_finalize(mock_common_calls, capsys): + """ + Test that finalize() routes through _process_for_logger for LOGGER destination. + """ + qs = QueuedSender("LOGGER", None, None, "xapi") + mock_processor = MagicMock(return_value={"foo": "bar"}) + qs.engine.processors = [mock_processor] + qs.queued_lines = 1 + qs.event_queue = [{"name": "test_event"}] + + qs.finalize() + + captured = capsys.readouterr() + assert "Processing for logger!" in captured.out + assert "Transforming 1 events for logger..." in captured.out + assert "sent 1 batches" in captured.out diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py index 74c35dce..977a269d 100644 --- a/event_routing_backends/management/commands/transform_tracking_logs.py +++ b/event_routing_backends/management/commands/transform_tracking_logs.py @@ -1,6 +1,7 @@ """ Management command for transforming tracking log files. """ +import gzip import json import os from io import BytesIO @@ -77,17 +78,17 @@ def transform_tracking_logs( chunks = _get_chunks(source, file) - for chunk in chunks: - chunk = chunk.decode('utf-8') + raw_bytes = b"".join(chunks) + if raw_bytes[:2] == b'\x1f\x8b': + raw_bytes = gzip.decompress(raw_bytes) - # Loop through this chunk, if we find a newline it's time to process - # otherwise just keep appending. - for char in chunk: - if char == "\n" and line: - sender.transform_and_queue(line) - line = "" - else: - line += char + # Loop through file content; when we find a newline it's time to process + for char in raw_bytes.decode('utf-8'): + if char == "\n" and line: + sender.transform_and_queue(line) + line = "" + else: + line += char # Sometimes the file doesn't end with a newline, we try to use # any remaining bytes as a final line. @@ -116,15 +117,15 @@ def get_dest_config_from_options(destination_provider, dest_config_options): """ Prepare our destination configuration. - All None's if these are being sent to an LRS, or use values from the destination_configuration JSON option. + All None's if these are being sent to an LRS or LOGGER, or use values from the destination_configuration JSON option. """ - if destination_provider != "LRS": + if destination_provider not in ("LRS", "LOGGER"): dest_config = json.loads(dest_config_options) try: dest_container = dest_config.pop("container") dest_prefix = dest_config.pop("prefix") except KeyError as e: - print("If not using the 'LRS' destination, the following keys must be defined in " + print("If not using the 'LRS' or 'LOGGER' destination, the following keys must be defined in " "destination_config: 'prefix', 'container'") raise e else: @@ -191,9 +192,9 @@ def get_libcloud_drivers(source_provider, source_config, destination_provider, d print(f"{source_provider} is not a valid source Libcloud provider.") raise - # There is no driver for LRS - destination_driver = "LRS" - if destination_provider != "LRS": + # There is no libcloud driver for LRS or LOGGER + destination_driver = destination_provider # "LRS" or "LOGGER" + if destination_provider not in ("LRS", "LOGGER"): try: destination_provider = getattr(Provider, destination_provider) destination_cls = get_driver(destination_provider) @@ -235,7 +236,9 @@ def add_arguments(self, parser): '--destination_provider', type=str, default="LRS", - help="Either 'LRS' to use the default configured xAPI and/or Caliper servers" + help="Either 'LRS' to use the default configured xAPI and/or Caliper servers, " + "'LOGGER' to only emit events via the xapi_tracking/caliper_tracking Python loggers " + "(no LRS or RouterConfiguration required), " "or an Apache Libcloud 'provider constant' from this list: " "https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . " "Ex: LOCAL for local storage or S3 for AWS S3.", @@ -304,10 +307,11 @@ def handle(self, *args, **options): lrs_urls = options.get('lrs_urls') source_file_list = validate_source_and_files(source_driver, source_container, source_prefix) - if dest_driver != "LRS": + if dest_driver not in ("LRS", "LOGGER"): validate_destination(dest_driver, dest_container, dest_prefix, source_file_list) else: - validate_lrs_routes(lrs_urls) + if dest_driver == "LRS": + validate_lrs_routes(lrs_urls) print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n") sender = QueuedSender( From eb5c9ea9e06dc6026c0070cd6ae577080e8561d5 Mon Sep 17 00:00:00 2001 From: Felipe Montoya Date: Wed, 22 Apr 2026 17:06:01 -0500 Subject: [PATCH 2/2] feat: what actually worked in the debug process Co-Authored-By: Claude Sonnet 4.6 --- .../commands/helpers/queued_sender.py | 41 +++++++++++++++---- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py index ae36710e..0242e999 100644 --- a/event_routing_backends/management/commands/helpers/queued_sender.py +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -3,7 +3,9 @@ """ import datetime import json +import logging import os +import sys from io import BytesIO from time import sleep @@ -78,14 +80,39 @@ def transform_and_queue(self, line): def _process_for_logger(self): """ - Run events through the transformation pipeline so the xapi_tracking/caliper_tracking loggers fire. + Transform queued events and write each xAPI/Caliper statement as a JSON line to stdout. - No data is dispatched to any external system; the side-effect of calling the processor - (which writes to the Python logger) is the only goal. + Bypasses the processor chain (which is gated by XAPI_EVENTS_ENABLED / RouterConfiguration) + and calls the registry directly — the only goal is serialised statements on stdout for + Vector (or any line-oriented consumer) to pick up. Status messages go to stderr. """ - print(f"Transforming {len(self.event_queue)} events for logger...") + from eventtracking.processors.exceptions import NoTransformerImplemented + + registry = next( + (p.registry for p in self.backend.processors if getattr(p, 'registry', None)), + None, + ) + if not registry: + print(f"No transformer registry found for backend {self.transformer_type}", file=sys.stderr) + return + + print(f"Transforming {len(self.event_queue)} events for logger...", file=sys.stderr) for event in self.event_queue: - self.engine.processors[0](event) + try: + transformer = registry.get_transformer(event) + transformed = transformer.transform() + except NoTransformerImplemented: + continue + except Exception as exc: + print(f"Error transforming {event.get('name')}: {exc}", file=sys.stderr) + continue + + if not isinstance(transformed, list): + transformed = [transformed] + xapi_logger = logging.getLogger('xapi_tracking') + for stmt in transformed: + if stmt and getattr(getattr(stmt, 'object', None), 'id', None): + xapi_logger.info(stmt.to_json()) def queue(self, event): """ @@ -96,7 +123,7 @@ def queue(self, event): if self.dry_run: print("Dry run, skipping, but still clearing the queue.") else: - print(f"Max queue size of {self.max_queue_size} reached, sending.") + print(f"Max queue size of {self.max_queue_size} reached, sending.", file=sys.stderr) if self.destination == "LRS": self.send() elif self.destination == "LOGGER": @@ -170,7 +197,7 @@ def finalize(self): print("Sending to LRS!") self.send() elif self.destination == "LOGGER": - print("Processing for logger!") + print("Processing for logger!", file=sys.stderr) self._process_for_logger() else: print("Storing via Libcloud!")