From d5578b16e2456d4b4cc83ef66379b811de81b4f3 Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Wed, 11 Mar 2026 13:45:38 -0700 Subject: [PATCH 1/3] Adding .DS_Store to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f10151d..7bf7941 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ artist_mappings.json !tests/fixtures/**/*.db !tests/fixtures/**/*.csv worklog/ +.DS_Store From 510c28bb2cf2ac1474ceedd6ff092af7fc69e234 Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Wed, 11 Mar 2026 15:36:13 -0700 Subject: [PATCH 2/3] perf: use ProcessPoolExecutor for Phase 4 fuzzy matching to utilize all CPU cores ThreadPoolExecutor was serializing on a single core because the Python loop overhead between rapidfuzz extractOne calls holds the GIL. ProcessPoolExecutor with fork context gives true multi-core parallelism. Also improves logging: Phase 3 now reports elapsed time, Phase 4 chunk logs include throughput (artists/s) and ETA, and chunks are smaller (~200 artists) for more frequent progress updates. --- CLAUDE.md | 2 +- scripts/verify_cache.py | 117 +++++++++++++++++++++++--------- tests/unit/test_verify_cache.py | 114 ++++++++++++++++++++++++++++++- 3 files changed, 200 insertions(+), 33 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 861709d..86d8da9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -70,7 +70,7 @@ docker compose up db -d # just the database (for tests) - `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists (standalone, used outside the pipeline) - `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables. Artist detail tables (artist_alias, artist_member) are filtered to known artist IDs to prevent FK violations, since the converter's CSVs contain all Discogs artists. Tables with `unique_key` configs are deduped in-memory during COPY. 
- `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring label match + sublabel resolution, US releases (copy-swap with `DROP CASCADE`). Index/constraint creation is parallelized via ThreadPoolExecutor. -- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ThreadPoolExecutor (rapidfuzz releases the GIL). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE. +- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ProcessPoolExecutor with fork context for true multi-core parallelism (the Python loop overhead between rapidfuzz calls holds the GIL, so threads serialize on a single core). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE. - `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility - `scripts/fix_csv_newlines.py` -- Fix multiline CSV fields - `lib/format_normalization.py` -- Normalize raw Discogs/library format strings to broad categories (Vinyl, CD, Cassette, 7", Digital) diff --git a/scripts/verify_cache.py b/scripts/verify_cache.py index fcf391f..6d4f084 100644 --- a/scripts/verify_cache.py +++ b/scripts/verify_cache.py @@ -41,7 +41,8 @@ import sys import time import unicodedata -from concurrent.futures import ThreadPoolExecutor, as_completed +import multiprocessing +from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path @@ -1351,6 +1352,29 @@ def classify_fuzzy_batch( return keep_ids, prune_ids, review_ids, review_by_artist +# --------------------------------------------------------------------------- +# Process pool worker functions for Phase 4 parallelization +# --------------------------------------------------------------------------- + +_pool_index: LibraryIndex | None = None +_pool_matcher: 
MultiIndexMatcher | None = None + + +def _init_fuzzy_worker(index: LibraryIndex, matcher: MultiIndexMatcher) -> None: + """Initializer for ProcessPoolExecutor workers. Stores shared read-only state.""" + global _pool_index, _pool_matcher + _pool_index = index + _pool_matcher = matcher + + +def _classify_fuzzy_chunk( + chunk_args: tuple[list[str], dict[str, list[tuple[int, str, str]]]], +) -> tuple[set[int], set[int], set[int], dict[str, list[tuple[int, str, MatchResult]]]]: + """Worker function for ProcessPoolExecutor. Reads index/matcher from module globals.""" + artists, chunk_by_artist = chunk_args + return classify_fuzzy_batch(artists, chunk_by_artist, _pool_index, _pool_matcher) + + def classify_all_releases( releases: list[tuple[int, str, str]] | list[tuple[int, str, str, str | None]], index: LibraryIndex, @@ -1472,6 +1496,7 @@ def classify_all_releases( # Phase 3: Token-overlap pre-screen for fuzzy candidates. # Build a set of all tokens from library artist names. Discard short tokens # (1-2 chars) that cause false positive overlaps ("dj", "mc", "j", etc.) + phase3_start = time.monotonic() min_token_len = 3 library_tokens: set[str] = set() for artist in index.all_artists: @@ -1495,8 +1520,10 @@ def classify_all_releases( if token_pruned else 0 ) + phase3_elapsed = time.monotonic() - phase3_start logger.info( - f"Phase 3 pre-screen: {token_pruned:,} artists pruned by token overlap " + f"Phase 3 pre-screen in {phase3_elapsed:.1f}s: " + f"{token_pruned:,} artists pruned by token overlap " f"({token_pruned_releases:,} releases), " f"{len(truly_fuzzy):,} artists remain for fuzzy matching" ) @@ -1506,8 +1533,12 @@ def classify_all_releases( # releases by title only against the matched library artist's albums. # This avoids the O(releases * all_library_pairs) cost of full scoring. # - # Parallelized via ThreadPoolExecutor. Rapidfuzz's C extension releases - # the GIL during extractOne, so threads achieve real parallelism. 
+ # Parallelized via ProcessPoolExecutor with fork context. The Python loop + # overhead between rapidfuzz extractOne calls holds the GIL, so threads + # serialize on a single core. Separate processes give true multi-core + # parallelism. Fork context avoids the cost of re-importing the module + # in each worker; the pipeline is single-threaded before this point so + # fork is safe. logger.info( "Phase 4: Fuzzy artist matching for %s artists...", f"{len(truly_fuzzy):,}", @@ -1515,38 +1546,62 @@ def classify_all_releases( phase4_start = time.monotonic() num_workers = min(os.cpu_count() or 4, 8) - chunk_size = max(1, len(truly_fuzzy) // (num_workers * 4)) + # Target ~200 artists per chunk for frequent progress updates, + # but ensure at least num_workers * 2 chunks for load balancing. + min_chunks = num_workers * 2 + target_chunk_size = 200 + chunk_size = max(1, min(target_chunk_size, len(truly_fuzzy) // min_chunks)) chunks = [truly_fuzzy[i : i + chunk_size] for i in range(0, len(truly_fuzzy), chunk_size)] logger.info(f" Using {num_workers} workers, {len(chunks)} chunks of ~{chunk_size} artists") completed_chunks = 0 - with ThreadPoolExecutor(max_workers=num_workers) as executor: - futures = { - executor.submit(classify_fuzzy_batch, chunk, by_artist, index, matcher): chunk - for chunk in chunks - } - for future in as_completed(futures): - batch_keep, batch_prune, batch_review, batch_review_by = future.result() - keep_ids |= batch_keep - prune_ids |= batch_prune - review_ids |= batch_review - for k, v in batch_review_by.items(): - review_by_artist.setdefault(k, []).extend(v) - - completed_chunks += 1 - chunk = futures[future] - releases_processed += sum(len(by_artist[a]) for a in chunk) - artists_fuzzy_matched += len(chunk) - - elapsed = time.monotonic() - phase4_start - logger.info( - f" Chunk {completed_chunks}/{len(chunks)} done " - f"({releases_processed:,}/{total_releases:,} releases) " - f"| {elapsed:.1f}s elapsed " - f"| KEEP={len(keep_ids):,} 
PRUNE={len(prune_ids):,} " - f"REVIEW={len(review_ids):,}" - ) + global _pool_index, _pool_matcher + _pool_index = index + _pool_matcher = matcher + try: + ctx = multiprocessing.get_context("fork") + with ProcessPoolExecutor( + max_workers=num_workers, + mp_context=ctx, + initializer=_init_fuzzy_worker, + initargs=(index, matcher), + ) as executor: + futures = {} + for chunk in chunks: + chunk_by_artist = {a: by_artist[a] for a in chunk} + future = executor.submit(_classify_fuzzy_chunk, (chunk, chunk_by_artist)) + futures[future] = chunk + + for future in as_completed(futures): + batch_keep, batch_prune, batch_review, batch_review_by = future.result() + keep_ids |= batch_keep + prune_ids |= batch_prune + review_ids |= batch_review + for k, v in batch_review_by.items(): + review_by_artist.setdefault(k, []).extend(v) + + completed_chunks += 1 + chunk = futures[future] + chunk_releases = sum(len(by_artist[a]) for a in chunk) + releases_processed += chunk_releases + artists_fuzzy_matched += len(chunk) + + elapsed = time.monotonic() - phase4_start + rate = artists_fuzzy_matched / elapsed if elapsed > 0 else 0 + remaining = len(truly_fuzzy) - artists_fuzzy_matched + eta_str = f", ETA {remaining / rate:.0f}s" if rate > 0 else "" + logger.info( + f" Chunk {completed_chunks}/{len(chunks)} done " + f"({releases_processed:,}/{total_releases:,} releases, " + f"{rate:.0f} artists/s{eta_str}) " + f"| {elapsed:.1f}s elapsed " + f"| KEEP={len(keep_ids):,} PRUNE={len(prune_ids):,} " + f"REVIEW={len(review_ids):,}" + ) + finally: + _pool_index = None + _pool_matcher = None elapsed = time.monotonic() - start_time logger.info( diff --git a/tests/unit/test_verify_cache.py b/tests/unit/test_verify_cache.py index b00586a..4fcf43e 100644 --- a/tests/unit/test_verify_cache.py +++ b/tests/unit/test_verify_cache.py @@ -2,7 +2,9 @@ import importlib.util import json +import multiprocessing import sys +from concurrent.futures import ProcessPoolExecutor from pathlib import Path from 
unittest.mock import AsyncMock, MagicMock, patch @@ -651,13 +653,15 @@ async def test_empty_results(self): classify_all_releases = _vc.classify_all_releases classify_artist_fuzzy = _vc.classify_artist_fuzzy classify_fuzzy_batch = _vc.classify_fuzzy_batch +_init_fuzzy_worker = _vc._init_fuzzy_worker +_classify_fuzzy_chunk = _vc._classify_fuzzy_chunk prune_releases_copy_swap = _vc.prune_releases_copy_swap parse_args = _vc.parse_args # --------------------------------------------------------------------------- -# Step 8.5: Parallel Fuzzy Matching +# Step 8.5: Process Pool Fuzzy Matching # --------------------------------------------------------------------------- @@ -814,6 +818,114 @@ def test_parallel_matches_serial(self, sample_index): assert {4, 5} <= report.prune_ids +class TestProcessPoolFuzzyClassification: + """Verify fuzzy classification works correctly via ProcessPoolExecutor.""" + + def test_worker_produces_same_results_as_direct_call(self, sample_index): + """ProcessPoolExecutor worker gives same results as direct classify_fuzzy_batch.""" + matcher = MultiIndexMatcher(sample_index) + by_artist = { + "radioheed": [(999, "Radioheed", "OK Computer")], + "zzyzx unknownband": [(888, "Zzyzx Unknownband", "Nonexistent Album")], + } + artists = list(by_artist.keys()) + + direct_result = classify_fuzzy_batch(artists, by_artist, sample_index, matcher) + + ctx = multiprocessing.get_context("fork") + with ProcessPoolExecutor( + max_workers=1, + mp_context=ctx, + initializer=_init_fuzzy_worker, + initargs=(sample_index, matcher), + ) as executor: + future = executor.submit(_classify_fuzzy_chunk, (artists, by_artist)) + pool_result = future.result() + + assert pool_result[0] == direct_result[0] # keep_ids + assert pool_result[1] == direct_result[1] # prune_ids + assert pool_result[2] == direct_result[2] # review_ids + + def test_multiple_chunks_aggregate_correctly(self, sample_index): + """Results from multiple process pool chunks aggregate to match a single batch.""" + 
matcher = MultiIndexMatcher(sample_index) + by_artist = { + "radioheed": [(101, "Radioheed", "OK Computer")], + "joye division": [(102, "Joye Division", "Unknown Pleasures")], + "zzyzx unknownband": [(103, "Zzyzx Unknownband", "Fake Album")], + "aphex twins": [(104, "Aphex Twins", "Selected Ambient Works 85-92")], + } + all_artists = list(by_artist.keys()) + + single_result = classify_fuzzy_batch(all_artists, by_artist, sample_index, matcher) + + chunk1 = all_artists[:2] + chunk2 = all_artists[2:] + chunk1_by = {a: by_artist[a] for a in chunk1} + chunk2_by = {a: by_artist[a] for a in chunk2} + + ctx = multiprocessing.get_context("fork") + with ProcessPoolExecutor( + max_workers=2, + mp_context=ctx, + initializer=_init_fuzzy_worker, + initargs=(sample_index, matcher), + ) as executor: + f1 = executor.submit(_classify_fuzzy_chunk, (chunk1, chunk1_by)) + f2 = executor.submit(_classify_fuzzy_chunk, (chunk2, chunk2_by)) + r1 = f1.result() + r2 = f2.result() + + agg_keep = r1[0] | r2[0] + agg_prune = r1[1] | r2[1] + agg_review = r1[2] | r2[2] + + assert agg_keep == single_result[0] + assert agg_prune == single_result[1] + assert agg_review == single_result[2] + + def test_worker_init_sets_module_globals(self): + """_init_fuzzy_worker stores index and matcher in module globals.""" + rows = [("Autechre", "Confield")] + index = LibraryIndex.from_rows(rows) + matcher = MultiIndexMatcher(index) + + _init_fuzzy_worker(index, matcher) + + assert _vc._pool_index is index + assert _vc._pool_matcher is matcher + + # Clean up + _vc._pool_index = None + _vc._pool_matcher = None + + +class TestPhase4Logging: + """Verify Phase 4 logs throughput and ETA.""" + + def test_phase4_logs_throughput_and_eta(self, sample_index, caplog): + """classify_all_releases Phase 4 logs include throughput and ETA.""" + releases = [ + (1, "Radiohead", "OK Computer"), + (2, "Joy Division", "Unknown Pleasures"), + # Include artists that require fuzzy matching (not exact matches) + (4, "Radioheed", "OK 
Computer"), + (5, "Joye Division", "Unknown Pleasures"), + ] + matcher = MultiIndexMatcher(sample_index) + + import logging + + with caplog.at_level(logging.INFO, logger="verify_cache"): + classify_all_releases(releases, sample_index, matcher) + + # Check for throughput info in chunk progress logs + chunk_logs = [r.message for r in caplog.records if "Chunk " in r.message] + if chunk_logs: + # Only checked when chunk progress logs were captured: at least one must report an "artists/s" rate + assert any("artists/s" in msg for msg in chunk_logs) + + class TestParseArgsCopyTo: """Test --copy-to argument parsing and mutual exclusivity with --prune.""" From d58d18d2502f15a31a5ef85646bf9f80ae18dce2 Mon Sep 17 00:00:00 2001 From: Jake Bromberg Date: Wed, 11 Mar 2026 15:45:33 -0700 Subject: [PATCH 3/3] fix: sort multiprocessing import alphabetically for ruff I001 --- scripts/verify_cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/verify_cache.py b/scripts/verify_cache.py index 6d4f084..5051b4d 100644 --- a/scripts/verify_cache.py +++ b/scripts/verify_cache.py @@ -35,13 +35,13 @@ import enum import json import logging +import multiprocessing import os import re import sqlite3 import sys import time import unicodedata -import multiprocessing from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path