From f1ba9d5d2737fa94eaba97748929f4bb70230b0a Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Wed, 11 Mar 2026 13:02:22 -0700 Subject: [PATCH 1/2] fix: filter artist tables by known IDs and add unique_key dedup to prevent FK violations Artist alias and member CSVs from discogs-xml-converter contain all Discogs artists, but the artist table only has stub rows for filtered releases. This caused FK violations on artist_alias.artist_id during import. Now import_artist_details queries known artist IDs and filters artist_alias/artist_member rows during COPY. Also adds unique_key dedup configs for artist_alias, artist_member, and release_track to prevent duplicate row errors. --- scripts/import_csv.py | 54 ++++++++++++++++++++++++++-- tests/unit/test_import_csv.py | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/scripts/import_csv.py b/scripts/import_csv.py index 9bf16f1..98f445e 100644 --- a/scripts/import_csv.py +++ b/scripts/import_csv.py @@ -115,6 +115,7 @@ class TableConfig(TypedDict, total=False): "db_columns": ["release_id", "sequence", "position", "title", "duration"], "required": ["release_id", "title"], "transforms": {}, + "unique_key": ["release_id", "sequence"], }, { "csv_file": "release_track_artist.csv", @@ -135,6 +136,7 @@ class TableConfig(TypedDict, total=False): "db_columns": ["artist_id", "alias_name"], "required": ["artist_id", "alias_name"], "transforms": {}, + "unique_key": ["artist_id", "alias_name"], }, { "csv_file": "artist_member.csv", @@ -143,6 +145,7 @@ class TableConfig(TypedDict, total=False): "db_columns": ["artist_id", "member_id", "member_name"], "required": ["group_artist_id", "member_artist_id", "member_name"], "transforms": {}, + "unique_key": ["group_artist_id", "member_artist_id"], }, ] @@ -159,6 +162,8 @@ def import_csv( transforms: dict, unique_key: list[str] | None = None, release_id_filter: set[int] | None = None, + id_filter: set[int] | None = None, + id_filter_column: str | None = None, ) -> int: """Import a CSV file into a table, selecting only needed columns. @@ -171,6 +176,9 @@ def import_csv( If release_id_filter is provided, only rows whose release_id is in the set are imported. The CSV must have a 'release_id' or 'id' column. + + If id_filter and id_filter_column are provided, only rows where the + specified column's integer value is in id_filter are imported. """ logger.info(f"Importing {csv_path.name} into {table}...") @@ -209,6 +217,12 @@ def import_csv( release_id_idx = col_idx[col_name] break + # Determine generic id_filter column index + id_filter_idx: int | None = None + if id_filter is not None and id_filter_column is not None: + if id_filter_column in header: + id_filter_idx = header.index(id_filter_column) + with conn.cursor() as cur: with cur.copy(f"COPY {table} ({db_col_list}) FROM STDIN") as copy: count = 0 @@ -227,6 +241,17 @@ def import_csv( filtered += 1 continue + # Filter by generic id column if specified + if id_filter is not None and id_filter_idx is not None: + try: + fid = int(row[id_filter_idx]) + except (ValueError, IndexError): + filtered += 1 + continue + if fid not in id_filter: + filtered += 1 + continue + # Extract only the columns we need values: list[str | None] = [] skip = False @@ -430,8 +455,14 @@ def _import_tables( csv_dir: Path, table_list: list[TableConfig], release_id_filter: set[int] | None = None, + artist_id_filter: set[int] | None = None, ) -> int: - """Import a list of table configs, returning total row count.""" + """Import a list of table configs, returning total row count. + + If artist_id_filter is provided, rows are filtered by the first column + in csv_columns that contains 'artist_id' (e.g., 'artist_id' or + 'group_artist_id'). + """ total = 0 for table_config in table_list: csv_path = csv_dir / table_config["csv_file"] @@ -439,6 +470,16 @@ def _import_tables( logger.warning(f"Skipping {table_config['csv_file']} (not found)") continue + # Determine artist_id filter column for this table + id_filter = None + id_filter_column = None + if artist_id_filter is not None: + for col in table_config["csv_columns"]: + if "artist_id" in col: + id_filter = artist_id_filter + id_filter_column = col + break + count = import_csv( conn, csv_path, @@ -449,6 +490,8 @@ def _import_tables( table_config["transforms"], unique_key=table_config.get("unique_key"), release_id_filter=release_id_filter, + id_filter=id_filter, + id_filter_column=id_filter_column, ) total += count return total @@ -546,7 +589,14 @@ def import_artist_details(conn, csv_dir: Path) -> int: logger.info(f" Created {count:,} stub artist rows") total = count - total += _import_tables(conn, csv_dir, ARTIST_TABLES) + + # Query known artist IDs for filtering artist_alias and artist_member + with conn.cursor() as cur: + cur.execute("SELECT id FROM artist") + artist_ids = {row[0] for row in cur.fetchall()} + logger.info(f" Filtering artist tables to {len(artist_ids):,} known artists") + + total += _import_tables(conn, csv_dir, ARTIST_TABLES, artist_id_filter=artist_ids) return total diff --git a/tests/unit/test_import_csv.py b/tests/unit/test_import_csv.py index 15b37b0..ff20358 100644 --- a/tests/unit/test_import_csv.py +++ b/tests/unit/test_import_csv.py @@ -20,8 +20,10 @@ TABLES = _ic.TABLES BASE_TABLES = _ic.BASE_TABLES TRACK_TABLES = _ic.TRACK_TABLES +ARTIST_TABLES = _ic.ARTIST_TABLES TableConfig = _ic.TableConfig _import_tables_parallel = _ic._import_tables_parallel +import_artist_details = _ic.import_artist_details FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" CSV_DIR = FIXTURES_DIR / "csv" @@ -516,3 +518,69 @@ def test_tracks_only_mode_uses_release_id_filter(self, tmp_path) -> None: assert call_args[1]["parent_tables"] == [] assert call_args[1]["child_tables"] == TRACK_TABLES assert call_args[1]["release_id_filter"] == {5001, 5002, 5003} + + +# --------------------------------------------------------------------------- +# Artist table dedup and filtering +# --------------------------------------------------------------------------- + + +class TestArtistTablesConfig: + """ARTIST_TABLES must have unique_key for dedup and be filtered by artist ID.""" + + def test_artist_alias_has_unique_key(self) -> None: + """artist_alias must dedup on (artist_id, alias_name).""" + config = next(t for t in ARTIST_TABLES if t["table"] == "artist_alias") + assert "unique_key" in config + assert config["unique_key"] == ["artist_id", "alias_name"] + + def test_artist_member_has_unique_key(self) -> None: + """artist_member must dedup on (group_artist_id, member_artist_id).""" + config = next(t for t in ARTIST_TABLES if t["table"] == "artist_member") + assert "unique_key" in config + assert config["unique_key"] == ["group_artist_id", "member_artist_id"] + + +class TestReleaseTrackUniqueKey: + """release_track must have unique_key for dedup.""" + + def test_release_track_has_unique_key(self) -> None: + """release_track must dedup on (release_id, sequence).""" + config = next(t for t in TRACK_TABLES if t["table"] == "release_track") + assert "unique_key" in config + assert config["unique_key"] == ["release_id", "sequence"] + + +class TestImportArtistDetailsFiltersById: + """import_artist_details must filter artist tables to known artist IDs.""" + + def test_filters_artist_tables_by_artist_id(self, tmp_path) -> None: + """Only rows with artist_id in the artist table should be imported.""" + from unittest.mock import MagicMock, patch + + # Create dummy CSVs + alias_csv = tmp_path / "artist_alias.csv" + alias_csv.write_text("artist_id,alias_name\n1,Known Alias\n999,Unknown Alias\n") + member_csv = tmp_path / "artist_member.csv" + member_csv.write_text( + "group_artist_id,member_artist_id,member_name\n1,2,Member A\n999,3,Member B\n" + ) + + mock_conn = MagicMock() + mock_cursor = MagicMock() + # Simulate artist table with only artist_id=1 + mock_cursor.rowcount = 1 + mock_cursor.fetchall.return_value = [(1,)] + mock_conn.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) + mock_conn.cursor.return_value.__exit__ = MagicMock(return_value=False) + + with patch.object(_ic, "_import_tables") as mock_import: + mock_import.return_value = 1 + import_artist_details(mock_conn, tmp_path) + + # _import_tables should be called with an artist_id_filter + mock_import.assert_called_once() + call_kwargs = mock_import.call_args + assert "artist_id_filter" in call_kwargs[1] or ( + len(call_kwargs[0]) > 3 and call_kwargs[0][3] is not None + ) From 2836d200e7261a8e3be31ea05ab77a917491fb70 Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Wed, 11 Mar 2026 13:02:53 -0700 Subject: [PATCH 2/2] docs: document artist ID filtering and unique_key dedup in import_csv --- CLAUDE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CLAUDE.md b/CLAUDE.md index 2b25329..861709d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -68,7 +68,7 @@ docker compose up db -d # just the database (for tests) - `scripts/run_pipeline.py` -- Pipeline orchestrator (--xml for steps 2-9, --csv-dir for steps 4-9) - `scripts/enrich_library_artists.py` -- Enrich artist list with WXYC cross-references (pymysql) - `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists (standalone, used outside the pipeline) -- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables. +- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables. Artist detail tables (artist_alias, artist_member) are filtered to known artist IDs to prevent FK violations, since the converter's CSVs contain all Discogs artists. Tables with `unique_key` configs are deduped in-memory during COPY. - `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring label match + sublabel resolution, US releases (copy-swap with `DROP CASCADE`). Index/constraint creation is parallelized via ThreadPoolExecutor. - `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ThreadPoolExecutor (rapidfuzz releases the GIL). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE. - `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility