Skip to content

Commit 494bc61

Browse files
authored
Merge pull request #226 from openedx/bmtcril/vector_backend
feat: Add Vector backend
2 parents 1c03a39 + 7cd70cf commit 494bc61

8 files changed

Lines changed: 300 additions & 8 deletions

File tree

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# Use the vector backend to emit log statements that Vector will use to load the data
2+
# to ClickHouse with insert statements
3+
4+
# Vector Backend configuration
5+
# ################################
6+
backend: vector
7+
db_host: localhost
8+
db_port: 8123
9+
db_name: xapi
10+
db_event_sink_name: event_sink
11+
db_username: ch_admin
12+
db_password: ...
13+
s3_key: ...
14+
s3_secret: ...
15+
16+
# Run options
17+
log_dir: logs
18+
num_xapi_batches: 3
19+
batch_size: 100
20+
21+
# This number is used for each QueueBackend that uses workers, so the number of threads is
22+
# multiplicative. Generally this performs best less than 10, as more threads will cost more
23+
# in context switching than they save.
24+
num_workers: 4
25+
26+
# Overall start and end date for the entire run
27+
start_date: 2014-01-01
28+
end_date: 2023-11-27
29+
30+
# All courses will be this long, and be fit into the start / end dates
31+
# This must be less than end_date - start_date days.
32+
course_length_days: 120
33+
34+
# The size of the test
35+
num_organizations: 3
36+
num_actors: 10
37+
38+
# This replicates users updating their profiles several times, creating
39+
# more rows
40+
num_actor_profile_changes: 5
41+
42+
# How many of each size course to create. The sum of these is the total number
43+
# of courses created for the test.
44+
num_course_sizes:
45+
small: 1
46+
medium: 1
47+
large: 1
48+
huge: 1
49+
50+
# How many times each course will be "published", this creates a more realistic
51+
# distribution of course blocks where each course can be published dozens or
52+
# hundreds of times while it is being developed.
53+
num_course_publishes: 100
54+
55+
# Course size configurations, how many of each type of object are created for
56+
# each course of this size. "actors" must be less than or equal to "num_actors".
57+
# For a course of this size to be created it needs to exist both here and in
58+
# "num_course_sizes".
59+
course_size_makeup:
60+
small:
61+
actors: 5
62+
problems: 20
63+
videos: 10
64+
chapters: 3
65+
sequences: 10
66+
verticals: 20
67+
forum_posts: 20
68+
medium:
69+
actors: 7
70+
problems: 40
71+
videos: 20
72+
chapters: 4
73+
sequences: 20
74+
verticals: 30
75+
forum_posts: 40
76+
large:
77+
actors: 10
78+
problems: 80
79+
videos: 30
80+
chapters: 5
81+
sequences: 40
82+
verticals: 80
83+
forum_posts: 200
84+
huge:
85+
actors: 10
86+
problems: 160
87+
videos: 40
88+
chapters: 10
89+
sequences: 50
90+
verticals: 100
91+
forum_posts: 1000

xapi_db_load/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22
Scripts to generate fake xAPI data against various backends.
33
"""
44

5-
__version__ = "3.0.0"
5+
__version__ = "3.1.0"

xapi_db_load/backends/vector.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""
2+
A backend that simply logs the statements to a xapi_tracking logger.
3+
4+
Vector just reads the log statements, so all we need to do is emit them.
5+
All other tasks use the raw Clickhouse inserts.
6+
"""
7+
8+
import logging
9+
import sys
10+
from logging import Logger, getLogger
11+
from typing import List
12+
13+
from xapi_db_load.backends.base_async_backend import (
14+
BaseBackendTasks,
15+
)
16+
from xapi_db_load.backends.clickhouse import (
17+
InsertBlocks,
18+
InsertCourses,
19+
InsertExternalIDs,
20+
InsertInitialEnrollments,
21+
InsertObjectTags,
22+
InsertProfiles,
23+
InsertTags,
24+
InsertTaxonomies,
25+
InsertXAPIEvents,
26+
)
27+
from xapi_db_load.generate_load_async import EventGenerator
28+
29+
30+
class AsyncVectorTasks(BaseBackendTasks):
    """
    Task collection for the Vector backend.

    Reuses the ClickHouse insert tasks for all metadata and swaps in a
    logging-based task for the xAPI events themselves.
    """

    def __repr__(self) -> str:
        return f"AsyncVectorTasks: {self.config['db_host']}"

    def get_test_data_tasks(self):
        """
        Return the tasks to be run.
        """
        insert_task_classes = (
            InsertInitialEnrollments,
            InsertCourses,
            InsertBlocks,
            InsertObjectTags,
            InsertTaxonomies,
            InsertTags,
            InsertExternalIDs,
            InsertProfiles,
            # This is the only change from the ClickHouse backend
            InsertXAPIEventsVector,
        )

        tasks = [self.event_generator]
        tasks.extend(
            task_cls(self.config, self.logger, self.event_generator)
            for task_cls in insert_task_classes
        )
        return tasks
