Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ docker compose up db -d # just the database (for tests)
- `scripts/run_pipeline.py` -- Pipeline orchestrator (--xml for steps 2-9, --csv-dir for steps 4-9)
- `scripts/enrich_library_artists.py` -- Enrich artist list with WXYC cross-references (pymysql)
- `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists (standalone, used outside the pipeline)
- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables.
- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables. Artist detail tables (artist_alias, artist_member) are filtered to known artist IDs to prevent FK violations, since the converter's CSVs contain all Discogs artists. Tables with `unique_key` configs are deduped in-memory during COPY.
- `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring label match + sublabel resolution, US releases (copy-swap with `DROP CASCADE`). Index/constraint creation is parallelized via ThreadPoolExecutor.
- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ThreadPoolExecutor (rapidfuzz releases the GIL). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE.
- `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility
Expand Down
54 changes: 52 additions & 2 deletions scripts/import_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ class TableConfig(TypedDict, total=False):
"db_columns": ["release_id", "sequence", "position", "title", "duration"],
"required": ["release_id", "title"],
"transforms": {},
"unique_key": ["release_id", "sequence"],
},
{
"csv_file": "release_track_artist.csv",
Expand All @@ -135,6 +136,7 @@ class TableConfig(TypedDict, total=False):
"db_columns": ["artist_id", "alias_name"],
"required": ["artist_id", "alias_name"],
"transforms": {},
"unique_key": ["artist_id", "alias_name"],
},
{
"csv_file": "artist_member.csv",
Expand All @@ -143,6 +145,7 @@ class TableConfig(TypedDict, total=False):
"db_columns": ["artist_id", "member_id", "member_name"],
"required": ["group_artist_id", "member_artist_id", "member_name"],
"transforms": {},
"unique_key": ["group_artist_id", "member_artist_id"],
},
]

Expand All @@ -159,6 +162,8 @@ def import_csv(
transforms: dict,
unique_key: list[str] | None = None,
release_id_filter: set[int] | None = None,
id_filter: set[int] | None = None,
id_filter_column: str | None = None,
) -> int:
"""Import a CSV file into a table, selecting only needed columns.

Expand All @@ -171,6 +176,9 @@ def import_csv(

If release_id_filter is provided, only rows whose release_id is in the
set are imported. The CSV must have a 'release_id' or 'id' column.

If id_filter and id_filter_column are provided, only rows where the
specified column's integer value is in id_filter are imported.
"""
logger.info(f"Importing {csv_path.name} into {table}...")

Expand Down Expand Up @@ -209,6 +217,12 @@ def import_csv(
release_id_idx = col_idx[col_name]
break

# Determine generic id_filter column index
id_filter_idx: int | None = None
if id_filter is not None and id_filter_column is not None:
if id_filter_column in header:
id_filter_idx = header.index(id_filter_column)

with conn.cursor() as cur:
with cur.copy(f"COPY {table} ({db_col_list}) FROM STDIN") as copy:
count = 0
Expand All @@ -227,6 +241,17 @@ def import_csv(
filtered += 1
continue

# Filter by generic id column if specified
if id_filter is not None and id_filter_idx is not None:
try:
fid = int(row[id_filter_idx])
except (ValueError, IndexError):
filtered += 1
continue
if fid not in id_filter:
filtered += 1
continue

# Extract only the columns we need
values: list[str | None] = []
skip = False
Expand Down Expand Up @@ -430,15 +455,31 @@ def _import_tables(
csv_dir: Path,
table_list: list[TableConfig],
release_id_filter: set[int] | None = None,
artist_id_filter: set[int] | None = None,
) -> int:
"""Import a list of table configs, returning total row count."""
"""Import a list of table configs, returning total row count.

If artist_id_filter is provided, rows are filtered by the first column
in csv_columns that contains 'artist_id' (e.g., 'artist_id' or
'group_artist_id').
"""
total = 0
for table_config in table_list:
csv_path = csv_dir / table_config["csv_file"]
if not csv_path.exists():
logger.warning(f"Skipping {table_config['csv_file']} (not found)")
continue

# Determine artist_id filter column for this table
id_filter = None
id_filter_column = None
if artist_id_filter is not None:
for col in table_config["csv_columns"]:
if "artist_id" in col:
id_filter = artist_id_filter
id_filter_column = col
break

count = import_csv(
conn,
csv_path,
Expand All @@ -449,6 +490,8 @@ def _import_tables(
table_config["transforms"],
unique_key=table_config.get("unique_key"),
release_id_filter=release_id_filter,
id_filter=id_filter,
id_filter_column=id_filter_column,
)
total += count
return total
Expand Down Expand Up @@ -546,7 +589,14 @@ def import_artist_details(conn, csv_dir: Path) -> int:
logger.info(f" Created {count:,} stub artist rows")

total = count
total += _import_tables(conn, csv_dir, ARTIST_TABLES)

# Query known artist IDs for filtering artist_alias and artist_member
with conn.cursor() as cur:
cur.execute("SELECT id FROM artist")
artist_ids = {row[0] for row in cur.fetchall()}
logger.info(f" Filtering artist tables to {len(artist_ids):,} known artists")

total += _import_tables(conn, csv_dir, ARTIST_TABLES, artist_id_filter=artist_ids)
return total


Expand Down
68 changes: 68 additions & 0 deletions tests/unit/test_import_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
TABLES = _ic.TABLES
BASE_TABLES = _ic.BASE_TABLES
TRACK_TABLES = _ic.TRACK_TABLES
ARTIST_TABLES = _ic.ARTIST_TABLES
TableConfig = _ic.TableConfig
_import_tables_parallel = _ic._import_tables_parallel
import_artist_details = _ic.import_artist_details

FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"
CSV_DIR = FIXTURES_DIR / "csv"
Expand Down Expand Up @@ -516,3 +518,69 @@ def test_tracks_only_mode_uses_release_id_filter(self, tmp_path) -> None:
assert call_args[1]["parent_tables"] == []
assert call_args[1]["child_tables"] == TRACK_TABLES
assert call_args[1]["release_id_filter"] == {5001, 5002, 5003}


# ---------------------------------------------------------------------------
# Artist table dedup and filtering
# ---------------------------------------------------------------------------


class TestArtistTablesConfig:
    """ARTIST_TABLES must have unique_key for dedup and be filtered by artist ID."""

    @staticmethod
    def _config_for(table_name: str) -> dict:
        # Locate the single ARTIST_TABLES entry for the given table name.
        matches = [t for t in ARTIST_TABLES if t["table"] == table_name]
        assert len(matches) == 1
        return matches[0]

    def test_artist_alias_has_unique_key(self) -> None:
        """artist_alias must dedup on (artist_id, alias_name)."""
        config = self._config_for("artist_alias")
        assert config.get("unique_key") == ["artist_id", "alias_name"]

    def test_artist_member_has_unique_key(self) -> None:
        """artist_member must dedup on (group_artist_id, member_artist_id)."""
        config = self._config_for("artist_member")
        assert config.get("unique_key") == ["group_artist_id", "member_artist_id"]


class TestReleaseTrackUniqueKey:
    """release_track must have unique_key for dedup."""

    def test_release_track_has_unique_key(self) -> None:
        """release_track must dedup on (release_id, sequence)."""
        matching = [t for t in TRACK_TABLES if t["table"] == "release_track"]
        assert matching
        config = matching[0]
        assert "unique_key" in config
        assert config["unique_key"] == ["release_id", "sequence"]


class TestImportArtistDetailsFiltersById:
    """import_artist_details must filter artist tables to known artist IDs."""

    def test_filters_artist_tables_by_artist_id(self, tmp_path) -> None:
        """Only rows with artist_id in the artist table should be imported."""
        from unittest.mock import MagicMock, patch

        # Dummy CSVs: one known (id=1) and one unknown (id=999) row each.
        alias_csv = tmp_path / "artist_alias.csv"
        alias_csv.write_text("artist_id,alias_name\n1,Known Alias\n999,Unknown Alias\n")
        member_csv = tmp_path / "artist_member.csv"
        member_csv.write_text(
            "group_artist_id,member_artist_id,member_name\n1,2,Member A\n999,3,Member B\n"
        )

        mock_conn = MagicMock()
        mock_cursor = MagicMock()
        # Simulate an artist table containing only artist_id=1; fetchall
        # backs the "SELECT id FROM artist" query inside import_artist_details.
        mock_cursor.rowcount = 1
        mock_cursor.fetchall.return_value = [(1,)]
        mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor)
        mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False)

        with patch.object(_ic, "_import_tables") as mock_import:
            mock_import.return_value = 1
            import_artist_details(mock_conn, tmp_path)

        # The original assertion only checked that *some* artist_id_filter
        # argument was present; here we pin the actual value so a wrong or
        # empty filter set fails the test.
        mock_import.assert_called_once()
        passed_filter = mock_import.call_args.kwargs.get("artist_id_filter")
        if passed_filter is None and len(mock_import.call_args.args) > 3:
            passed_filter = mock_import.call_args.args[3]
        assert passed_filter == {1}
Loading