From ec69da8d3755c222d39ad771c21999c7f11e30a4 Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Mon, 9 Mar 2026 20:12:57 -0700 Subject: [PATCH] perf: use UNLOGGED tables during bulk import to halve WAL I/O Tables are set UNLOGGED immediately after schema creation (skipping WAL writes during the COPY-intensive import, index, dedup, and prune phases) and converted back to LOGGED after vacuum so consumers get durable tables. The optimization is localized entirely in run_pipeline.py; the canonical schema and standalone scripts remain unchanged. - Add PIPELINE_TABLES constant shared by run_vacuum, set_tables_unlogged, set_tables_logged - Add set_tables_unlogged/set_tables_logged with FK-ordered two-phase execution (children first for UNLOGGED, parent first for LOGGED) - Wire into both _run_database_build (CSV mode) and _run_database_build_post_import (direct-PG mode) - Add "set_logged" to pipeline state (v3) with v2 migration support - Add integration tests verifying pg_class.relpersistence transitions - Add E2E test verifying all tables are LOGGED after pipeline completion --- CLAUDE.md | 7 +- lib/pipeline_state.py | 55 ++++++++++- scripts/run_pipeline.py | 72 +++++++++++--- tests/e2e/test_pipeline.py | 22 ++++- tests/integration/test_unlogged_tables.py | 92 +++++++++++++++++ tests/unit/test_pipeline_state.py | 78 ++++++++++++++- tests/unit/test_run_pipeline.py | 115 ++++++++++++++++++++++ 7 files changed, 419 insertions(+), 22 deletions(-) create mode 100644 tests/integration/test_unlogged_tables.py diff --git a/CLAUDE.md b/CLAUDE.md index c5b9904..078d09e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -14,7 +14,7 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release 1. **Download** Discogs monthly data dumps (XML) from https://discogs-data-dumps.s3.us-west-2.amazonaws.com/index.html 2. **Enrich** `library_artists.txt` with WXYC cross-references (`scripts/enrich_library_artists.py`, optional) 3. 
**Convert and filter** XML to CSV using [discogs-xml-converter](https://github.com/WXYC/discogs-xml-converter) (Rust binary), with optional artist filtering via `--library-artists`. Accepts a single XML file or a directory containing releases.xml, artists.xml, and labels.xml. When artists.xml is present, alias-enhanced filtering is enabled automatically. When labels.xml is present, `label_hierarchy.csv` is produced for sublabel-aware dedup. -4. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`) +4. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`), then **SET UNLOGGED** on all tables to skip WAL writes during bulk import 5. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`) 6. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`) 7. **Deduplicate** by master_id (`scripts/dedup_releases.py`) -- prefers label match (with sublabel resolution via `--label-hierarchy`), then US releases, then most tracks, then lowest ID @@ -22,10 +22,11 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release - `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB) - `--copy-to`/`--target-db-url`: copy matched releases to a separate database, preserving the full import 9. **Vacuum** to reclaim disk space (`VACUUM FULL`) +10. **SET LOGGED** to restore WAL durability for consumers `scripts/run_pipeline.py` supports two modes: -- `--xml` mode: runs steps 2-9 (enrich, convert+filter, database build through vacuum). `--xml` accepts a single file or a directory. -- `--csv-dir` mode: runs steps 4-9 (database build from pre-filtered CSVs) +- `--xml` mode: runs steps 2-10 (enrich, convert+filter, database build through SET LOGGED). `--xml` accepts a single file or a directory. 
+- `--csv-dir` mode: runs steps 4-10 (database build from pre-filtered CSVs) Both modes support `--target-db-url` to copy matched releases to a separate database instead of pruning in place, and `--resume` (csv-dir only) to skip already-completed steps. `--keep-csv` (xml mode only) writes converted CSVs to a persistent directory instead of a temp dir, so they survive pipeline failures. diff --git a/lib/pipeline_state.py b/lib/pipeline_state.py index 60da0a6..4aa42e0 100644 --- a/lib/pipeline_state.py +++ b/lib/pipeline_state.py @@ -9,7 +9,7 @@ import json from pathlib import Path -VERSION = 2 +VERSION = 3 STEP_NAMES = [ "create_schema", @@ -20,11 +20,24 @@ "create_track_indexes", "prune", "vacuum", + "set_logged", ] # Mapping from v1 step names to v2 equivalents for migration _V1_STEP_NAMES = ["create_schema", "import_csv", "create_indexes", "dedup", "prune", "vacuum"] +# V2 step names for migration +_V2_STEP_NAMES = [ + "create_schema", + "import_csv", + "create_indexes", + "dedup", + "import_tracks", + "create_track_indexes", + "prune", + "vacuum", +] + class PipelineState: """Track pipeline step completion status.""" @@ -78,13 +91,15 @@ def save(self, path: Path) -> None: def load(cls, path: Path) -> PipelineState: """Load state from a JSON file. - Supports v1 state files by migrating them to v2 format. + Supports v1 and v2 state files by migrating them to v3 format. """ data = json.loads(path.read_text()) version = data.get("version") if version == 1: return cls._migrate_v1(data) + if version == 2: + return cls._migrate_v2(data) if version != VERSION: raise ValueError(f"Unsupported state file version {version} (expected {VERSION})") @@ -94,9 +109,10 @@ def load(cls, path: Path) -> PipelineState: @classmethod def _migrate_v1(cls, data: dict) -> PipelineState: - """Migrate a v1 state file to v2 format. + """Migrate a v1 state file to v3 format (via v2 migration rules). V2 adds import_tracks and create_track_indexes between dedup and prune. 
+ V3 adds set_logged after vacuum. Migration rules: - All v1 steps map directly to their v2 equivalents @@ -104,11 +120,13 @@ def _migrate_v1(cls, data: dict) -> PipelineState: (v1 imported tracks as part of import_csv) - If create_indexes or dedup was completed in v1, create_track_indexes is also completed (v1 created track indexes during those steps) + - If vacuum was completed in v1, set_logged is also completed + (v1 used LOGGED tables throughout, so no conversion needed) """ state = cls(db_url=data["database_url"], csv_dir=data["csv_dir"]) v1_steps = data.get("steps", {}) - # Copy v1 steps that exist in v2 + # Copy v1 steps that exist in v3 for step_name in _V1_STEP_NAMES: if step_name in v1_steps: state._steps[step_name] = v1_steps[step_name] @@ -123,4 +141,33 @@ def _migrate_v1(cls, data: dict) -> PipelineState: elif v1_steps.get("create_indexes", {}).get("status") == "completed": state._steps["create_track_indexes"] = {"status": "completed"} + # Infer set_logged from vacuum (v1 used LOGGED tables throughout) + if v1_steps.get("vacuum", {}).get("status") == "completed": + state._steps["set_logged"] = {"status": "completed"} + + return state + + @classmethod + def _migrate_v2(cls, data: dict) -> PipelineState: + """Migrate a v2 state file to v3 format. + + V3 adds set_logged after vacuum. 
+ + Migration rules: + - All v2 steps map directly to their v3 equivalents + - If vacuum was completed in v2, set_logged is also completed + (v2 used LOGGED tables throughout, so no conversion needed) + """ + state = cls(db_url=data["database_url"], csv_dir=data["csv_dir"]) + v2_steps = data.get("steps", {}) + + # Copy v2 steps that exist in v3 + for step_name in _V2_STEP_NAMES: + if step_name in v2_steps: + state._steps[step_name] = v2_steps[step_name] + + # Infer set_logged from vacuum (v2 used LOGGED tables throughout) + if v2_steps.get("vacuum", {}).get("status") == "completed": + state._steps["set_logged"] = {"status": "completed"} + return state diff --git a/scripts/run_pipeline.py b/scripts/run_pipeline.py index ab4a8ee..b03032a 100644 --- a/scripts/run_pipeline.py +++ b/scripts/run_pipeline.py @@ -3,7 +3,7 @@ Two modes of operation: - Full pipeline from XML (steps 1-9): + Full pipeline from XML (steps 1-10): python scripts/run_pipeline.py \\ --xml \\ --library-artists \\ @@ -12,7 +12,7 @@ [--wxyc-db-url ] \\ [--database-url ] - Database build from pre-filtered CSVs (steps 4-9): + Database build from pre-filtered CSVs (steps 4-10): python scripts/run_pipeline.py \\ --csv-dir \\ [--library-db ] \\ @@ -52,6 +52,16 @@ # Maximum seconds to wait for Postgres to become ready. PG_CONNECT_TIMEOUT = 30 +# Tables managed by the pipeline (shared by run_vacuum, set_tables_unlogged, set_tables_logged). +PIPELINE_TABLES = [ + "release", + "release_artist", + "release_label", + "release_track", + "release_track_artist", + "cache_metadata", +] + def parse_args(argv: list[str] | None = None) -> argparse.Namespace: """Parse command-line arguments.""" @@ -337,18 +347,40 @@ def run_vacuum(db_url: str) -> None: run_sql_statements_parallel (which opens a separate autocommit connection per statement) to vacuum all tables concurrently. 
""" - tables = [ - "release", - "release_artist", - "release_label", - "release_track", - "release_track_artist", - "cache_metadata", - ] - statements = [f"VACUUM FULL {table}" for table in tables] + statements = [f"VACUUM FULL {table}" for table in PIPELINE_TABLES] run_sql_statements_parallel(db_url, statements, description="VACUUM FULL") +def set_tables_unlogged(db_url: str) -> None: + """Set all pipeline tables to UNLOGGED to skip WAL writes during bulk import. + + FK ordering: child tables first (parallel), then the parent ``release`` + table, because PostgreSQL rejects a LOGGED (permanent) table whose foreign + key references an UNLOGGED table. + """ + child_tables = [t for t in PIPELINE_TABLES if t != "release"] + child_stmts = [f"ALTER TABLE {t} SET UNLOGGED" for t in child_tables] + run_sql_statements_parallel(db_url, child_stmts, description="SET UNLOGGED (children)") + run_sql_statements_parallel( + db_url, ["ALTER TABLE release SET UNLOGGED"], description="SET UNLOGGED (release)" + ) + + +def set_tables_logged(db_url: str) -> None: + """Set all pipeline tables back to LOGGED for durable storage after import. + + FK ordering: parent ``release`` table first, then child tables (parallel), + because PostgreSQL rejects a LOGGED (permanent) table whose foreign key + references an UNLOGGED table. + """ + run_sql_statements_parallel( + db_url, ["ALTER TABLE release SET LOGGED"], description="SET LOGGED (release)" + ) + child_tables = [t for t in PIPELINE_TABLES if t != "release"] + child_stmts = [f"ALTER TABLE {t} SET LOGGED" for t in child_tables] + run_sql_statements_parallel(db_url, child_stmts, description="SET LOGGED (children)") + + def report_sizes(db_url: str) -> None: """Log final table row counts and sizes.""" logger.info("Final database state:") @@ -634,6 +666,9 @@ def _run_database_build_post_import( Skips create_schema (already done), import_csv, and import_tracks (converter loaded all data directly). Runs create_indexes through vacuum.
""" + # -- set_tables_unlogged (skip WAL writes during bulk operations) + set_tables_unlogged(db_url) + # -- create_indexes (base trigram indexes, run in parallel) conn = psycopg.connect(db_url, autocommit=True) with conn.cursor() as cur: @@ -719,6 +754,9 @@ def _run_database_build_post_import( # -- vacuum run_vacuum(db_url) + # -- set_tables_logged (restore WAL durability for consumers) + set_tables_logged(db_url) + # -- report report_sizes(db_url) @@ -765,6 +803,14 @@ def _save_state() -> None: state.mark_completed("create_schema") _save_state() + # -- set_tables_unlogged (skip WAL writes during bulk import) + # On resume after set_logged completed, converting again would leave the + # tables UNLOGGED, because the set_logged step below is then skipped. + if state and state.is_completed("set_logged"): + logger.info("Skipping set_tables_unlogged (set_logged already completed)") + else: + set_tables_unlogged(db_url) + # -- import_csv (base tables, artwork, cache_metadata, track counts) if state and state.is_completed("import_csv"): logger.info("Skipping import_csv (already completed)") @@ -926,6 +972,15 @@ def _save_state() -> None: state.mark_completed("vacuum") _save_state() + # -- set_tables_logged (restore WAL durability for consumers) + if state and state.is_completed("set_logged"): + logger.info("Skipping set_logged (already completed)") + else: + set_tables_logged(vacuum_db) + if state: + state.mark_completed("set_logged") + _save_state() + # -- report report_sizes(vacuum_db) diff --git a/tests/e2e/test_pipeline.py b/tests/e2e/test_pipeline.py index 086f30c..98ef32c 100644 --- a/tests/e2e/test_pipeline.py +++ b/tests/e2e/test_pipeline.py @@ -231,6 +231,25 @@ def test_null_title_release_not_imported(self) -> None: conn.close() assert count == 0 + def test_tables_are_logged(self) -> None: + """All tables are LOGGED after pipeline completion (not UNLOGGED).""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute(""" + SELECT relname, relpersistence + FROM pg_class + WHERE relname IN ( + 'release', 'release_artist', 'release_label', + 'release_track', 'release_track_artist', 'cache_metadata' + ) + """) + results = cur.fetchall() + conn.close() + for relname, relpersistence in results: + assert relpersistence == "p", ( +
f"Table {relname} should be LOGGED (p) after pipeline, got {relpersistence}" + ) + FIXTURE_LIBRARY_LABELS = CSV_DIR / "library_labels.csv" @@ -574,7 +593,7 @@ def test_all_steps_completed(self) -> None: def test_state_file_has_correct_metadata(self) -> None: """State file contains correct database URL and version.""" data = json.loads(self.__class__._state_file.read_text()) - assert data["version"] == 2 + assert data["version"] == 3 assert data["database_url"] == self.__class__._db_url @@ -649,6 +668,7 @@ def test_resume_skips_all_steps(self) -> None: assert "Skipping create_track_indexes" in stderr assert "Skipping prune" in stderr assert "Skipping vacuum" in stderr + assert "Skipping set_logged" in stderr def test_resume_completes_successfully(self) -> None: """Resume run exits with code 0.""" diff --git a/tests/integration/test_unlogged_tables.py b/tests/integration/test_unlogged_tables.py new file mode 100644 index 0000000..cc16375 --- /dev/null +++ b/tests/integration/test_unlogged_tables.py @@ -0,0 +1,92 @@ +"""Integration tests for UNLOGGED/LOGGED table conversion during pipeline. + +Verifies that set_tables_unlogged() and set_tables_logged() correctly change +table persistence via pg_class.relpersistence ('p' = LOGGED, 'u' = UNLOGGED). +""" + +from __future__ import annotations + +import importlib.util +from pathlib import Path + +import psycopg +import pytest + +SCHEMA_DIR = Path(__file__).parent.parent.parent / "schema" + +# Load run_pipeline as a module (it's a script, not a package). 
+_spec = importlib.util.spec_from_file_location( + "run_pipeline", + Path(__file__).parent.parent.parent / "scripts" / "run_pipeline.py", +) +run_pipeline = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(run_pipeline) + +pytestmark = pytest.mark.postgres + + +def _get_table_persistence(db_url: str, table_name: str) -> str: + """Return relpersistence for a table ('p' = LOGGED, 'u' = UNLOGGED, 't' = temp).""" + conn = psycopg.connect(db_url) + with conn.cursor() as cur: + cur.execute( + "SELECT relpersistence FROM pg_class WHERE relname = %s", + (table_name,), + ) + result = cur.fetchone() + conn.close() + assert result is not None, f"Table {table_name} not found" + return result[0] + + +class TestSetTablesUnlogged: + """set_tables_unlogged() converts LOGGED tables to UNLOGGED.""" + + @pytest.fixture(autouse=True) + def _apply_schema(self, db_url): + self.db_url = db_url + conn = psycopg.connect(db_url, autocommit=True) + with conn.cursor() as cur: + cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text()) + conn.close() + + def test_tables_start_logged_then_become_unlogged(self) -> None: + """Schema creates LOGGED tables; set_tables_unlogged converts to UNLOGGED.""" + for table in run_pipeline.PIPELINE_TABLES: + assert _get_table_persistence(self.db_url, table) == "p", ( + f"Table {table} should start as LOGGED" + ) + run_pipeline.set_tables_unlogged(self.db_url) + for table in run_pipeline.PIPELINE_TABLES: + assert _get_table_persistence(self.db_url, table) == "u", ( + f"Table {table} should be UNLOGGED" + ) + + +class TestSetTablesLogged: + """set_tables_logged() converts UNLOGGED tables back to LOGGED.""" + + @pytest.fixture(autouse=True) + def _apply_schema_and_set_unlogged(self, db_url): + self.db_url = db_url + conn = psycopg.connect(db_url, autocommit=True) + with conn.cursor() as cur: + cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text()) + conn.close() + run_pipeline.set_tables_unlogged(db_url) + + def 
test_tables_become_logged(self) -> None: + """All pipeline tables have relpersistence = 'p' after set_tables_logged.""" + run_pipeline.set_tables_logged(self.db_url) + for table in run_pipeline.PIPELINE_TABLES: + assert _get_table_persistence(self.db_url, table) == "p", ( + f"Table {table} should be LOGGED" + ) + + def test_idempotent_on_already_logged(self) -> None: + """set_tables_logged on already-LOGGED tables does not error.""" + run_pipeline.set_tables_logged(self.db_url) + # Call again on already-LOGGED tables + run_pipeline.set_tables_logged(self.db_url) + for table in run_pipeline.PIPELINE_TABLES: + assert _get_table_persistence(self.db_url, table) == "p" diff --git a/tests/unit/test_pipeline_state.py b/tests/unit/test_pipeline_state.py index 9e3541d..213ada6 100644 --- a/tests/unit/test_pipeline_state.py +++ b/tests/unit/test_pipeline_state.py @@ -23,8 +23,8 @@ def test_no_steps_completed(self) -> None: assert not state.is_completed(step) def test_step_count(self) -> None: - """V2 pipeline has 8 steps.""" - assert len(STEPS) == 8 + """V3 pipeline has 9 steps.""" + assert len(STEPS) == 9 def test_step_order(self) -> None: """Steps are in correct execution order.""" @@ -37,6 +37,7 @@ def test_step_order(self) -> None: "create_track_indexes", "prune", "vacuum", + "set_logged", ] @@ -83,7 +84,7 @@ def test_save_creates_valid_json(self, tmp_path) -> None: state.save(state_file) data = json.loads(state_file.read_text()) - assert data["version"] == 2 + assert data["version"] == 3 assert data["database_url"] == "postgresql://localhost/test" assert data["csv_dir"] == "/tmp/csv" assert data["steps"]["create_schema"]["status"] == "completed" @@ -223,6 +224,77 @@ def test_v1_preserves_metadata(self, tmp_path) -> None: assert state.csv_dir == "/tmp/csv" +class TestV2Migration: + """load() migrates v2 state files to v3.""" + + def _make_v2_state(self, tmp_path, completed_steps: list[str]) -> None: + """Create a v2 state file.""" + v2_steps = { + name: {"status": 
"pending"} + for name in [ + "create_schema", + "import_csv", + "create_indexes", + "dedup", + "import_tracks", + "create_track_indexes", + "prune", + "vacuum", + ] + } + for step in completed_steps: + v2_steps[step] = {"status": "completed"} + + data = { + "version": 2, + "database_url": "postgresql://localhost/test", + "csv_dir": "/tmp/csv", + "steps": v2_steps, + } + state_file = tmp_path / "state.json" + state_file.write_text(json.dumps(data)) + + def test_all_completed_v2(self, tmp_path) -> None: + """All v2 steps completed -> all v3 steps completed (set_logged inferred from vacuum).""" + self._make_v2_state( + tmp_path, + [ + "create_schema", + "import_csv", + "create_indexes", + "dedup", + "import_tracks", + "create_track_indexes", + "prune", + "vacuum", + ], + ) + state = PipelineState.load(tmp_path / "state.json") + + for step in STEPS: + assert state.is_completed(step), f"Step {step} should be completed after v2 migration" + + def test_vacuum_not_completed_leaves_set_logged_pending(self, tmp_path) -> None: + """V2 with vacuum not completed -> set_logged is pending.""" + self._make_v2_state( + tmp_path, + ["create_schema", "import_csv", "create_indexes", "dedup"], + ) + state = PipelineState.load(tmp_path / "state.json") + + assert state.is_completed("dedup") + assert not state.is_completed("vacuum") + assert not state.is_completed("set_logged") + + def test_v2_preserves_metadata(self, tmp_path) -> None: + """V2 migration preserves db_url and csv_dir.""" + self._make_v2_state(tmp_path, []) + state = PipelineState.load(tmp_path / "state.json") + + assert state.db_url == "postgresql://localhost/test" + assert state.csv_dir == "/tmp/csv" + + class TestValidateResume: """validate_resume() rejects mismatched db_url or csv_dir.""" diff --git a/tests/unit/test_run_pipeline.py b/tests/unit/test_run_pipeline.py index 89a6039..1fe55bb 100644 --- a/tests/unit/test_run_pipeline.py +++ b/tests/unit/test_run_pipeline.py @@ -292,6 +292,121 @@ def 
test_vacuum_uses_parallel_execution(self) -> None: assert kwargs.get("description") or args[2] if len(args) > 2 else True +class TestPipelineTables: + """PIPELINE_TABLES constant is shared between run_vacuum and set_tables_*.""" + + def test_pipeline_tables_matches_vacuum_tables(self) -> None: + """PIPELINE_TABLES should contain the same tables used by run_vacuum.""" + expected = { + "release", + "release_artist", + "release_label", + "release_track", + "release_track_artist", + "cache_metadata", + } + assert set(run_pipeline.PIPELINE_TABLES) == expected + + def test_run_vacuum_uses_pipeline_tables(self) -> None: + """run_vacuum should generate VACUUM FULL from PIPELINE_TABLES.""" + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.run_vacuum("postgresql:///test") + + statements = mock_parallel.call_args[0][1] + vacuum_tables = {s.replace("VACUUM FULL ", "") for s in statements} + assert vacuum_tables == set(run_pipeline.PIPELINE_TABLES) + + +class TestSetTablesUnlogged: + """set_tables_unlogged() generates ALTER TABLE SET UNLOGGED in FK order.""" + + def test_children_first_then_parent(self) -> None: + """Children are set UNLOGGED before the parent (release) for FK ordering.""" + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_unlogged("postgresql:///test") + + assert mock_parallel.call_count == 2 + # First call: child tables (the table name is token 2 of each statement) + child_stmts = mock_parallel.call_args_list[0][0][1] + assert all("SET UNLOGGED" in s for s in child_stmts) + assert not any( + "release" == s.split()[2] for s in child_stmts if s.endswith("SET UNLOGGED") + ) + # Second call: parent table + parent_stmts = mock_parallel.call_args_list[1][0][1] + assert parent_stmts == ["ALTER TABLE release SET UNLOGGED"] + + def test_all_tables_covered(self) -> None: + """All PIPELINE_TABLES are included across both phases.""" + from
unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_unlogged("postgresql:///test") + + all_stmts = [] + for c in mock_parallel.call_args_list: + all_stmts.extend(c[0][1]) + tables = {s.split()[2] for s in all_stmts} + assert tables == set(run_pipeline.PIPELINE_TABLES) + + def test_descriptions_contain_unlogged(self) -> None: + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_unlogged("postgresql:///test") + + for c in mock_parallel.call_args_list: + desc = c[1].get("description", c[0][2] if len(c[0]) > 2 else "") + assert "UNLOGGED" in desc + + +class TestSetTablesLogged: + """set_tables_logged() generates ALTER TABLE SET LOGGED in FK order.""" + + def test_parent_first_then_children(self) -> None: + """Parent (release) is set LOGGED before children for FK ordering.""" + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_logged("postgresql:///test") + + assert mock_parallel.call_count == 2 + # First call: parent table + parent_stmts = mock_parallel.call_args_list[0][0][1] + assert parent_stmts == ["ALTER TABLE release SET LOGGED"] + # Second call: child tables + child_stmts = mock_parallel.call_args_list[1][0][1] + assert all("SET LOGGED" in s for s in child_stmts) + assert len(child_stmts) == len(run_pipeline.PIPELINE_TABLES) - 1 + + def test_all_tables_covered(self) -> None: + """All PIPELINE_TABLES are included across both phases.""" + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_logged("postgresql:///test") + + all_stmts = [] + for c in mock_parallel.call_args_list: + all_stmts.extend(c[0][1]) + tables = {s.split()[2] for s in all_stmts} + assert tables == set(run_pipeline.PIPELINE_TABLES) + + def 
test_descriptions_contain_logged(self) -> None: + from unittest.mock import patch + + with patch.object(run_pipeline, "run_sql_statements_parallel") as mock_parallel: + run_pipeline.set_tables_logged("postgresql:///test") + + for c in mock_parallel.call_args_list: + desc = c[1].get("description", c[0][2] if len(c[0]) > 2 else "") + assert "SET LOGGED" in desc  # bare "LOGGED" would also match "SET UNLOGGED" + + class TestXmlModeEnrichment: """In --xml mode, library_artists.txt is generated from library.db when not provided."""