51+
52+
53+
class InsertXAPIEventsVector(InsertXAPIEvents):
    """
    Wraps the ClickHouse direct backend so that the rest of the metadata can be
    inserted directly while the xAPI events are emitted as "xapi_tracking" log
    statements for Vector to read and load into ClickHouse.
    """

    def __init__(self, config: dict, logger: Logger, event_generator: EventGenerator):
        super().__init__(config, logger, event_generator)

        stream_handler = logging.StreamHandler(sys.stdout)
        # This formatter is different from what the LMS uses, but is the smallest possible
        # format that passes Vector's regex
        formatter = logging.Formatter(" [{name}] [] {message}", style="{")
        stream_handler.setFormatter(formatter)
        self.xapi_logger = getLogger("xapi_tracking")
        self.xapi_logger.setLevel(logging.INFO)
        self.xapi_logger.addHandler(stream_handler)

    def _format_row(self, row: dict):
        """
        Override the ClickHouse backend's row formatting to return only the raw
        xAPI event JSON, which is all Vector needs from the log line.
        """
        return row["event"]

    async def _do_insert(self, out_data: List):
        """
        Log a batch of events instead of inserting directly to ClickHouse.

        Vector tails these log statements and performs the ClickHouse
        inserts itself.
        """
        for event_json in out_data:
            self.xapi_logger.info(event_json)

xapi_db_load/course_configs.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,11 @@ def get_random_emission_time(self, actor=None):
182182
assert self.end_date
183183

184184
# Make sure we're passing in a datetime, not a date
185-
start = datetime.datetime.combine(start, datetime.time())
185+
start = datetime.datetime.combine(start, datetime.time(), tzinfo=datetime.UTC)
186186

187187
# time() is midnight, so make sure we get that last day in there
188188
end = datetime.datetime.combine(
189-
self.end_date, datetime.time()
189+
self.end_date, datetime.time(), tzinfo=datetime.UTC
190190
) + datetime.timedelta(days=1)
191191

192192
return self._random_datetime(start_datetime=start, end_datetime=end)

xapi_db_load/runner.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from xapi_db_load.backends.clickhouse import AsyncClickHouseTasks
88
from xapi_db_load.backends.csv import AsyncCSVTasks
99
from xapi_db_load.backends.ralph import AsyncRalphTasks
10+
from xapi_db_load.backends.vector import AsyncVectorTasks
1011
from xapi_db_load.generate_load_async import EventGenerator
1112

1213

@@ -42,6 +43,10 @@ def set_backend(self, backend):
4243
self.backend = AsyncRalphTasks(
4344
self.config, self.logger, self.event_generator
4445
)
46+
elif backend == "vector":
47+
self.backend = AsyncVectorTasks(
48+
self.config, self.logger, self.event_generator
49+
)
4550
else:
4651
raise ValueError("Invalid backend")
4752

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Test configuration for Vector / ClickHouse
2+
# #########################################
3+
backend: vector
4+
db_host: localhost
5+
db_port: 8123
6+
db_name: xapi
7+
db_username: ch_admin
8+
db_password: foo
9+
10+
# Run options
11+
log_dir: logs
12+
num_xapi_batches: 3
13+
batch_size: 100
14+
15+
# This number is used for each QueueBackend that uses workers, so the number of threads is
16+
# multiplicative. Generally this performs best less than 10, as more threads will cost more
17+
# in context switching than they save.
18+
num_workers: 4
19+
20+
# Overall start and end date for the entire run
21+
start_date: 2014-01-01
22+
end_date: 2023-11-27
23+
24+
# All courses will be this long, and be fit into the start / end dates
25+
# This must be less than end_date - start_date days.
26+
course_length_days: 120
27+
28+
# The size of the test
29+
num_organizations: 3
30+
num_actors: 10
31+
32+
# This replicates users updating their profiles several times, creating
33+
# more rows
34+
num_actor_profile_changes: 5
35+
36+
# How many of each size course to create. The sum of these is the total number
37+
# of courses created for the test.
38+
num_course_sizes:
39+
small: 1
40+
41+
# How many times each course will be "published", this creates a more realistic
42+
# distribution of course blocks where each course can be published dozens or
43+
# hundreds of times while it is being developed.
44+
num_course_publishes: 10
45+
46+
course_size_makeup:
47+
small:
48+
actors: 5
49+
problems: 20
50+
videos: 10
51+
chapters: 3
52+
sequences: 10
53+
verticals: 20
54+
forum_posts: 20

xapi_db_load/tests/test_backends.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
"""
44

55
import gzip
6+
import json
67
import os
8+
import re
79
from contextlib import contextmanager
810
from unittest.mock import AsyncMock, MagicMock, patch
911

@@ -120,6 +122,66 @@ def test_clickhouse_backend(_, tmp_path):
120122
assert "Run duration was" in result.output
121123

122124

125+
@patch(
    "xapi_db_load.backends.base_async_backend.clickhouse_connect",
    new_callable=AsyncMock,
)
@patch(
    "xapi_db_load.backends.vector.getLogger",
    new_callable=MagicMock,
)
def test_vector_backend(mock_get_logger, _, tmp_path):
    """
    Run a test through the Vector backend, currently this just checks that the
    output indicates success.
    """
    test_path = "xapi_db_load/tests/fixtures/small_vector_config.yaml"

    runner = CliRunner()

    with override_config(test_path, tmp_path):
        result = runner.invoke(
            load_db,
            f"--config_file {test_path}",
            catch_exceptions=False,
        )

    # This test should create 300 xAPI log statements
    assert mock_get_logger.return_value.info.call_count == 300

    last_logged_statement = mock_get_logger.return_value.info.call_args.args[0]

    # We check to make sure Vector's regex will parse what we're sending. We want it to match both
    # the LMS and our local logger formatter.
    # This is how things are generally formatted in the LMS
    test_str_1 = f"2026-02-24 20:26:13,006 INFO 42 [xapi_tracking] [user None] [ip 172.19.0.1] logger.py:41 - {last_logged_statement}"

    # This returns our message formatted with the abbreviated version we use for size and speed purposes
    formatter = mock_get_logger.return_value.addHandler.call_args.args[0].formatter
    test_str_2 = formatter._fmt.format(
        name="xapi_tracking", message=last_logged_statement
    )

    # This is a direct copy and paste from Aspects' Vector common-post.toml
    msg_regex = r"^.* \[xapi_tracking\] [^{}]* (?P<tracking_message>\{.*\})$"

    # Quick test to make sure that what's being stored is at least parseable
    for s in (test_str_1, test_str_2):
        # Assert explicitly instead of letting a non-match surface as an
        # AttributeError on None
        match = re.match(msg_regex, s)
        assert match, f"Vector regex failed to match: {s}"
        try:
            # Use the named group the pattern defines rather than groups()[0]
            json.loads(match.group("tracking_message"))
        except Exception as e:
            print(e)
            print("Exception! Regex testing: ")
            print(s)
            raise

    assert "Insert xAPI Events complete." in result.output
    assert "Insert Initial Enrollments complete." in result.output
    assert "ALL TASKS DONE!" in result.output
    assert "Run duration was" in result.output
183+
184+
123185
@patch("xapi_db_load.backends.ralph.requests", new_callable=AsyncMock)
124186
@patch(
125187
"xapi_db_load.backends.base_async_backend.clickhouse_connect",

xapi_db_load/ui/load_ui.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@ def __init__(self, app):
2828
s = app.runner.backend.get_backend_summary()
2929

3030
self.summary = urwid.Text(
31-
"""{backend}
32-
- {num_xapi_batches} batches of {batch_size} events for {total_events:,} events
33-
- {num_actors} actors, with profiles saved {num_actor_profile_changes} times
34-
- {num_courses} courses, with {num_course_publishes} publishes
35-
""".format(**s)
31+
"{backend}- {num_xapi_batches} x {batch_size} events for {total_events:,} events".format(
32+
**s
33+
)
3634
)
3735
self.go_button = urwid.Button(GO_TEXT, self.go_pressed)
3836
self.load_button = urwid.Button(LOAD_TEXT, self.load_pressed)

0 commit comments

Comments
 (0)