21 changes: 20 additions & 1 deletion docs/howto/how_to_bulk_transform.rst
@@ -12,6 +12,8 @@ For most sources and destinations we use `Apache Libcloud Object storage <https:

The ``LRS`` destination provider is a special case that uses the usual event-routing-backends logic for sending events to Caliper and/or xAPI learning record stores.

The ``LOGGER`` destination provider is another special case that skips LRS and ``RouterConfiguration`` validation entirely. Events still run through the full transformation pipeline, so the ``xapi_tracking`` / ``caliper_tracking`` Python loggers fire, but no HTTP dispatch occurs. This is useful when an external log forwarder (e.g. `Vector <https://vector.dev/>`__) reads those loggers directly and you do not want to configure a dummy LRS.
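
How the logger output reaches a forwarder is deployment-specific. As a minimal sketch (the handler name and file path are illustrative assumptions, not defaults shipped with this package), a Django ``LOGGING`` snippet that writes both loggers to a file for Vector to tail could look like:

::

    LOGGING = {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            # Hypothetical handler: one JSON statement per line for Vector to tail.
            "tracking_statements": {
                "class": "logging.FileHandler",
                "filename": "/var/log/openedx/tracking_statements.log",
            },
        },
        "loggers": {
            # These two logger names are fixed by event-routing-backends.
            "xapi_tracking": {"handlers": ["tracking_statements"], "level": "INFO", "propagate": False},
            "caliper_tracking": {"handlers": ["tracking_statements"], "level": "INFO", "propagate": False},
        },
    }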

For the ``LOCAL`` provider, the path to the file(s) is a concatenation of the ``key`` (the path to a top-level directory), the ``container`` (a single subdirectory name inside the ``key`` directory), and a ``prefix`` which, if provided, is appended to the container to determine the final path.

::
@@ -41,7 +43,9 @@ The command can work in a few distinct ways.

Additionally, all generated statements are written to a Python logger, which can be configured to be ignored, saved to a file, written to standard out, or sent to a log forwarder like `Vector <https://vector.dev/>`__ for more statement handling options. The two loggers are named ``xapi_tracking`` and ``caliper_tracking``, and are always running.

**File(s) to logger** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with loggers mentioned above, you can use Python log forwarding without the additional overhead of storing full files.
**File(s) to logger only** - Use ``--destination_provider LOGGER`` to run the full transformation pipeline and emit all events through the ``xapi_tracking`` / ``caliper_tracking`` Python loggers without sending to any LRS. No ``--lrs-urls`` flag is needed, and no ``RouterConfiguration`` database entry is required. This is the recommended mode when an external log forwarder such as Vector reads the logger output directly.

**File(s) to logger (dry run)** - For any destination you can use the ``--dry_run`` flag to perform tests on finding and transforming data before attempting to store it. Used in conjunction with loggers mentioned above, you can use Python log forwarding without the additional overhead of storing full files.
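
For example (a sketch only; the ``LOCAL`` source values below are placeholders, not defaults):

::

    # Find and transform events, but neither store nor send anything.
    python manage.py lms transform_tracking_logs \
        --source_provider LOCAL \
        --source_config '{"key": "/openedx/data", "container": "logs", "prefix": "tracking.log"}' \
        --transformer_type xapi \
        --dry_run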

.. warning::
Events may be filtered differently in this command than in normal operation. Normally events pass through two layers of filters as described in `getting started <docs/getting_started.rst>`_.
@@ -108,6 +112,21 @@ You can also run these commands using a tutor wrapper:
::
tutor local run lms python manage.py lms .....

**Files to Logger (via xapi_tracking logger, no LRS required)**

::

# Transform all events from S3 and emit them via the xapi_tracking logger.
# Vector (or any other log forwarder) can pick up the logger output directly.
# No --lrs-urls, no RouterConfiguration entry required.
python manage.py lms transform_tracking_logs \
--source_provider S3 \
--source_config '{"key": "...", "secret": "...", "container": "my-bucket", "prefix": "tracking/"}' \
--destination_provider LOGGER \
--transformer_type xapi \
--batch_size 1000 \
--sleep_between_batches_secs 3

**Files to Files**

::
@@ -3,7 +3,9 @@
"""
import datetime
import json
import logging
import os
import sys
from io import BytesIO
from time import sleep

@@ -76,6 +78,42 @@ def transform_and_queue(self, line):
self.queue(event)
self.queued_lines += 1

def _process_for_logger(self):
"""
Transform queued events and emit each xAPI/Caliper statement through the xapi_tracking logger.

Bypasses the processor chain (which is gated by XAPI_EVENTS_ENABLED / RouterConfiguration)
and calls the registry directly; the only goal is serialised statements on the logger for
Vector (or any line-oriented consumer) to pick up. Status messages go to stderr.
"""
from eventtracking.processors.exceptions import NoTransformerImplemented

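# Find the first processor that exposes a transformer registry (it maps event names to transformers).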
registry = next(
(p.registry for p in self.backend.processors if getattr(p, 'registry', None)),
None,
)
if not registry:
print(f"No transformer registry found for backend {self.transformer_type}", file=sys.stderr)
return

print(f"Transforming {len(self.event_queue)} events for logger...", file=sys.stderr)
for event in self.event_queue:
try:
transformer = registry.get_transformer(event)
transformed = transformer.transform()
except NoTransformerImplemented:
continue
except Exception as exc:
print(f"Error transforming {event.get('name')}: {exc}", file=sys.stderr)
continue

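# A transformer may return a single statement or a list of them; normalise to a list.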
if not isinstance(transformed, list):
transformed = [transformed]
xapi_logger = logging.getLogger('xapi_tracking')
for stmt in transformed:
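# Only emit statements that transformed completely, i.e. that have an object id.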
if stmt and getattr(getattr(stmt, 'object', None), 'id', None):
xapi_logger.info(stmt.to_json())

def queue(self, event):
"""
Add an event to the queue, try to send if we've reached our batch size.
@@ -85,9 +123,11 @@ def queue(self, event):
if self.dry_run:
print("Dry run, skipping, but still clearing the queue.")
else:
print(f"Max queue size of {self.max_queue_size} reached, sending.")
print(f"Max queue size of {self.max_queue_size} reached, sending.", file=sys.stderr)
if self.destination == "LRS":
self.send()
elif self.destination == "LOGGER":
self._process_for_logger()
else:
self.store()

@@ -97,13 +137,16 @@ def queue(self, event):

def send(self):
"""
Send to the LRS if we're configured for that, otherwise a no-op.
Send to the LRS if we're configured for that.

Events are converted to the output xAPI / Caliper format in the router.
A no-op for the LOGGER destination (events are emitted via _process_for_logger instead).
"""
if self.destination == "LRS":
print(f"Sending {len(self.event_queue)} events to LRS...")
self.backend.bulk_send(self.event_queue, self.lrs_urls)
elif self.destination == "LOGGER":
pass
else:
print("Skipping send, we're storing with libcloud instead of an LRS.")

@@ -153,6 +196,9 @@ def finalize(self):
if self.destination is None or self.destination == "LRS":
print("Sending to LRS!")
self.send()
elif self.destination == "LOGGER":
print("Processing for logger!", file=sys.stderr)
self._process_for_logger()
else:
print("Storing via Libcloud!")
self.store()
@@ -1,6 +1,7 @@
"""
Tests for the transform_tracking_logs management command.
"""
import gzip
import json
import os
from unittest.mock import MagicMock, patch
@@ -181,6 +182,25 @@ def command_options():
},
"whitelist": ["problem_check"],
},
# LOGGER destination - no LRS or libcloud destination needed
{
"transformer_type": "xapi",
"source_provider": "MINIO",
"source_config": REMOTE_CONFIG,
"destination_provider": "LOGGER",
"sleep_between_batches_secs": 0,
"expected_results": {
"expected_batches_sent": 1,
"log_lines": [
"Looking for log files in test_bucket/xapi_statements/*",
"Finalizing 2 events to LOGGER",
"Processing for logger!",
"Transforming 2 events for logger...",
"Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines, sent 1 batches.",
],
},
"registry_mapping": {"problem_check": 1},
},
]

for option in options:
@@ -206,6 +226,15 @@ def _get_raw_log_stream(_, start_bytes, chunk_size):
yield current.read()


def _get_gzip_log_stream(_, start_bytes, chunk_size):
"""
Return gzip-compressed event json from current fixtures.
"""
tracking_log_path = _get_tracking_log_file_path()
with open(tracking_log_path, "rb") as current:
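# Compress the whole fixture and yield it as a single chunk, exercising the gzip magic-byte path.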
yield gzip.compress(current.read())


@pytest.mark.parametrize("command_opts", command_options())
@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
def test_transform_command(MockRouterConfiguration, command_opts, mock_common_calls, caplog, capsys):
@@ -423,8 +452,8 @@ def test_required_dest_libcloud_keys(capsys):

captured = capsys.readouterr()
print(captured.out)
assert "If not using the 'LRS' destination, the following keys must be defined in destination_config: " \
"'prefix', 'container'" in captured.out
assert "If not using the 'LRS' or 'LOGGER' destination, the following keys must be defined in " \
"destination_config: 'prefix', 'container'" in captured.out


def test_get_source_config():
@@ -475,6 +504,16 @@ def test_get_dest_config_lrs():
assert prefix is None


def test_get_dest_config_logger():
"""
Check that a LOGGER destination config returns appropriate None values (no libcloud config needed).
"""
config, container, prefix = get_dest_config_from_options("LOGGER", None)
assert config is None
assert container is None
assert prefix is None


def test_get_chunks():
"""
Tests the retry functionality of the get_chunks function.
@@ -500,3 +539,89 @@

# Make sure we got the correct number of retries
assert fake_source_err.download_object_range_as_stream.call_count == 3


@patch("event_routing_backends.management.commands.transform_tracking_logs.RouterConfiguration")
def test_gzip_file_support(MockRouterConfiguration, mock_common_calls, capsys):
"""
Test that gzip-compressed source files are transparently decompressed and processed.
"""
mock_libcloud_provider, mock_libcloud_get_driver = mock_common_calls

MockRouterConfiguration.objects.filter.return_value.values_list.return_value = []

mm = MagicMock()
mock_log_object = MagicMock()
mock_log_object.__str__.return_value = "tracking.log.gz"
mock_log_object.name = "tracking.log.gz"
mock_log_object.size = 100

mm.return_value.iterate_container_objects.return_value = [mock_log_object]
mm.return_value.download_object_range_as_stream = _get_gzip_log_stream
mock_libcloud_get_driver.return_value = mm

mm2 = MagicMock()
mm2.registry.mapping = {"problem_check": 1}
mm2.return_value = {"foo": "bar"}
tracker.backends["event_transformer"].processors = [mm2]
for backend in tracker.backends["event_transformer"].backends.values():
backend.bulk_send = MagicMock()

call_command(
'transform_tracking_logs',
transformer_type="xapi",
source_provider="MINIO",
source_config=REMOTE_CONFIG,
sleep_between_batches_secs=0,
)

captured = capsys.readouterr()
assert "Streaming file tracking.log.gz..." in captured.out
# Same events should be parsed as from the uncompressed fixture
assert "Queued 2 log lines, could not parse 2 log lines, skipped 8 log lines" in captured.out


def test_queued_sender_logger_send_is_noop(mock_common_calls, capsys):
"""
Test that send() produces no output and dispatches nothing for LOGGER destination.
"""
qs = QueuedSender("LOGGER", None, None, "xapi")
qs.event_queue = [{"name": "test_event"}]
qs.send()

captured = capsys.readouterr()
assert captured.out == ""


def test_queued_sender_logger_process_for_logger(mock_common_calls, capsys):
"""
Test that _process_for_logger transforms each queued event via the registry.
"""
qs = QueuedSender("LOGGER", None, None, "xapi")
mock_registry = MagicMock()
mock_processor = MagicMock()
mock_processor.registry = mock_registry
qs.backend.processors = [mock_processor]
qs.event_queue = [{"name": "event1"}, {"name": "event2"}]

qs._process_for_logger()

assert mock_registry.get_transformer.call_count == 2
captured = capsys.readouterr()
assert "Transforming 2 events for logger..." in captured.err


def test_queued_sender_logger_finalize(mock_common_calls, capsys):
"""
Test that finalize() routes through _process_for_logger for LOGGER destination.
"""
qs = QueuedSender("LOGGER", None, None, "xapi")
mock_registry = MagicMock()
mock_processor = MagicMock()
mock_processor.registry = mock_registry
qs.backend.processors = [mock_processor]
qs.queued_lines = 1
qs.event_queue = [{"name": "test_event"}]

qs.finalize()

captured = capsys.readouterr()
assert "Processing for logger!" in captured.err
assert "Transforming 1 events for logger..." in captured.err
assert "sent 1 batches" in captured.out
@@ -1,6 +1,7 @@
"""
Management command for transforming tracking log files.
"""
import gzip
import json
import os
from io import BytesIO
@@ -77,17 +78,17 @@ def transform_tracking_logs(

chunks = _get_chunks(source, file)

for chunk in chunks:
chunk = chunk.decode('utf-8')
raw_bytes = b"".join(chunks)
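# gzip streams start with the magic bytes 0x1f 0x8b; decompress transparently when present.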
if raw_bytes[:2] == b'\x1f\x8b':
raw_bytes = gzip.decompress(raw_bytes)

# Loop through this chunk, if we find a newline it's time to process
# otherwise just keep appending.
for char in chunk:
if char == "\n" and line:
sender.transform_and_queue(line)
line = ""
else:
line += char
# Loop through file content; when we find a newline it's time to process
for char in raw_bytes.decode('utf-8'):
if char == "\n" and line:
sender.transform_and_queue(line)
line = ""
else:
line += char

# Sometimes the file doesn't end with a newline, we try to use
# any remaining bytes as a final line.
@@ -116,15 +117,15 @@ def get_dest_config_from_options(destination_provider, dest_config_options):
"""
Prepare our destination configuration.

All None's if these are being sent to an LRS, or use values from the destination_configuration JSON option.
All None's if these are being sent to an LRS or LOGGER, or use values from the destination_configuration JSON option.
"""
if destination_provider != "LRS":
if destination_provider not in ("LRS", "LOGGER"):
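# Only libcloud destinations reach this branch; they require 'container' and 'prefix' keys.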
dest_config = json.loads(dest_config_options)
try:
dest_container = dest_config.pop("container")
dest_prefix = dest_config.pop("prefix")
except KeyError as e:
print("If not using the 'LRS' destination, the following keys must be defined in "
print("If not using the 'LRS' or 'LOGGER' destination, the following keys must be defined in "
"destination_config: 'prefix', 'container'")
raise e
else:
@@ -191,9 +192,9 @@ def get_libcloud_drivers(source_provider, source_config, destination_provider, d
print(f"{source_provider} is not a valid source Libcloud provider.")
raise

# There is no driver for LRS
destination_driver = "LRS"
if destination_provider != "LRS":
# There is no libcloud driver for LRS or LOGGER
destination_driver = destination_provider # "LRS" or "LOGGER"
if destination_provider not in ("LRS", "LOGGER"):
try:
destination_provider = getattr(Provider, destination_provider)
destination_cls = get_driver(destination_provider)
@@ -235,7 +236,9 @@ def add_arguments(self, parser):
'--destination_provider',
type=str,
default="LRS",
help="Either 'LRS' to use the default configured xAPI and/or Caliper servers"
help="Either 'LRS' to use the default configured xAPI and/or Caliper servers, "
"'LOGGER' to only emit events via the xapi_tracking/caliper_tracking Python loggers "
"(no LRS or RouterConfiguration required), "
"or an Apache Libcloud 'provider constant' from this list: "
"https://libcloud.readthedocs.io/en/stable/storage/supported_providers.html . "
"Ex: LOCAL for local storage or S3 for AWS S3.",
@@ -304,10 +307,11 @@ def handle(self, *args, **options):
lrs_urls = options.get('lrs_urls')

source_file_list = validate_source_and_files(source_driver, source_container, source_prefix)
if dest_driver != "LRS":
if dest_driver not in ("LRS", "LOGGER"):
validate_destination(dest_driver, dest_container, dest_prefix, source_file_list)
else:
validate_lrs_routes(lrs_urls)
if dest_driver == "LRS":
validate_lrs_routes(lrs_urls)
print(f"Found {len(source_file_list)} source files: ", *source_file_list, sep="\n")

sender = QueuedSender(