Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release
1. **Download** Discogs monthly data dumps (XML) from https://discogs-data-dumps.s3.us-west-2.amazonaws.com/index.html
2. **Enrich** `library_artists.txt` with WXYC cross-references (`scripts/enrich_library_artists.py`, optional)
3. **Convert and filter** XML to CSV using [discogs-xml-converter](https://github.com/WXYC/discogs-xml-converter) (Rust binary), with optional artist filtering via `--library-artists`. Accepts a single XML file or a directory containing releases.xml, artists.xml, and labels.xml. When artists.xml is present, alias-enhanced filtering is enabled automatically. When labels.xml is present, `label_hierarchy.csv` is produced for sublabel-aware dedup.
4. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`)
4. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`), then **SET UNLOGGED** on all tables to skip WAL writes during bulk import
5. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`)
6. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`)
7. **Deduplicate** by master_id (`scripts/dedup_releases.py`) -- prefers label match (with sublabel resolution via `--label-hierarchy`), then US releases, then most tracks, then lowest ID
8. **Prune or Copy-to** -- one of:
- `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB)
- `--copy-to`/`--target-db-url`: copy matched releases to a separate database, preserving the full import
9. **Vacuum** to reclaim disk space (`VACUUM FULL`)
10. **SET LOGGED** to restore WAL durability for consumers

`scripts/run_pipeline.py` supports two modes:
- `--xml` mode: runs steps 2-9 (enrich, convert+filter, database build through vacuum). `--xml` accepts a single file or a directory.
- `--csv-dir` mode: runs steps 4-9 (database build from pre-filtered CSVs)
- `--xml` mode: runs steps 2-10 (enrich, convert+filter, database build through SET LOGGED). `--xml` accepts a single file or a directory.
- `--csv-dir` mode: runs steps 4-10 (database build from pre-filtered CSVs)

Both modes support `--target-db-url` to copy matched releases to a separate database instead of pruning in place, and `--resume` (csv-dir only) to skip already-completed steps. `--keep-csv` (xml mode only) writes converted CSVs to a persistent directory instead of a temp dir, so they survive pipeline failures.

Expand Down
55 changes: 51 additions & 4 deletions lib/pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import json
from pathlib import Path

VERSION = 2
VERSION = 3

STEP_NAMES = [
"create_schema",
Expand All @@ -20,11 +20,24 @@
"create_track_indexes",
"prune",
"vacuum",
"set_logged",
]

# Mapping from v1 step names to v2 equivalents for migration
_V1_STEP_NAMES = ["create_schema", "import_csv", "create_indexes", "dedup", "prune", "vacuum"]

# V2 step names for migration
_V2_STEP_NAMES = [
"create_schema",
"import_csv",
"create_indexes",
"dedup",
"import_tracks",
"create_track_indexes",
"prune",
"vacuum",
]


class PipelineState:
"""Track pipeline step completion status."""
Expand Down Expand Up @@ -78,13 +91,15 @@ def save(self, path: Path) -> None:
def load(cls, path: Path) -> PipelineState:
"""Load state from a JSON file.

Supports v1 state files by migrating them to v2 format.
Supports v1 and v2 state files by migrating them to v3 format.
"""
data = json.loads(path.read_text())
version = data.get("version")

if version == 1:
return cls._migrate_v1(data)
if version == 2:
return cls._migrate_v2(data)
if version != VERSION:
raise ValueError(f"Unsupported state file version {version} (expected {VERSION})")

Expand All @@ -94,21 +109,24 @@ def load(cls, path: Path) -> PipelineState:

@classmethod
def _migrate_v1(cls, data: dict) -> PipelineState:
"""Migrate a v1 state file to v2 format.
"""Migrate a v1 state file to v3 format (via v2 migration rules).

V2 adds import_tracks and create_track_indexes between dedup and prune.
V3 adds set_logged after vacuum.

Migration rules:
- All v1 steps map directly to their v2 equivalents
- If import_csv was completed in v1, import_tracks is also completed
(v1 imported tracks as part of import_csv)
- If create_indexes or dedup was completed in v1, create_track_indexes
is also completed (v1 created track indexes during those steps)
- If vacuum was completed in v1, set_logged is also completed
(v1 used LOGGED tables throughout, so no conversion needed)
"""
state = cls(db_url=data["database_url"], csv_dir=data["csv_dir"])
v1_steps = data.get("steps", {})

# Copy v1 steps that exist in v2
# Copy v1 steps that exist in v3
for step_name in _V1_STEP_NAMES:
if step_name in v1_steps:
state._steps[step_name] = v1_steps[step_name]
Expand All @@ -123,4 +141,33 @@ def _migrate_v1(cls, data: dict) -> PipelineState:
elif v1_steps.get("create_indexes", {}).get("status") == "completed":
state._steps["create_track_indexes"] = {"status": "completed"}

# Infer set_logged from vacuum (v1 used LOGGED tables throughout)
if v1_steps.get("vacuum", {}).get("status") == "completed":
state._steps["set_logged"] = {"status": "completed"}

return state

@classmethod
def _migrate_v2(cls, data: dict) -> PipelineState:
    """Upgrade a version-2 state file to the version-3 layout.

    The only v3 addition is the ``set_logged`` step after ``vacuum``.

    Migration rules:
    - Every v2 step carries over unchanged into v3
    - A completed vacuum implies a completed set_logged, because v2
      pipelines kept tables LOGGED throughout and nothing needs to be
      converted back
    """
    migrated = cls(db_url=data["database_url"], csv_dir=data["csv_dir"])
    old_steps = data.get("steps", {})

    # Carry over every v2 step present in the old file.
    for name in _V2_STEP_NAMES:
        if name in old_steps:
            migrated._steps[name] = old_steps[name]

    # A finished vacuum means set_logged is already satisfied
    # (v2 never switched tables to UNLOGGED).
    vacuum_status = old_steps.get("vacuum", {}).get("status")
    if vacuum_status == "completed":
        migrated._steps["set_logged"] = {"status": "completed"}

    return migrated
72 changes: 61 additions & 11 deletions scripts/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

Two modes of operation:

Full pipeline from XML (steps 1-9):
Full pipeline from XML (steps 1-10):
python scripts/run_pipeline.py \\
--xml <releases.xml.gz> \\
--library-artists <library_artists.txt> \\
Expand All @@ -12,7 +12,7 @@
[--wxyc-db-url <mysql://user:pass@host:port/db>] \\
[--database-url <url>]

Database build from pre-filtered CSVs (steps 4-9):
Database build from pre-filtered CSVs (steps 4-10):
python scripts/run_pipeline.py \\
--csv-dir <path/to/filtered/> \\
[--library-db <library.db>] \\
Expand Down Expand Up @@ -52,6 +52,16 @@
# Maximum seconds to wait for Postgres to become ready.
PG_CONNECT_TIMEOUT = 30

# Every table the pipeline owns; shared by run_vacuum, set_tables_unlogged,
# and set_tables_logged so the three stay in sync.
PIPELINE_TABLES = [
    "release", "release_artist", "release_label",
    "release_track", "release_track_artist", "cache_metadata",
]


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
"""Parse command-line arguments."""
Expand Down Expand Up @@ -337,18 +347,40 @@ def run_vacuum(db_url: str) -> None:
run_sql_statements_parallel (which opens a separate autocommit
connection per statement) to vacuum all tables concurrently.
"""
tables = [
"release",
"release_artist",
"release_label",
"release_track",
"release_track_artist",
"cache_metadata",
]
statements = [f"VACUUM FULL {table}" for table in tables]
statements = [f"VACUUM FULL {table}" for table in PIPELINE_TABLES]
run_sql_statements_parallel(db_url, statements, description="VACUUM FULL")


def set_tables_unlogged(db_url: str) -> None:
    """Switch every pipeline table to UNLOGGED so bulk imports skip WAL writes.

    FK ordering: child tables first (in parallel), then the parent
    ``release`` table, because PostgreSQL requires all tables in a FK
    relationship to share the same persistence mode.
    """
    children = [name for name in PIPELINE_TABLES if name != "release"]
    run_sql_statements_parallel(
        db_url,
        [f"ALTER TABLE {name} SET UNLOGGED" for name in children],
        description="SET UNLOGGED (children)",
    )
    run_sql_statements_parallel(
        db_url, ["ALTER TABLE release SET UNLOGGED"], description="SET UNLOGGED (release)"
    )


def set_tables_logged(db_url: str) -> None:
    """Restore WAL-backed (LOGGED) persistence on all pipeline tables.

    FK ordering: parent ``release`` table first, then child tables (in
    parallel), because PostgreSQL requires all tables in a FK relationship
    to share the same persistence mode.
    """
    run_sql_statements_parallel(
        db_url, ["ALTER TABLE release SET LOGGED"], description="SET LOGGED (release)"
    )
    children = [name for name in PIPELINE_TABLES if name != "release"]
    run_sql_statements_parallel(
        db_url,
        [f"ALTER TABLE {name} SET LOGGED" for name in children],
        description="SET LOGGED (children)",
    )


def report_sizes(db_url: str) -> None:
"""Log final table row counts and sizes."""
logger.info("Final database state:")
Expand Down Expand Up @@ -634,6 +666,9 @@ def _run_database_build_post_import(
Skips create_schema (already done), import_csv, and import_tracks
(converter loaded all data directly). Runs create_indexes through vacuum.
"""
# -- set_tables_unlogged (skip WAL writes during bulk operations)
set_tables_unlogged(db_url)

# -- create_indexes (base trigram indexes, run in parallel)
conn = psycopg.connect(db_url, autocommit=True)
with conn.cursor() as cur:
Expand Down Expand Up @@ -719,6 +754,9 @@ def _run_database_build_post_import(
# -- vacuum
run_vacuum(db_url)

# -- set_tables_logged (restore WAL durability for consumers)
set_tables_logged(db_url)

# -- report
report_sizes(db_url)

Expand Down Expand Up @@ -765,6 +803,9 @@ def _save_state() -> None:
state.mark_completed("create_schema")
_save_state()

# -- set_tables_unlogged (skip WAL writes during bulk import)
set_tables_unlogged(db_url)

# -- import_csv (base tables, artwork, cache_metadata, track counts)
if state and state.is_completed("import_csv"):
logger.info("Skipping import_csv (already completed)")
Expand Down Expand Up @@ -926,6 +967,15 @@ def _save_state() -> None:
state.mark_completed("vacuum")
_save_state()

# -- set_tables_logged (restore WAL durability for consumers)
if state and state.is_completed("set_logged"):
logger.info("Skipping set_logged (already completed)")
else:
set_tables_logged(vacuum_db)
if state:
state.mark_completed("set_logged")
_save_state()

# -- report
report_sizes(vacuum_db)

Expand Down
22 changes: 21 additions & 1 deletion tests/e2e/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,25 @@ def test_null_title_release_not_imported(self) -> None:
conn.close()
assert count == 0

def test_tables_are_logged(self) -> None:
    """All tables are LOGGED after pipeline completion (not UNLOGGED).

    Queries pg_class.relpersistence for every pipeline table and asserts
    each is 'p' (permanent/LOGGED), confirming set_tables_logged ran.
    """
    conn = self._connect()
    # try/finally so the connection is closed even if the query raises;
    # the original leaked the connection on failure.
    try:
        with conn.cursor() as cur:
            cur.execute("""
                SELECT relname, relpersistence
                FROM pg_class
                WHERE relname IN (
                    'release', 'release_artist', 'release_label',
                    'release_track', 'release_track_artist', 'cache_metadata'
                )
            """)
            results = cur.fetchall()
    finally:
        conn.close()
    for relname, relpersistence in results:
        assert relpersistence == "p", (
            f"Table {relname} should be LOGGED (p) after pipeline, got {relpersistence}"
        )


FIXTURE_LIBRARY_LABELS = CSV_DIR / "library_labels.csv"

Expand Down Expand Up @@ -574,7 +593,7 @@ def test_all_steps_completed(self) -> None:
def test_state_file_has_correct_metadata(self) -> None:
"""State file contains correct database URL and version."""
data = json.loads(self.__class__._state_file.read_text())
assert data["version"] == 2
assert data["version"] == 3
assert data["database_url"] == self.__class__._db_url


Expand Down Expand Up @@ -649,6 +668,7 @@ def test_resume_skips_all_steps(self) -> None:
assert "Skipping create_track_indexes" in stderr
assert "Skipping prune" in stderr
assert "Skipping vacuum" in stderr
assert "Skipping set_logged" in stderr

def test_resume_completes_successfully(self) -> None:
"""Resume run exits with code 0."""
Expand Down
Loading