Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions scripts/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ def _run_with_dirs(tmp_dir: Path, csv_out: Path) -> None:
cur.execute("TRUNCATE release CASCADE")
conn.close()

# Set tables UNLOGGED before the converter streams data via COPY.
# This skips WAL writes during the bulk import phase.
set_tables_unlogged(db_url)

# Converter streams releases into PG; supplementary CSVs still
# go to csv_out (artist_alias.csv, label_hierarchy.csv).
convert_and_filter(
Expand Down Expand Up @@ -664,11 +668,10 @@ def _run_database_build_post_import(
"""Post-import database build for --direct-pg mode.

Skips create_schema (already done), import_csv, and import_tracks
(converter loaded all data directly). Runs create_indexes through vacuum.
(converter loaded all data directly). Tables are already UNLOGGED
(set in _run_xml_pipeline before the converter). Runs create_indexes
through SET LOGGED.
"""
# -- set_tables_unlogged (skip WAL writes during bulk operations)
set_tables_unlogged(db_url)

# -- create_indexes (base trigram indexes, run in parallel)
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/test_run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,48 @@ def test_descriptions_contain_logged(self) -> None:
assert "LOGGED" in desc


class TestDirectPgUnloggedBeforeConverter:
    """In --direct-pg mode, set_tables_unlogged is called before the converter."""

    def test_unlogged_before_convert_and_filter(self, tmp_path) -> None:
        """set_tables_unlogged must be called before convert_and_filter in direct-PG mode."""
        # Dummy input file: parse_args/main only need the path to exist,
        # never its contents (the converter itself is patched out below).
        xml_file = tmp_path / "releases.xml.gz"
        xml_file.touch()

        # Build real parsed args first, then patch parse_args to return them,
        # so main() runs the --direct-pg code path deterministically.
        args = run_pipeline.parse_args(["--xml", str(xml_file), "--direct-pg"])

        # Records the relative order in which the two patched functions fire.
        call_order: list[str] = []

        def track_set_unlogged(db_url) -> None:
            call_order.append("set_tables_unlogged")

        # NOTE(review): this signature must mirror how run_pipeline calls
        # convert_and_filter (positional xml/output_dir/converter plus the two
        # keyword args) — verify against the call site if that API changes.
        def track_convert(xml, output_dir, converter, library_artists=None, database_url=None) -> None:
            call_order.append("convert_and_filter")

        # Stub out every collaborator with external side effects (Postgres
        # readiness wait, SQL file execution, the psycopg connection, the
        # post-import build) so main() exercises only the ordering under test.
        with (
            patch.object(run_pipeline, "parse_args", return_value=args),
            patch.object(run_pipeline, "wait_for_postgres"),
            patch.object(run_pipeline, "run_sql_file"),
            patch.object(run_pipeline.psycopg, "connect") as mock_conn,
            patch.object(run_pipeline, "set_tables_unlogged", side_effect=track_set_unlogged),
            patch.object(run_pipeline, "convert_and_filter", side_effect=track_convert),
            patch.object(run_pipeline, "_run_database_build_post_import"),
        ):
            # Make the cursor mock usable as a context manager — presumably
            # for a `with conn.cursor() as cur:` block in run_pipeline.
            # NOTE(review): MagicMock already supports the context-manager
            # protocol, so these two assignments may be redundant — confirm
            # against the connection usage in run_pipeline before removing.
            mock_cursor = mock_conn.return_value.__enter__.return_value.cursor.return_value
            mock_cursor.__enter__ = lambda self: self
            mock_cursor.__exit__ = lambda self, *a: None
            run_pipeline.main()

        # Membership checks first so a missing call fails with a clear
        # message instead of a ValueError from list.index below.
        assert "set_tables_unlogged" in call_order, "set_tables_unlogged should be called"
        assert "convert_and_filter" in call_order, "convert_and_filter should be called"
        unlogged_idx = call_order.index("set_tables_unlogged")
        convert_idx = call_order.index("convert_and_filter")
        assert unlogged_idx < convert_idx, (
            f"set_tables_unlogged (index {unlogged_idx}) must come before "
            f"convert_and_filter (index {convert_idx})"
        )


class TestXmlModeEnrichment:
"""In --xml mode, library_artists.txt is generated from library.db when not provided."""

Expand Down