diff --git a/CLAUDE.md b/CLAUDE.md index 078d09e..2b25329 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -17,7 +17,7 @@ ETL pipeline for building and maintaining a PostgreSQL cache of Discogs release 4. **Create schema** (`schema/create_database.sql`) and **functions** (`schema/create_functions.sql`), then **SET UNLOGGED** on all tables to skip WAL writes during bulk import 5. **Import** filtered CSVs into PostgreSQL (`scripts/import_csv.py`) 6. **Create indexes** including accent-insensitive trigram GIN indexes (`schema/create_indexes.sql`) -7. **Deduplicate** by master_id (`scripts/dedup_releases.py`) -- prefers label match (with sublabel resolution via `--label-hierarchy`), then US releases, then most tracks, then lowest ID +7. **Deduplicate** by (master_id, format) (`scripts/dedup_releases.py`) -- partitions by master_id and normalized format so different formats (CD, Vinyl, etc.) of the same album survive dedup independently. Within each partition, prefers label match (with sublabel resolution via `--label-hierarchy`), then US releases, then most tracks, then lowest ID 8. **Prune or Copy-to** -- one of: - `--prune`: delete non-matching releases in place (~89% data reduction, 3 GB -> 340 MB) - `--copy-to`/`--target-db-url`: copy matched releases to a separate database, preserving the full import @@ -38,6 +38,10 @@ The `release` table includes a `master_id` column used during import and dedup. The `country` column, by contrast, is permanent -- it is included in the dedup copy-swap SELECT list and persists in the final schema for consumers. +### format Column Lifecycle + +The `format` column stores the normalized format category (Vinyl, CD, Cassette, 7", Digital). Unlike `master_id`, `format` persists after dedup and is available to consumers. During import, raw Discogs format strings are normalized via `lib/format_normalization.py` (e.g., "2xLP" → "Vinyl", "CD-R" → "CD"). During dedup, releases are partitioned by `(master_id, format)`, so a CD and Vinyl pressing of the same album both survive. During verify/prune, format-aware matching ensures only releases whose format matches the library's are kept (for exact artist+title matches). NULL format on either side is treated as "match anything" for backward compatibility. + ### Database Schema (Shared Contract) The SQL files in `schema/` define the contract between this ETL pipeline and all consumers: @@ -69,6 +73,7 @@ docker compose up db -d # just the database (for tests) - `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ThreadPoolExecutor (rapidfuzz releases the GIL). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE. - `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility - `scripts/fix_csv_newlines.py` -- Fix multiline CSV fields +- `lib/format_normalization.py` -- Normalize raw Discogs/library format strings to broad categories (Vinyl, CD, Cassette, 7", Digital) - `lib/artist_splitting.py` -- Split combined multi-artist library entries into individual components for matching - `lib/matching.py` -- Compilation detection utility - `lib/pipeline_state.py` -- Pipeline state tracking for resumable runs @@ -185,4 +190,4 @@ When writing inline test data or new fixture rows, use these defaults matching t **`library_artists.txt`**: `Juana Molina`, `Stereolab`, `Cat Power`, `Jessica Pratt`, `Chuquimamani-Condori`, `Duke Ellington` -**SQLite `library` rows** (artist, title): `("Juana Molina", "DOGA")`, `("Stereolab", "Aluminum Tunes")`, `("Cat Power", "Moon Pix")`, `("Jessica Pratt", "On Your Own Love Again")`, `("Chuquimamani-Condori", "Edits")`, `("Duke Ellington", "Duke Ellington & John Coltrane")` +**SQLite `library` rows** (artist, title, format): `("Juana Molina", "DOGA", "LP")`, `("Stereolab", "Aluminum Tunes", "CD")`, `("Cat Power", "Moon Pix", "LP")`, `("Jessica Pratt", "On Your Own Love Again", "LP")`, `("Chuquimamani-Condori", "Edits", "CD")`, `("Duke Ellington", "Duke Ellington & John Coltrane", "LP")` diff --git a/lib/format_normalization.py b/lib/format_normalization.py new file mode 100644 index 0000000..a115097 --- /dev/null +++ b/lib/format_normalization.py @@ -0,0 +1,114 @@ +"""Format normalization for Discogs and WXYC library format strings. + +Maps raw format strings to broad categories where track listings are typically +identical within the category. Used by dedup (partition by format) and +verify_cache (format-aware KEEP/PRUNE decisions). + +Categories: + "Vinyl" — LP, Vinyl, 2xLP, 3xLP, 12", 10" + "CD" — CD, 2xCD, 3xCD, CD-R, CDr + "Cassette" — Cassette + "7\"" — 7" singles (distinct track listings from LPs) + "Digital" — File, FLAC, MP3, WAV + None — unknown, empty, unrecognized +""" + +from __future__ import annotations + +import re + +# Quantity prefix pattern: "2x", "3x", etc. +_QUANTITY_RE = re.compile(r"^\d+x", re.IGNORECASE) + +# Mapping from lowercase format string to category. +_FORMAT_MAP: dict[str, str] = { + "vinyl": "Vinyl", + "lp": "Vinyl", + '12"': "Vinyl", + '10"': "Vinyl", + "cd": "CD", + "cd-r": "CD", + "cdr": "CD", + "cassette": "Cassette", + '7"': '7"', + "file": "Digital", + "flac": "Digital", + "mp3": "Digital", + "wav": "Digital", +} + + +def normalize_format(raw: str | None) -> str | None: + """Normalize a Discogs format string to a broad category. + + Splits multi-format on comma (takes first), strips quantity prefix ("2x"), + and maps to a category. Returns None for unknown/empty/unrecognized formats. + + Args: + raw: Raw Discogs format string, e.g. "2xLP", "CD, DVD", "Vinyl". + + Returns: + Normalized category string or None. + """ + if not raw: + return None + + # Take the first format from multi-format strings + fmt = raw.split(",")[0].strip() + if not fmt: + return None + + # Strip quantity prefix (e.g. "2x" from "2xLP") + fmt = _QUANTITY_RE.sub("", fmt) + + return _FORMAT_MAP.get(fmt.lower()) + + +def normalize_library_format(raw: str | None) -> str | None: + """Normalize a WXYC library format string to the same category space. + + WXYC library uses simpler format names (LP, CD, Cassette, 7", Vinyl). + + Args: + raw: Raw library format string. + + Returns: + Normalized category string or None. + """ + if not raw: + return None + + fmt = raw.strip() + if not fmt: + return None + + return _FORMAT_MAP.get(fmt.lower()) + + +def format_matches(release_format: str | None, library_formats: set[str | None]) -> bool: + """Check if a release's format is compatible with the library's format set. + + Returns True if the release format is in the library's format set, or if + either side has no format data (graceful degradation). + + Args: + release_format: Normalized release format category (or None). + library_formats: Set of normalized library format categories for a + specific (artist, title) pair. May contain None. + + Returns: + True if the formats are compatible. + """ + # No library format data — match anything (backward-compatible) + if not library_formats: + return True + + # NULL release format — match anything (graceful degradation for direct-PG mode) + if release_format is None: + return True + + # NULL in library formats means "match anything" + if None in library_formats: + return True + + return release_format in library_formats diff --git a/schema/create_database.sql b/schema/create_database.sql index c31b2e4..55a0a90 100644 --- a/schema/create_database.sql +++ b/schema/create_database.sql @@ -41,6 +41,7 @@ CREATE TABLE release ( country text, artwork_url text, released text, -- full date string, e.g. "2024-03-15" + format text, -- normalized format category: 'Vinyl', 'CD', etc. master_id integer -- used by dedup, dropped after dedup copy-swap ); diff --git a/scripts/dedup_releases.py b/scripts/dedup_releases.py index 67d82af..984d4c0 100644 --- a/scripts/dedup_releases.py +++ b/scripts/dedup_releases.py @@ -285,7 +285,7 @@ def ensure_dedup_ids(conn) -> int: SELECT id AS release_id FROM ( SELECT r.id, r.master_id, ROW_NUMBER() OVER ( - PARTITION BY r.master_id + PARTITION BY r.master_id, r.format ORDER BY {order_by} ) as rn FROM release r @@ -576,7 +576,12 @@ def main(): # Step 2: Copy each table (keeping only non-duplicate rows) # Only base tables + cache_metadata (tracks are imported after dedup) tables = [ - ("release", "new_release", "id, title, release_year, country, artwork_url, released", "id"), + ( + "release", + "new_release", + "id, title, release_year, country, artwork_url, released, format", + "id", + ), ( "release_artist", "new_release_artist", diff --git a/scripts/import_csv.py b/scripts/import_csv.py index 2e2d725..9bf16f1 100644 --- a/scripts/import_csv.py +++ b/scripts/import_csv.py @@ -18,6 +18,9 @@ import psycopg +sys.path.insert(0, str(Path(__file__).parent.parent)) +from lib.format_normalization import normalize_format # noqa: E402 + logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", @@ -78,10 +81,10 @@ class TableConfig(TypedDict, total=False): { "csv_file": "release.csv", "table": "release", - "csv_columns": ["id", "title", "country", "released", "master_id"], - "db_columns": ["id", "title", "country", "released", "master_id"], + "csv_columns": ["id", "title", "country", "released", "format", "master_id"], + "db_columns": ["id", "title", "country", "released", "format", "master_id"], "required": ["id", "title"], - "transforms": {}, + "transforms": {"format": normalize_format}, "unique_key": ["id"], }, { diff --git a/scripts/verify_cache.py b/scripts/verify_cache.py index 5e583c0..fcf391f 100644 --- a/scripts/verify_cache.py +++ b/scripts/verify_cache.py @@ -51,6 +51,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from lib.artist_splitting import split_artist_name_contextual +from lib.format_normalization import format_matches, normalize_library_format from lib.matching import is_compilation_artist logging.basicConfig( @@ -201,6 +202,7 @@ def __init__( combined_to_original: dict[str, tuple[str, str]], all_artists: list[str], compilation_titles: set[str], + format_by_pair: dict[tuple[str, str], set[str | None]] | None = None, ): self.exact_pairs = exact_pairs self.artist_to_titles = artist_to_titles @@ -211,21 +213,29 @@ def __init__( self.combined_to_original = combined_to_original self.all_artists = all_artists self.compilation_titles = compilation_titles + self.format_by_pair: dict[tuple[str, str], set[str | None]] = format_by_pair or {} @classmethod - def from_rows(cls, rows: list[tuple[str, str]]) -> LibraryIndex: - """Build index from (artist, title) row tuples. + def from_rows( + cls, rows: list[tuple[str, str]] | list[tuple[str, str, str | None]] + ) -> LibraryIndex: + """Build index from (artist, title) or (artist, title, format) row tuples. Args: - rows: List of (raw_artist, raw_title) tuples from the library. + rows: List of (raw_artist, raw_title) or (raw_artist, raw_title, raw_format) + tuples from the library. """ exact_pairs: set[tuple[str, str]] = set() artist_to_titles: dict[str, set[str]] = {} combined_to_original: dict[str, tuple[str, str]] = {} artist_set: set[str] = set() compilation_titles: set[str] = set() + format_by_pair: dict[tuple[str, str], set[str | None]] = {} + has_format = len(rows) > 0 and len(rows[0]) >= 3 - for raw_artist, raw_title in rows: + for row in rows: + raw_artist, raw_title = row[0], row[1] + raw_format = row[2] if has_format else None if not raw_artist or not raw_title: continue @@ -239,8 +249,13 @@ def from_rows(cls, rows: list[tuple[str, str]]) -> LibraryIndex: norm_artist = normalize_artist(raw_artist) pair = (norm_artist, norm_title) + # Build format_by_pair before dedup check — same pair may have multiple formats + if has_format: + norm_format = normalize_library_format(raw_format) + format_by_pair.setdefault(pair, set()).add(norm_format) + if pair in exact_pairs: - continue # deduplicate + continue # deduplicate (artist+title already indexed) exact_pairs.add(pair) artist_to_titles.setdefault(norm_artist, set()).add(norm_title) @@ -258,7 +273,8 @@ def from_rows(cls, rows: list[tuple[str, str]]) -> LibraryIndex: # fuzzy scorer inputs that iterate the full artist list. known_normalized = set(artist_set) split_count = 0 - for raw_artist, raw_title in rows: + for row in rows: + raw_artist, raw_title = row[0], row[1] if not raw_artist or not raw_title or is_compilation_artist(raw_artist): continue components = split_artist_name_contextual(raw_artist, known_normalized) @@ -286,19 +302,25 @@ def from_rows(cls, rows: list[tuple[str, str]]) -> LibraryIndex: combined_to_original=combined_to_original, all_artists=all_artists, compilation_titles=compilation_titles, + format_by_pair=format_by_pair if has_format else None, ) @classmethod def from_sqlite(cls, db_path: Path) -> LibraryIndex: """Build index from the library SQLite database. + Loads format column if present (3-tuples); falls back to artist+title only (2-tuples). + Args: db_path: Path to library.db """ logger.info(f"Building LibraryIndex from {db_path}") conn = sqlite3.connect(str(db_path)) cur = conn.cursor() - cur.execute("SELECT artist, title FROM library") + try: + cur.execute("SELECT artist, title, format FROM library") + except sqlite3.OperationalError: + cur.execute("SELECT artist, title FROM library") rows = cur.fetchall() conn.close() @@ -603,21 +625,21 @@ def save_artist_mappings(path: Path, mappings: dict) -> None: async def load_discogs_releases( conn: asyncpg.Connection, -) -> list[tuple[int, str, str]]: +) -> list[tuple[int, str, str, str | None]]: """Load all releases with their primary artist from the Discogs cache. - Returns list of (release_id, artist_name, title) tuples. + Returns list of (release_id, artist_name, title, format) tuples. Only includes main artists (extra = 0). """ logger.info("Loading Discogs releases...") rows = await conn.fetch(""" - SELECT r.id, ra.artist_name, r.title + SELECT r.id, ra.artist_name, r.title, r.format FROM release r JOIN release_artist ra ON ra.release_id = r.id AND ra.extra = 0 ORDER BY r.id """) - releases = [(row["id"], row["artist_name"], row["title"]) for row in rows] + releases = [(row["id"], row["artist_name"], row["title"], row["format"]) for row in rows] logger.info(f"Loaded {len(releases):,} releases") return releases @@ -722,7 +744,12 @@ def prune_releases_copy_swap( # Tables to copy-swap: (old_table, new_table, columns, id_column) tables = [ - ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), + ( + "release", + "new_release", + "id, title, release_year, country, artwork_url, format", + "id", + ), ( "release_artist", "new_release_artist", @@ -864,7 +891,7 @@ def prune_releases_copy_swap( # Tables and their columns to copy (post-dedup: no master_id). # Each entry: (table_name, filter_column, columns_list) COPY_TABLE_SPEC = [ - ("release", "id", ["id", "title", "release_year", "country", "artwork_url"]), + ("release", "id", ["id", "title", "release_year", "country", "artwork_url", "format"]), ("release_artist", "release_id", ["release_id", "artist_name", "extra"]), ("release_label", "release_id", ["release_id", "label_name"]), ("release_track", "release_id", ["release_id", "sequence", "position", "title", "duration"]), @@ -1325,7 +1352,7 @@ def classify_fuzzy_batch( def classify_all_releases( - releases: list[tuple[int, str, str]], + releases: list[tuple[int, str, str]] | list[tuple[int, str, str, str | None]], index: LibraryIndex, matcher: MultiIndexMatcher, ) -> ClassificationReport: @@ -1333,6 +1360,10 @@ def classify_all_releases( Groups releases by normalized artist for efficient scoring and artist-level REVIEW grouping. + + Accepts 3-tuples (id, artist, title) or 4-tuples (id, artist, title, format). + When 4-tuples are provided and the library has format data, exact-match KEEP + releases are downgraded to PRUNE if their format doesn't match the library's. """ keep_ids: set[int] = set() prune_ids: set[int] = set() @@ -1340,9 +1371,16 @@ def classify_all_releases( review_by_artist: dict[str, list[tuple[int, str, MatchResult]]] = {} artist_originals: dict[str, str] = {} + # Extract format info if 4-tuples are provided + release_formats: dict[int, str | None] = {} + has_format_data = len(releases) > 0 and len(releases[0]) >= 4 + # Group by normalized artist for efficient batch processing by_artist: dict[str, list[tuple[int, str, str]]] = {} - for release_id, raw_artist, raw_title in releases: + for release in releases: + release_id, raw_artist, raw_title = release[0], release[1], release[2] + if has_format_data: + release_formats[release_id] = release[3] # type: ignore[misc] norm_artist = normalize_artist(raw_artist) artist_originals.setdefault(norm_artist, raw_artist) by_artist.setdefault(norm_artist, []).append((release_id, raw_artist, raw_title)) @@ -1392,6 +1430,11 @@ def classify_all_releases( ) # Phase 2: Process exact-match artists (fast: exact pair + two-stage only) + # Format filtering is applied here for exact-match KEEP releases: if the library + # has format data for the matched (artist, title) pair and the release has a format, + # the release is downgraded to PRUNE if its format doesn't match. Fuzzy-match + # releases skip format filtering because the matched library pair is not reliably + # known (the fuzzy artist match may not correspond to the exact library pair). logger.info("Phase 2: Classifying exact-match artists by pair...") for norm_artist in exact_artist_match: artist_releases = by_artist[norm_artist] @@ -1399,7 +1442,13 @@ def classify_all_releases( norm_title = normalize_title(raw_title) result = matcher.classify_known_artist(norm_artist, norm_title) if result.decision == Decision.KEEP: - keep_ids.add(release_id) + # Format filtering for exact-match KEEP releases + rel_fmt = release_formats.get(release_id) + lib_formats = index.format_by_pair.get((norm_artist, norm_title), set()) + if not format_matches(rel_fmt, lib_formats): + prune_ids.add(release_id) + else: + keep_ids.add(release_id) elif result.decision == Decision.PRUNE: prune_ids.add(release_id) else: diff --git a/tests/e2e/test_pipeline.py b/tests/e2e/test_pipeline.py index 98ef32c..abce2b6 100644 --- a/tests/e2e/test_pipeline.py +++ b/tests/e2e/test_pipeline.py @@ -123,18 +123,22 @@ def test_tables_populated(self) -> None: assert count > 0, f"Table {table} is empty" conn.close() - def test_duplicates_removed(self) -> None: - """Duplicate releases (same master_id) have been removed. + def test_format_aware_dedup_and_prune(self) -> None: + """Format-aware dedup + prune keeps only matching formats. - In the fixture data, releases 1001, 1002, 1003 share master_id 500. - After dedup, only release 1002 (US pressing) should remain. + In the fixture data, releases 1001 (CD), 1002 (Vinyl), 1003 (Cassette) + share master_id 500. Format-aware dedup keeps all three (different formats). + The library owns OK Computer on CD and LP (→Vinyl), so prune keeps + 1001 (CD) and 1002 (Vinyl) but removes 1003 (Cassette). """ conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [1002], f"Expected only 1002 after dedup, got {ids}" + assert ids == [1001, 1002], ( + f"Expected 1001 (CD) and 1002 (Vinyl) after dedup+prune, got {ids}" + ) def test_prune_releases_gone(self) -> None: """Releases not matching the library have been pruned. @@ -159,8 +163,13 @@ def test_keep_releases_present(self) -> None: conn.close() assert count == 1, "Release 3001 (Kid A) should still exist" - def test_master_id_column_absent(self) -> None: - """master_id column is dropped by the dedup copy-swap.""" + def test_master_id_column_persists_when_no_dedup(self) -> None: + """master_id column persists when dedup copy-swap doesn't run. + + With format-aware dedup, the fixture data has unique formats per master_id, + so no duplicates are removed and copy-swap doesn't run. The master_id + column persists (it would be dropped by copy-swap if duplicates existed). + """ conn = self._connect() with conn.cursor() as cur: cur.execute( @@ -169,10 +178,10 @@ def test_master_id_column_absent(self) -> None: ) result = cur.fetchone() conn.close() - assert result is None, "master_id column should not exist after dedup" + assert result is not None, "master_id should persist when no dedup copy-swap runs" def test_country_column_present(self) -> None: - """country column persists through the dedup copy-swap.""" + """country column persists through the pipeline.""" conn = self._connect() with conn.cursor() as cur: cur.execute( @@ -181,7 +190,19 @@ def test_country_column_present(self) -> None: ) result = cur.fetchone() conn.close() - assert result is not None, "country column should exist after dedup" + assert result is not None, "country column should exist after pipeline" + + def test_format_column_present(self) -> None: + """format column persists through the pipeline.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name = 'release' AND column_name = 'format'" + ) + result = cur.fetchone() + conn.close() + assert result is not None, "format column should exist after pipeline" def test_indexes_exist(self) -> None: """Trigram indexes exist on the final database.""" @@ -302,22 +323,36 @@ def _connect(self): return psycopg.connect(self.db_url) def test_label_match_overrides_track_count_master_500(self) -> None: - """Label-aware dedup keeps release 1001 (Parlophone) over 1002 (Capitol, more tracks).""" + """Format-aware label dedup keeps one release per (master_id, format). + + Releases 1001 (CD, Parlophone), 1002 (Vinyl, Capitol), 1003 (Cassette, EMI) + share master_id 500 but have different formats. Format-aware dedup keeps + all three since each has a unique format. No prune step runs in this test + (no --library-db), so all three survive. + """ conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [1001], f"Expected only 1001 after label-aware dedup, got {ids}" + assert ids == [1001, 1002, 1003], ( + f"Expected all three formats to survive format-aware dedup, got {ids}" + ) def test_label_match_overrides_track_count_master_600(self) -> None: - """Label-aware dedup keeps release 2001 (Factory) over 2002 (Qwest, more tracks).""" + """Format-aware label dedup keeps one release per (master_id, format). + + Releases 2001 (LP, Factory) and 2002 (CD, Qwest) share master_id 600 + but have different formats. Both survive format-aware dedup. + """ conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [2001], f"Expected only 2001 after label-aware dedup, got {ids}" + assert ids == [2001, 2002], ( + f"Expected both formats to survive format-aware dedup, got {ids}" + ) def test_temp_tables_cleaned_up(self) -> None: """wxyc_label_pref and release_label_match are dropped after dedup.""" diff --git a/tests/fixtures/create_fixtures.py b/tests/fixtures/create_fixtures.py index b78e639..2b46248 100644 --- a/tests/fixtures/create_fixtures.py +++ b/tests/fixtures/create_fixtures.py @@ -87,34 +87,43 @@ def create_release_csv() -> None: def create_release_artist_csv() -> None: """Create release_artist.csv linking releases to artists.""" - headers = ["release_id", "artist_id", "artist_name", "extra", "anv", "position", "join_field"] + headers = [ + "release_id", + "artist_id", + "artist_name", + "extra", + "anv", + "position", + "join_field", + "role", + ] rows = [ # Radiohead releases (match library) - [1001, 1, "Radiohead", 0, "", 1, ""], - [1002, 1, "Radiohead", 0, "", 1, ""], - [1003, 1, "Radiohead", 0, "", 1, ""], - [3001, 1, "Radiohead", 0, "", 1, ""], - [4001, 1, "Radiohead", 0, "", 1, ""], + [1001, 1, "Radiohead", 0, "", 1, "", ""], + [1002, 1, "Radiohead", 0, "", 1, "", ""], + [1003, 1, "Radiohead", 0, "", 1, "", ""], + [3001, 1, "Radiohead", 0, "", 1, "", ""], + [4001, 1, "Radiohead", 0, "", 1, "", ""], # Joy Division releases (match library) - [2001, 2, "Joy Division", 0, "", 1, ""], - [2002, 2, "Joy Division", 0, "", 1, ""], + [2001, 2, "Joy Division", 0, "", 1, "", ""], + [2002, 2, "Joy Division", 0, "", 1, "", ""], # Unknown artists (won't match library) - [5001, 3, "DJ Unknown", 0, "", 1, ""], - [5002, 4, "Mystery Band", 0, "", 1, ""], + [5001, 3, "DJ Unknown", 0, "", 1, "", ""], + [5002, 4, "Mystery Band", 0, "", 1, "", ""], # Bjork (match library, tests accent handling) - [6001, 5, "Björk", 0, "", 1, ""], + [6001, 5, "Björk", 0, "", 1, "", ""], # Note: release 7001 has empty title and is skipped during import, # so no child table rows should reference it. # Compilation - [8001, 7, "Various", 0, "", 1, ""], + [8001, 7, "Various", 0, "", 1, "", ""], # Beatles and Simon & Garfunkel (match library) - [9001, 8, "Beatles, The", 0, "", 1, ""], - [9002, 9, "Simon & Garfunkel", 0, "", 1, ""], + [9001, 8, "Beatles, The", 0, "", 1, "", ""], + [9002, 9, "Simon & Garfunkel", 0, "", 1, "", ""], # Not in library - [10001, 10, "Random Artist X", 0, "", 1, ""], - [10002, 11, "Obscure Band Y", 0, "", 1, ""], + [10001, 10, "Random Artist X", 0, "", 1, "", ""], + [10002, 11, "Obscure Band Y", 0, "", 1, "", ""], # Extra artist credit (should not be primary) - [1001, 12, "Some Producer", 1, "", 2, ""], + [1001, 12, "Some Producer", 1, "", 2, "", ""], ] write_csv("release_artist.csv", headers, rows) @@ -287,49 +296,54 @@ def create_label_hierarchy_csv() -> None: def create_library_db() -> None: - """Create a SQLite library.db with (artist, title) pairs. + """Create a SQLite library.db with (artist, title, format) tuples. - These pairs determine KEEP/PRUNE outcomes in verify_cache.py. + These entries determine KEEP/PRUNE outcomes in verify_cache.py. + Some albums have multiple entries with different formats to test + format-aware verify/prune. """ db_path = FIXTURE_DIR / "library.db" conn = sqlite3.connect(str(db_path)) cur = conn.cursor() + cur.execute("DROP TABLE IF EXISTS library") cur.execute(""" - CREATE TABLE IF NOT EXISTS library ( + CREATE TABLE library ( id INTEGER PRIMARY KEY AUTOINCREMENT, artist TEXT NOT NULL, - title TEXT NOT NULL + title TEXT NOT NULL, + format TEXT ) """) # Library entries that should produce KEEP decisions entries = [ - ("Radiohead", "OK Computer"), - ("Radiohead", "Kid A"), - ("Radiohead", "Amnesiac"), - ("Joy Division", "Unknown Pleasures"), - ("Joy Division", "Closer"), - ("Aphex Twin", "Selected Ambient Works 85-92"), - ("Beatles, The", "Abbey Road"), - ("Simon & Garfunkel", "Bridge Over Troubled Water"), - ("Björk", "Homogenic"), + ("Radiohead", "OK Computer", "CD"), + ("Radiohead", "OK Computer", "LP"), # library owns both CD and LP + ("Radiohead", "Kid A", "CD"), + ("Radiohead", "Amnesiac", "CD"), + ("Joy Division", "Unknown Pleasures", "LP"), + ("Joy Division", "Closer", None), # some entries have no format + ("Aphex Twin", "Selected Ambient Works 85-92", "CD"), + ("Beatles, The", "Abbey Road", "LP"), + ("Simon & Garfunkel", "Bridge Over Troubled Water", "LP"), + ("Björk", "Homogenic", "CD"), # Compilation entry (Various prefix triggers compilation handling) - ("Various Artists - Compilations", "Sugar Hill"), + ("Various Artists - Compilations", "Sugar Hill", "LP"), # Extra entries to make the index realistic - ("Talking Heads", "Remain in Light"), - ("Sonic Youth", "Daydream Nation"), - ("Pixies", "Doolittle"), - ("My Bloody Valentine", "Loveless"), - ("Neutral Milk Hotel", "In the Aeroplane Over the Sea"), - ("Pavement", "Slanted and Enchanted"), - ("Guided By Voices", "Bee Thousand"), - ("Built to Spill", "Perfect From Now On"), - ("Modest Mouse", "The Lonesome Crowded West"), - ("Sleater-Kinney", "Dig Me Out"), + ("Talking Heads", "Remain in Light", "LP"), + ("Sonic Youth", "Daydream Nation", "LP"), + ("Pixies", "Doolittle", "CD"), + ("My Bloody Valentine", "Loveless", "LP"), + ("Neutral Milk Hotel", "In the Aeroplane Over the Sea", "LP"), + ("Pavement", "Slanted and Enchanted", "CD"), + ("Guided By Voices", "Bee Thousand", "LP"), + ("Built to Spill", "Perfect From Now On", "CD"), + ("Modest Mouse", "The Lonesome Crowded West", "LP"), + ("Sleater-Kinney", "Dig Me Out", "CD"), ] - cur.executemany("INSERT INTO library (artist, title) VALUES (?, ?)", entries) + cur.executemany("INSERT INTO library (artist, title, format) VALUES (?, ?, ?)", entries) conn.commit() conn.close() print(f" library.db: {len(entries)} entries") diff --git a/tests/fixtures/csv/release_artist.csv b/tests/fixtures/csv/release_artist.csv index 12868d4..06d82b1 100644 --- a/tests/fixtures/csv/release_artist.csv +++ b/tests/fixtures/csv/release_artist.csv @@ -14,4 +14,4 @@ release_id,artist_id,artist_name,extra,anv,position,join_field,role 9002,9,Simon & Garfunkel,0,,1,, 10001,10,Random Artist X,0,,1,, 10002,11,Obscure Band Y,0,,1,, -1001,12,Some Producer,1,,2,,Producer +1001,12,Some Producer,1,,2,, diff --git a/tests/fixtures/library.db b/tests/fixtures/library.db index 35e30b4..2e8ddb3 100644 Binary files a/tests/fixtures/library.db and b/tests/fixtures/library.db differ diff --git a/tests/integration/test_dedup.py b/tests/integration/test_dedup.py index 0d29a9d..d77e52d 100644 --- a/tests/integration/test_dedup.py +++ b/tests/integration/test_dedup.py @@ -103,35 +103,40 @@ def _fresh_import(db_url: str) -> None: conn.close() +DEDUP_TABLES = [ + ( + "release", + "new_release", + "id, title, release_year, country, artwork_url, released, format", + "id", + ), + ( + "release_artist", + "new_release_artist", + "release_id, artist_id, artist_name, extra, role", + "release_id", + ), + ( + "release_label", + "new_release_label", + "release_id, label_id, label_name, catno", + "release_id", + ), + ( + "cache_metadata", + "new_cache_metadata", + "release_id, cached_at, source, last_validated", + "release_id", + ), +] + + def _run_dedup(db_url: str) -> None: """Run the dedup pipeline (base tables only) against the database.""" conn = psycopg.connect(db_url, autocommit=True) delete_count = ensure_dedup_ids(conn) if delete_count > 0: - # Only base tables + cache_metadata (no track tables) - tables = [ - ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), - ( - "release_artist", - "new_release_artist", - "release_id, artist_id, artist_name, extra", - "release_id", - ), - ( - "release_label", - "new_release_label", - "release_id, label_name", - "release_id", - ), - ( - "cache_metadata", - "new_cache_metadata", - "release_id, cached_at, source, last_validated", - "release_id", - ), - ] - - for old, new, cols, id_col in tables: + for old, new, cols, id_col in DEDUP_TABLES: copy_table(conn, old, new, cols, id_col) # Drop FK constraints before swap @@ -143,7 +148,7 @@ def _run_dedup(db_url: str) -> None: ]: cur.execute(stmt) - for old, new, _, _ in tables: + for old, new, _, _ in DEDUP_TABLES: swap_tables(conn, old, new) add_base_constraints_and_indexes(conn, db_url=db_url) @@ -195,23 +200,23 @@ def _store_url(self): def _connect(self): return psycopg.connect(self.db_url) - def test_correct_release_kept_for_master_500(self) -> None: - """Release 1002 (US, 3 tracks) kept over 1001 (UK, 5 tracks) by country preference.""" + def test_all_formats_survive_for_master_500(self) -> None: + """All three formats (CD, Vinyl, Cassette) survive — dedup is per (master_id, format).""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [1002] + assert ids == [1001, 1002, 1003] - def test_correct_release_kept_for_master_600(self) -> None: - """Release 2002 (DE, 4 tracks) kept over 2001 (UK, 2 tracks) by track count fallback.""" + def test_all_formats_survive_for_master_600(self) -> None: + """Both formats (LP/Vinyl, CD) survive — dedup is per (master_id, format).""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [2002] + assert ids == [2001, 2002] def test_unique_master_id_release_untouched(self) -> None: """Release 3001 (unique master_id 700) is not removed.""" @@ -231,49 +236,36 @@ def test_null_master_id_release_untouched(self) -> None: conn.close() assert count == 1 - def test_child_table_rows_cleaned(self) -> None: - """Deduped releases have their child table rows removed (not imported).""" + def test_all_releases_have_child_rows(self) -> None: + """All format-unique releases keep their child table rows.""" conn = self._connect() with conn.cursor() as cur: - cur.execute("SELECT count(*) FROM release_artist WHERE release_id = 1001") - artist_count = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM release_label WHERE release_id = 1001") - label_count = cur.fetchone()[0] - cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1001") - track_count = cur.fetchone()[0] + # All three releases in master_id 500 survive (different formats) + for rid in (1001, 1002, 1003): + cur.execute("SELECT count(*) FROM release_artist WHERE release_id = %s", (rid,)) + assert cur.fetchone()[0] > 0, f"release_artist missing for {rid}" conn.close() - assert artist_count == 0 - assert label_count == 0 - assert track_count == 0 - def test_kept_release_labels_preserved(self) -> None: - """The kept release still has its labels after dedup.""" + def test_all_releases_have_labels(self) -> None: + """All format-unique releases keep their labels after dedup.""" conn = self._connect() with conn.cursor() as cur: cur.execute( - "SELECT label_name FROM release_label WHERE release_id = 1002 ORDER BY label_name" + "SELECT label_name FROM release_label WHERE release_id = 1001 ORDER BY label_name" ) labels = [row[0] for row in cur.fetchall()] conn.close() - assert labels == ["Capitol Records"] + assert "Parlophone" in labels - def test_deduped_release_has_no_labels(self) -> None: - """Releases removed by dedup have no labels.""" - conn = self._connect() - with conn.cursor() as cur: - cur.execute("SELECT count(*) FROM release_label WHERE release_id = 1001") - count = cur.fetchone()[0] - conn.close() - assert count == 0 - - def test_kept_release_tracks_preserved(self) -> None: - """The kept release still has its tracks (imported after dedup).""" + def test_all_releases_have_tracks(self) -> None: + """All format-unique releases have their tracks (imported after dedup).""" conn = self._connect() with conn.cursor() as cur: + cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1001") + assert cur.fetchone()[0] == 5 # UK CD has 5 tracks cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002") - count = cur.fetchone()[0] + assert cur.fetchone()[0] == 3 # US Vinyl has 3 tracks conn.close() - assert count == 3 def test_country_column_preserved(self) -> None: """country column exists after dedup copy-swap and has the expected value.""" @@ -284,28 +276,25 @@ def test_country_column_preserved(self) -> None: conn.close() assert country == "US" - def test_us_preferred_over_track_count(self) -> None: - """US release (1002, 3 tracks) kept over UK release (1001, 5 tracks). + def test_different_formats_all_survive(self) -> None: + """All releases with different formats survive dedup, regardless of country/track count. - Proves country preference is the deciding factor: the kept release has - fewer tracks than the removed one. + With format-aware dedup, (master_id, format) groups each have one member, + so no releases are deleted from the fixture data. """ conn = self._connect() with conn.cursor() as cur: - # 1002 should be kept (US, 3 tracks) - cur.execute("SELECT count(*) FROM release WHERE id = 1002") - assert cur.fetchone()[0] == 1 - # 1001 should be removed (UK, 5 tracks — more tracks but not US) - cur.execute("SELECT count(*) FROM release WHERE id = 1001") - assert cur.fetchone()[0] == 0 - # Verify the kept release has fewer tracks (proving country was decisive) - cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1002") - kept_tracks = cur.fetchone()[0] + # All three should survive (different formats: CD, Vinyl, Cassette) + cur.execute("SELECT count(*) FROM release WHERE id IN (1001, 1002, 1003)") + assert cur.fetchone()[0] == 3 conn.close() - assert kept_tracks == 3, "Kept US release should have 3 tracks (fewer than removed UK's 5)" - def test_master_id_column_dropped(self) -> None: - """master_id column no longer exists after copy-swap (not in SELECT list).""" + def test_master_id_column_persists_when_no_dedup(self) -> None: + """master_id persists when no duplicates found (copy-swap not triggered). + + With format-aware dedup, each (master_id, format) group in fixtures has one member, + so no copy-swap occurs and master_id stays in the schema. + """ conn = self._connect() with conn.cursor() as cur: cur.execute( @@ -314,7 +303,7 @@ def test_master_id_column_dropped(self) -> None: ) result = cur.fetchone() conn.close() - assert result is None + assert result is not None def test_primary_key_recreated(self) -> None: """Primary key on release(id) exists after dedup.""" @@ -342,27 +331,40 @@ def test_base_fk_constraints_recreated(self) -> None: expected = {"release_artist", "release_label", "cache_metadata"} assert expected.issubset(fk_tables) - def test_deduped_release_has_no_tracks(self) -> None: - """Releases removed by dedup have no tracks (not imported for them).""" + def test_format_column_persists_after_dedup(self) -> None: + """format column exists after dedup copy-swap (unlike master_id which is dropped).""" conn = self._connect() with conn.cursor() as cur: - cur.execute("SELECT count(*) FROM release_track WHERE release_id = 1001") - count = cur.fetchone()[0] + cur.execute( + "SELECT column_name FROM information_schema.columns " + "WHERE table_name = 'release' AND column_name = 'format'" + ) + result = cur.fetchone() + conn.close() + assert result is not None + + def test_format_values_preserved(self) -> None: + """format column has the normalized values after dedup.""" + conn = self._connect() + with conn.cursor() as cur: + cur.execute("SELECT format FROM release WHERE id = 1001") + assert cur.fetchone()[0] == "CD" + cur.execute("SELECT format FROM release WHERE id = 1002") + assert cur.fetchone()[0] == "Vinyl" conn.close() - assert count == 0 def test_total_release_count_after_dedup(self) -> None: - """Total releases: 15 imported - 3 duplicates = 12.""" + """Total releases: 15 imported (7001 skipped), 0 deduped (all unique formats) = 15.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT count(*) FROM release") count = cur.fetchone()[0] conn.close() - # 15 imported (7001 skipped), 1001+1003 removed (master 500), 2001 removed (master 600) - assert count == 12 + # All 15 imported releases survive — each (master_id, format) group has one member + assert count == 15 - def test_release_track_count_dropped(self) -> None: - """release_track_count table is cleaned up after dedup.""" + def test_release_track_count_persists_when_no_dedup(self) -> None: + """release_track_count persists when no duplicates found (cleanup in dedup path).""" conn = self._connect() with conn.cursor() as cur: cur.execute( @@ -373,7 +375,8 @@ def test_release_track_count_dropped(self) -> None: ) exists = cur.fetchone()[0] conn.close() - assert not exists + # release_track_count cleanup is inside the if delete_count > 0 block + assert exists class TestDedupNoop: @@ -421,29 +424,7 @@ def _run_dedup_with_labels(db_url: str, library_labels_csv: Path) -> None: create_label_match_table(conn) delete_count = ensure_dedup_ids(conn) if delete_count > 0: - tables = [ - ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), - ( - "release_artist", - "new_release_artist", - "release_id, artist_id, artist_name, extra", - "release_id", - ), - ( - "release_label", - "new_release_label", - "release_id, label_name", - "release_id", - ), - ( - "cache_metadata", - "new_cache_metadata", - "release_id, cached_at, source, last_validated", - "release_id", - ), - ] - - for old, new, cols, id_col in tables: + for old, new, cols, id_col in DEDUP_TABLES: copy_table(conn, old, new, cols, id_col) with conn.cursor() as cur: @@ -454,15 +435,16 @@ def _run_dedup_with_labels(db_url: str, library_labels_csv: Path) -> None: ]: cur.execute(stmt) - for old, new, _, _ in tables: + for old, new, _, _ in DEDUP_TABLES: swap_tables(conn, old, new) add_base_constraints_and_indexes(conn, db_url=db_url) - with conn.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") - cur.execute("DROP TABLE IF EXISTS release_track_count") - cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") - cur.execute("DROP TABLE IF EXISTS release_label_match") + # Cleanup temp tables regardless of whether dedup ran + with conn.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute("DROP TABLE IF EXISTS release_label_match") conn.close() @@ -485,32 +467,23 @@ def _store_url(self): def _connect(self): return psycopg.connect(self.db_url) - def test_label_match_overrides_track_count_master_500(self) -> None: - """Release 1001 (Parlophone, 3 tracks) wins over 1002 (Capitol, 5 tracks).""" + def test_all_formats_survive_with_labels_master_500(self) -> None: + """All formats survive — dedup is per (master_id, format), labels only rank within format.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [1001] + assert ids == [1001, 1002, 1003] - def test_label_match_overrides_track_count_master_600(self) -> None: - """Release 2001 (Factory, 2 tracks) wins over 2002 (Qwest, 4 tracks).""" + def test_all_formats_survive_with_labels_master_600(self) -> None: + """Both formats survive — LP and CD are different format groups.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (2001, 2002) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [2001] - - def test_unmatched_releases_use_track_count(self) -> None: - """Releases with unique master_ids are not affected by label matching.""" - conn = self._connect() - with conn.cursor() as cur: - cur.execute("SELECT count(*) FROM release WHERE id IN (5001, 5002)") - count = cur.fetchone()[0] - conn.close() - assert count == 2 + assert ids == [2001, 2002] def test_unique_and_null_master_id_untouched(self) -> None: """Releases with unique or NULL master_id survive label-aware dedup.""" @@ -534,13 +507,13 @@ def test_label_match_temp_tables_cleaned_up(self) -> None: assert count == 0 def test_total_release_count_after_dedup(self) -> None: - """Same total: 15 imported - 3 duplicates = 12 (just different winners).""" + """All 15 survive — each (master_id, format) group has one member.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT count(*) FROM release") count = cur.fetchone()[0] conn.close() - assert count == 12 + assert count == 15 class TestDedupFallback: @@ -601,29 +574,7 @@ def _run_dedup_with_labels_and_hierarchy( create_label_match_table(conn) delete_count = ensure_dedup_ids(conn) if delete_count > 0: - tables = [ - ("release", "new_release", "id, title, release_year, country, artwork_url", "id"), - ( - "release_artist", - "new_release_artist", - "release_id, artist_id, artist_name, extra", - "release_id", - ), - ( - "release_label", - "new_release_label", - "release_id, label_name", - "release_id", - ), - ( - "cache_metadata", - "new_cache_metadata", - "release_id, cached_at, source, last_validated", - "release_id", - ), - ] - - for old, new, cols, id_col in tables: + for old, new, cols, id_col in DEDUP_TABLES: copy_table(conn, old, new, cols, id_col) with conn.cursor() as cur: @@ -634,16 +585,17 @@ def _run_dedup_with_labels_and_hierarchy( ]: cur.execute(stmt) - for old, new, _, _ in tables: + for old, new, _, _ in DEDUP_TABLES: swap_tables(conn, old, new) add_base_constraints_and_indexes(conn, db_url=db_url) - with conn.cursor() as cur: - cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") - cur.execute("DROP TABLE IF EXISTS release_track_count") - cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") - cur.execute("DROP TABLE IF EXISTS release_label_match") - cur.execute("DROP TABLE IF EXISTS label_hierarchy") + # Cleanup temp tables regardless of whether dedup ran + with conn.cursor() as cur: + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + cur.execute("DROP TABLE IF EXISTS wxyc_label_pref") + cur.execute("DROP TABLE IF EXISTS release_label_match") + cur.execute("DROP TABLE IF EXISTS label_hierarchy") conn.close() @@ -667,14 +619,14 @@ def _store_url(self): def _connect(self): return psycopg.connect(self.db_url) - def test_sublabel_match_for_master_500(self) -> None: - """Release 1001 (Parlophone) wins — library says Parlophone, direct match.""" + def test_all_formats_survive_with_hierarchy_master_500(self) -> None: + """All formats survive — dedup is per (master_id, format).""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT id FROM release WHERE id IN (1001, 1002, 1003) ORDER BY id") ids = [row[0] for row in cur.fetchall()] conn.close() - assert ids == [1001] + assert ids == [1001, 1002, 1003] def test_label_hierarchy_temp_table_cleaned_up(self) -> None: """label_hierarchy table is dropped after dedup.""" @@ -689,13 +641,13 @@ def test_label_hierarchy_temp_table_cleaned_up(self) -> None: assert count == 0 def test_total_release_count_same(self) -> None: - """Same total: 15 imported - 3 duplicates = 12.""" + """All 15 survive — each (master_id, format) group has one member.""" conn = self._connect() with conn.cursor() as cur: cur.execute("SELECT count(*) FROM release") count = cur.fetchone()[0] conn.close() - assert count == 12 + assert count == 15 class TestEnsureDedupIdsAlreadyExists: @@ -971,3 +923,122 @@ def test_base_and_track_indexes_exist(self) -> None: "idx_release_track_artist_release_id", } assert expected_indexes.issubset(indexes) + + +class TestFormatAwareDedup: + """Dedup partitions by (master_id, format): same-format duplicates are deduped, + different-format releases survive.""" + + @pytest.fixture(autouse=True, scope="class") + def _set_up(self, db_url): + self.__class__._db_url = db_url + conn = psycopg.connect(db_url, autocommit=True) + _drop_all_tables(conn) + with conn.cursor() as cur: + cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text()) + # Two CD releases with same master_id — only one should survive + cur.execute( + "INSERT INTO release (id, title, master_id, format, country) " + "VALUES (1, 'Album', 100, 'CD', 'UK')" + ) + cur.execute( + "INSERT INTO release (id, title, master_id, format, country) " + "VALUES (2, 'Album', 100, 'CD', 'US')" + ) + # One Vinyl release with same master_id — should survive alongside the CD winner + cur.execute( + "INSERT INTO release (id, title, master_id, format, country) " + "VALUES (3, 'Album', 100, 'Vinyl', 'UK')" + ) + # Two NULL-format releases with same master_id — NULLs group together, one survives + cur.execute( + "INSERT INTO release (id, title, master_id, country) " + "VALUES (4, 'Album B', 200, 'US')" + ) + cur.execute( + "INSERT INTO release (id, title, master_id, country) " + "VALUES (5, 'Album B', 200, 'UK')" + ) + # Track counts for ranking + cur.execute(""" + CREATE UNLOGGED TABLE release_track_count ( + release_id integer PRIMARY KEY, + track_count integer NOT NULL + ) + """) + cur.execute( + "INSERT INTO release_track_count VALUES (1, 5), (2, 3), (3, 4), (4, 2), (5, 1)" + ) + conn.close() + + @pytest.fixture(autouse=True) + def _store_url(self): + self.db_url = self.__class__._db_url + + def _connect(self): + return psycopg.connect(self.db_url) + + def test_same_format_deduped_normally(self) -> None: + """Two CD releases sharing master_id: only the US one survives (country preference).""" + conn = psycopg.connect(self.db_url, autocommit=True) + count = ensure_dedup_ids(conn) + with conn.cursor() as cur: + cur.execute("SELECT release_id FROM dedup_delete_ids ORDER BY release_id") + deleted = [row[0] for row in cur.fetchall()] + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + conn.close() + # Release 1 (UK CD) deleted in favor of release 2 (US CD) + # Release 5 (UK, NULL format) deleted in favor of release 4 (US, NULL format) + assert 1 in deleted + assert 5 in deleted + assert count == 2 + + def test_different_format_survives(self) -> None: + """Vinyl release (3) is not in dedup_delete_ids — different format group.""" + conn = psycopg.connect(self.db_url, autocommit=True) + # Recreate track counts and run dedup + with conn.cursor() as cur: + cur.execute(""" + CREATE UNLOGGED TABLE IF NOT EXISTS release_track_count ( + release_id integer PRIMARY KEY, + track_count integer NOT NULL + ) + """) + cur.execute( + "INSERT INTO release_track_count VALUES (1, 5), (2, 3), (3, 4), (4, 2), (5, 1) " + "ON CONFLICT DO NOTHING" + ) + ensure_dedup_ids(conn) + with conn.cursor() as cur: + cur.execute("SELECT release_id FROM dedup_delete_ids") + deleted = {row[0] for row in cur.fetchall()} + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + conn.close() + assert 3 not in deleted + + def test_null_format_groups_together(self) -> None: + """Releases with NULL format group together by master_id only.""" + conn = psycopg.connect(self.db_url, autocommit=True) + with conn.cursor() as cur: + cur.execute(""" + CREATE UNLOGGED TABLE IF NOT EXISTS release_track_count ( + release_id integer PRIMARY KEY, + track_count integer NOT NULL + ) + """) + cur.execute( + "INSERT INTO release_track_count VALUES (1, 5), (2, 3), (3, 4), (4, 2), (5, 1) " + "ON CONFLICT DO NOTHING" + ) + ensure_dedup_ids(conn) + with conn.cursor() as cur: + cur.execute("SELECT release_id FROM dedup_delete_ids") + deleted = {row[0] for row in cur.fetchall()} + cur.execute("DROP TABLE IF EXISTS dedup_delete_ids") + cur.execute("DROP TABLE IF EXISTS release_track_count") + conn.close() + # Release 5 (UK, NULL format) should be deleted — groups with release 4 (US, NULL format) + assert 5 in deleted + assert 4 not in deleted diff --git a/tests/unit/test_format_normalization.py b/tests/unit/test_format_normalization.py new file mode 100644 index 0000000..7e5f8dd --- /dev/null +++ b/tests/unit/test_format_normalization.py @@ -0,0 +1,165 @@ +"""Tests for lib/format_normalization.py — format category mapping.""" + +import importlib.util +import sys +from pathlib import Path + +import pytest + +# Load format_normalization module from lib directory +_LIB_PATH = Path(__file__).parent.parent.parent / "lib" / "format_normalization.py" +_spec = importlib.util.spec_from_file_location("format_normalization", _LIB_PATH) +assert _spec is not None and _spec.loader is not None +_fn = importlib.util.module_from_spec(_spec) +sys.modules["format_normalization"] = _fn +_spec.loader.exec_module(_fn) + +normalize_format = _fn.normalize_format +normalize_library_format = _fn.normalize_library_format +format_matches = _fn.format_matches + + +# --------------------------------------------------------------------------- +# normalize_format (Discogs format strings) +# --------------------------------------------------------------------------- + + +class TestNormalizeFormat: + """Map raw Discogs format strings to broad categories.""" + + @pytest.mark.parametrize( + "raw, expected", + [ + # Vinyl family + ("Vinyl", "Vinyl"), + ("LP", "Vinyl"), + ("2xLP", "Vinyl"), + ("3xLP", "Vinyl"), + ('12"', "Vinyl"), + ('10"', "Vinyl"), + # CD family + ("CD", "CD"), + ("2xCD", "CD"), + ("3xCD", "CD"), + ("CD-R", "CD"), + ("CDr", "CD"), + # Cassette + ("Cassette", "Cassette"), + # 7" singles + ('7"', '7"'), + # Digital + ("File", "Digital"), + ("FLAC", "Digital"), + ("MP3", "Digital"), + ("WAV", "Digital"), + # Unknown / empty + (None, None), + ("", None), + ("Box Set", None), + ("Laserdisc", None), + ], + ids=[ + "vinyl", + "lp", + "2xlp", + "3xlp", + "12_inch", + "10_inch", + "cd", + "2xcd", + "3xcd", + "cd-r", + "cdr", + "cassette", + "7_inch", + "file", + "flac", + "mp3", + "wav", + "none", + "empty", + "box_set", + "laserdisc", + ], + ) + def test_normalize_format(self, raw, expected): + assert normalize_format(raw) == expected + + def test_multi_format_takes_first(self): + """Multi-format strings separated by comma use the first format.""" + assert normalize_format("CD, DVD") == "CD" + assert normalize_format("LP, CD") == "Vinyl" + + def test_case_insensitive(self): + """Format matching is case-insensitive.""" + assert normalize_format("cd") == "CD" + assert normalize_format("vinyl") == "Vinyl" + assert normalize_format("lp") == "Vinyl" + assert normalize_format("cassette") == "Cassette" + + def test_whitespace_stripped(self): + """Leading/trailing whitespace is stripped.""" + assert normalize_format(" CD ") == "CD" + assert normalize_format(" LP ") == "Vinyl" + + +# --------------------------------------------------------------------------- +# normalize_library_format (WXYC library format strings) +# --------------------------------------------------------------------------- + + +class TestNormalizeLibraryFormat: + """Map WXYC library format strings to the same categories.""" + + @pytest.mark.parametrize( + "raw, expected", + [ + ("LP", "Vinyl"), + ("CD", "CD"), + ("Cassette", "Cassette"), + ('7"', '7"'), + ("Vinyl", "Vinyl"), + (None, None), + ("", None), + ], + ids=["lp", "cd", "cassette", "7_inch", "vinyl", "none", "empty"], + ) + def test_normalize_library_format(self, raw, expected): + assert normalize_library_format(raw) == expected + + def test_case_insensitive(self): + assert normalize_library_format("lp") == "Vinyl" + assert normalize_library_format("cd") == "CD" + + +# --------------------------------------------------------------------------- +# format_matches +# --------------------------------------------------------------------------- + + +class TestFormatMatches: + """Test format compatibility between release and library formats.""" + + def test_matching_format(self): + assert format_matches("CD", {"CD", "Vinyl"}) is True + + def test_non_matching_format(self): + assert format_matches("Cassette", {"CD", "Vinyl"}) is False + + def test_none_release_format_matches_anything(self): + """NULL release format matches any library format set.""" + assert format_matches(None, {"CD", "Vinyl"}) is True + + def test_none_in_library_formats_matches_anything(self): + """NULL in library formats means match anything.""" + assert format_matches("CD", {None}) is True + + def test_empty_library_formats_matches_anything(self): + """Empty library format set means no format data — match anything.""" + assert format_matches("CD", set()) is True + + def test_both_none(self): + assert format_matches(None, {None}) is True + + def test_none_release_empty_library(self): + assert format_matches(None, set()) is True diff --git a/tests/unit/test_import_csv.py b/tests/unit/test_import_csv.py index f081a96..15b37b0 100644 --- a/tests/unit/test_import_csv.py +++ b/tests/unit/test_import_csv.py @@ -111,6 +111,22 @@ def test_release_table_includes_country(self) -> None: assert "country" in release_config["csv_columns"] assert "country" in release_config["db_columns"] + def test_release_table_includes_format(self) -> None: + """The release table must import format for format-aware dedup.""" + release_config = next(t for t in TABLES if t["table"] == "release") + assert "format" in release_config["csv_columns"] + assert "format" in release_config["db_columns"] + + def test_release_table_transforms_format(self) -> None: + """The format field should be transformed via normalize_format.""" + release_config = next(t for t in TABLES if t["table"] == "release") + assert "format" in release_config["transforms"] + # Verify it normalizes correctly + transform = release_config["transforms"]["format"] + assert transform("2xLP") == "Vinyl" + assert transform("CD") == "CD" + assert transform(None) is None + def test_release_artist_table_includes_artist_id(self) -> None: """The release_artist table must import artist_id for alias-enhanced filtering.""" ra_config = next(t for t in TABLES if t["table"] == "release_artist") diff --git a/tests/unit/test_verify_cache.py b/tests/unit/test_verify_cache.py index 0a13672..b00586a 100644 --- a/tests/unit/test_verify_cache.py +++ b/tests/unit/test_verify_cache.py @@ -607,18 +607,18 @@ class TestLoadDiscogsReleases: @pytest.mark.asyncio async def test_returns_release_tuples(self): - """Returns list of (release_id, artist_name, title) tuples.""" + """Returns list of (release_id, artist_name, title, format) tuples.""" mock_conn = MagicMock() mock_conn.fetch = AsyncMock( return_value=[ - {"id": 28138, "title": "Confield", "artist_name": "Autechre"}, - {"id": 12345, "title": "OK Computer", "artist_name": "Radiohead"}, + {"id": 28138, "title": "Confield", "artist_name": "Autechre", "format": "CD"}, + {"id": 12345, "title": "OK Computer", "artist_name": "Radiohead", "format": None}, ] ) releases = await load_discogs_releases(mock_conn) assert len(releases) == 2 - assert releases[0] == (28138, "Autechre", "Confield") - assert releases[1] == (12345, "Radiohead", "OK Computer") + assert releases[0] == (28138, "Autechre", "Confield", "CD") + assert releases[1] == (12345, "Radiohead", "OK Computer", None) @pytest.mark.asyncio async def test_query_filters_extra_artists(self): @@ -977,3 +977,151 @@ def test_report_with_review_artists( captured = capsys.readouterr() assert "REVIEW" in captured.out assert "artist-level decisions needed" in captured.out + + +# --------------------------------------------------------------------------- +# Format-Aware LibraryIndex +# --------------------------------------------------------------------------- + +normalize_library_format = _vc.normalize_library_format +format_matches = _vc.format_matches + + +class TestLibraryIndexFormat: + """Test format-aware LibraryIndex construction and matching.""" + + def test_from_rows_3_tuples_builds_format_by_pair(self): + """LibraryIndex built from 3-tuples has format_by_pair.""" + rows = [ + ("Radiohead", "OK Computer", "CD"), + ("Radiohead", "OK Computer", "LP"), + ("Joy Division", "Unknown Pleasures", None), + ] + idx = LibraryIndex.from_rows(rows) + norm_radio = normalize_artist("Radiohead") + norm_ok = normalize_title("OK Computer") + assert (norm_radio, norm_ok) in idx.format_by_pair + assert idx.format_by_pair[(norm_radio, norm_ok)] == {"CD", "Vinyl"} + + def test_from_rows_multiple_formats(self): + """Library with both CD and LP for same album: both in format set.""" + rows = [ + ("Cat Power", "Moon Pix", "CD"), + ("Cat Power", "Moon Pix", "LP"), + ] + idx = LibraryIndex.from_rows(rows) + norm_artist = normalize_artist("Cat Power") + norm_title = normalize_title("Moon Pix") + formats = idx.format_by_pair.get((norm_artist, norm_title), set()) + assert "CD" in formats + assert "Vinyl" in formats + + def test_from_rows_null_format(self): + """NULL format produces None in format set.""" + rows = [("Joy Division", "Closer", None)] + idx = LibraryIndex.from_rows(rows) + norm_artist = normalize_artist("Joy Division") + norm_title = normalize_title("Closer") + formats = idx.format_by_pair.get((norm_artist, norm_title), set()) + assert None in formats + + def test_from_rows_2_tuples_backward_compatible(self): + """2-tuple rows produce empty format_by_pair (backward-compatible).""" + rows = [("Radiohead", "OK Computer"), ("Joy Division", "Unknown Pleasures")] + idx = LibraryIndex.from_rows(rows) + assert idx.format_by_pair == {} + + def test_from_sqlite_with_format(self, tmp_path): + """from_sqlite reads format column when present.""" + import sqlite3 + + db_path = tmp_path / "library.db" + conn = sqlite3.connect(str(db_path)) + cur = conn.cursor() + cur.execute( + "CREATE TABLE library (id INTEGER PRIMARY KEY, artist TEXT, title TEXT, format TEXT)" + ) + cur.execute( + "INSERT INTO library (artist, title, format) VALUES ('Radiohead', 'OK Computer', 'CD')" + ) + cur.execute( + "INSERT INTO library (artist, title, format) VALUES ('Radiohead', 'OK Computer', 'LP')" + ) + conn.commit() + conn.close() + + idx = LibraryIndex.from_sqlite(db_path) + norm_artist = normalize_artist("Radiohead") + norm_title = normalize_title("OK Computer") + assert (norm_artist, norm_title) in idx.format_by_pair + assert idx.format_by_pair[(norm_artist, norm_title)] == {"CD", "Vinyl"} + + def test_from_sqlite_without_format(self, tmp_path): + """from_sqlite falls back gracefully when format column is missing.""" + import sqlite3 + + db_path = tmp_path / "library.db" + conn = sqlite3.connect(str(db_path)) + cur = conn.cursor() + cur.execute("CREATE TABLE library (id INTEGER PRIMARY KEY, artist TEXT, title TEXT)") + cur.execute("INSERT INTO library (artist, title) VALUES ('Radiohead', 'OK Computer')") + conn.commit() + conn.close() + + idx = LibraryIndex.from_sqlite(db_path) + assert idx.format_by_pair == {} + # Should still have the pair in exact_pairs + norm_artist = normalize_artist("Radiohead") + norm_title = normalize_title("OK Computer") + assert (norm_artist, norm_title) in idx.exact_pairs + + +class TestFormatFilterClassification: + """Test format-based filtering in classify_all_releases.""" + + def test_format_filter_keeps_matching_format(self): + """KEEP release with matching format stays KEEP.""" + rows = [("Radiohead", "OK Computer", "CD")] + idx = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(idx) + releases = [(1, "Radiohead", "OK Computer", "CD")] + report = classify_all_releases(releases, idx, matcher) + assert 1 in report.keep_ids + + def test_format_filter_prunes_mismatching_format(self): + """KEEP release with mismatching format is downgraded to PRUNE.""" + rows = [("Radiohead", "OK Computer", "CD")] + idx = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(idx) + # Release is Cassette, but library only has CD + releases = [(1, "Radiohead", "OK Computer", "Cassette")] + report = classify_all_releases(releases, idx, matcher) + assert 1 in report.prune_ids + + def test_format_filter_keeps_null_release_format(self): + """KEEP release with NULL format stays KEEP (graceful degradation).""" + rows = [("Radiohead", "OK Computer", "CD")] + idx = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(idx) + releases = [(1, "Radiohead", "OK Computer", None)] + report = classify_all_releases(releases, idx, matcher) + assert 1 in report.keep_ids + + def test_format_filter_keeps_null_library_format(self): + """KEEP release when library has no format data stays KEEP.""" + # 2-tuple rows: no format data + rows = [("Radiohead", "OK Computer")] + idx = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(idx) + releases = [(1, "Radiohead", "OK Computer", "CD")] + report = classify_all_releases(releases, idx, matcher) + assert 1 in report.keep_ids + + def test_format_filter_null_release_format_with_library_formats(self): + """Library has format data but release format is NULL: stays KEEP (graceful degradation).""" + rows = [("Radiohead", "OK Computer", "CD"), ("Radiohead", "OK Computer", "LP")] + idx = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(idx) + releases = [(1, "Radiohead", "OK Computer", None)] + report = classify_all_releases(releases, idx, matcher) + assert 1 in report.keep_ids