Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ artist_mappings.json
!tests/fixtures/**/*.db
!tests/fixtures/**/*.csv
worklog/
.DS_Store
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ docker compose up db -d # just the database (for tests)
- `scripts/filter_csv.py` -- Filter Discogs CSVs to library artists (standalone, used outside the pipeline)
- `scripts/import_csv.py` -- Import CSVs into PostgreSQL (psycopg COPY). Child tables are imported in parallel via ThreadPoolExecutor after parent tables. Artist detail tables (artist_alias, artist_member) are filtered to known artist IDs to prevent FK violations, since the converter's CSVs contain all Discogs artists. Tables with `unique_key` configs are deduped in-memory during COPY.
- `scripts/dedup_releases.py` -- Deduplicate releases by master_id, preferring label match + sublabel resolution, US releases (copy-swap with `DROP CASCADE`). Index/constraint creation is parallelized via ThreadPoolExecutor.
- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ThreadPoolExecutor (rapidfuzz releases the GIL). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE.
- `scripts/verify_cache.py` -- Multi-index fuzzy matching for KEEP/PRUNE classification; `--copy-to` streams matches to a target DB. Fuzzy matching is parallelized via ProcessPoolExecutor with fork context for true multi-core parallelism (the Python loop overhead between rapidfuzz calls holds the GIL, so threads serialize on a single core). Large prune sets (>10K IDs) use copy-and-swap instead of CASCADE DELETE.
- `scripts/csv_to_tsv.py` -- CSV to TSV conversion utility
- `scripts/fix_csv_newlines.py` -- Fix multiline CSV fields
- `lib/format_normalization.py` -- Normalize raw Discogs/library format strings to broad categories (Vinyl, CD, Cassette, 7", Digital)
Expand Down
117 changes: 86 additions & 31 deletions scripts/verify_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
import enum
import json
import logging
import multiprocessing
import os
import re
import sqlite3
import sys
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path

Expand Down Expand Up @@ -1351,6 +1352,29 @@ def classify_fuzzy_batch(
return keep_ids, prune_ids, review_ids, review_by_artist


# ---------------------------------------------------------------------------
# Process pool worker functions for Phase 4 parallelization
# ---------------------------------------------------------------------------

_pool_index: LibraryIndex | None = None
_pool_matcher: MultiIndexMatcher | None = None


def _init_fuzzy_worker(index: LibraryIndex, matcher: MultiIndexMatcher) -> None:
"""Initializer for ProcessPoolExecutor workers. Stores shared read-only state."""
global _pool_index, _pool_matcher
_pool_index = index
_pool_matcher = matcher


def _classify_fuzzy_chunk(
chunk_args: tuple[list[str], dict[str, list[tuple[int, str, str]]]],
) -> tuple[set[int], set[int], set[int], dict[str, list[tuple[int, str, MatchResult]]]]:
"""Worker function for ProcessPoolExecutor. Reads index/matcher from module globals."""
artists, chunk_by_artist = chunk_args
return classify_fuzzy_batch(artists, chunk_by_artist, _pool_index, _pool_matcher)


def classify_all_releases(
releases: list[tuple[int, str, str]] | list[tuple[int, str, str, str | None]],
index: LibraryIndex,
Expand Down Expand Up @@ -1472,6 +1496,7 @@ def classify_all_releases(
# Phase 3: Token-overlap pre-screen for fuzzy candidates.
# Build a set of all tokens from library artist names. Discard short tokens
# (1-2 chars) that cause false positive overlaps ("dj", "mc", "j", etc.)
phase3_start = time.monotonic()
min_token_len = 3
library_tokens: set[str] = set()
for artist in index.all_artists:
Expand All @@ -1495,8 +1520,10 @@ def classify_all_releases(
if token_pruned
else 0
)
phase3_elapsed = time.monotonic() - phase3_start
logger.info(
f"Phase 3 pre-screen: {token_pruned:,} artists pruned by token overlap "
f"Phase 3 pre-screen in {phase3_elapsed:.1f}s: "
f"{token_pruned:,} artists pruned by token overlap "
f"({token_pruned_releases:,} releases), "
f"{len(truly_fuzzy):,} artists remain for fuzzy matching"
)
Expand All @@ -1506,47 +1533,75 @@ def classify_all_releases(
# releases by title only against the matched library artist's albums.
# This avoids the O(releases * all_library_pairs) cost of full scoring.
#
# Parallelized via ThreadPoolExecutor. Rapidfuzz's C extension releases
# the GIL during extractOne, so threads achieve real parallelism.
# Parallelized via ProcessPoolExecutor with fork context. The Python loop
# overhead between rapidfuzz extractOne calls holds the GIL, so threads
# serialize on a single core. Separate processes give true multi-core
# parallelism. Fork context avoids the cost of re-importing the module
# in each worker; the pipeline is single-threaded before this point so
# fork is safe.
logger.info(
"Phase 4: Fuzzy artist matching for %s artists...",
f"{len(truly_fuzzy):,}",
)
phase4_start = time.monotonic()

num_workers = min(os.cpu_count() or 4, 8)
chunk_size = max(1, len(truly_fuzzy) // (num_workers * 4))
# Target ~200 artists per chunk for frequent progress updates,
# but ensure at least num_workers * 2 chunks for load balancing.
min_chunks = num_workers * 2
target_chunk_size = 200
chunk_size = max(1, min(target_chunk_size, len(truly_fuzzy) // min_chunks))
chunks = [truly_fuzzy[i : i + chunk_size] for i in range(0, len(truly_fuzzy), chunk_size)]

logger.info(f" Using {num_workers} workers, {len(chunks)} chunks of ~{chunk_size} artists")

completed_chunks = 0
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = {
executor.submit(classify_fuzzy_batch, chunk, by_artist, index, matcher): chunk
for chunk in chunks
}
for future in as_completed(futures):
batch_keep, batch_prune, batch_review, batch_review_by = future.result()
keep_ids |= batch_keep
prune_ids |= batch_prune
review_ids |= batch_review
for k, v in batch_review_by.items():
review_by_artist.setdefault(k, []).extend(v)

completed_chunks += 1
chunk = futures[future]
releases_processed += sum(len(by_artist[a]) for a in chunk)
artists_fuzzy_matched += len(chunk)

elapsed = time.monotonic() - phase4_start
logger.info(
f" Chunk {completed_chunks}/{len(chunks)} done "
f"({releases_processed:,}/{total_releases:,} releases) "
f"| {elapsed:.1f}s elapsed "
f"| KEEP={len(keep_ids):,} PRUNE={len(prune_ids):,} "
f"REVIEW={len(review_ids):,}"
)
global _pool_index, _pool_matcher
_pool_index = index
_pool_matcher = matcher
try:
ctx = multiprocessing.get_context("fork")
with ProcessPoolExecutor(
max_workers=num_workers,
mp_context=ctx,
initializer=_init_fuzzy_worker,
initargs=(index, matcher),
) as executor:
futures = {}
for chunk in chunks:
chunk_by_artist = {a: by_artist[a] for a in chunk}
future = executor.submit(_classify_fuzzy_chunk, (chunk, chunk_by_artist))
futures[future] = chunk

for future in as_completed(futures):
batch_keep, batch_prune, batch_review, batch_review_by = future.result()
keep_ids |= batch_keep
prune_ids |= batch_prune
review_ids |= batch_review
for k, v in batch_review_by.items():
review_by_artist.setdefault(k, []).extend(v)

completed_chunks += 1
chunk = futures[future]
chunk_releases = sum(len(by_artist[a]) for a in chunk)
releases_processed += chunk_releases
artists_fuzzy_matched += len(chunk)

elapsed = time.monotonic() - phase4_start
rate = artists_fuzzy_matched / elapsed if elapsed > 0 else 0
remaining = len(truly_fuzzy) - artists_fuzzy_matched
eta_str = f", ETA {remaining / rate:.0f}s" if rate > 0 else ""
logger.info(
f" Chunk {completed_chunks}/{len(chunks)} done "
f"({releases_processed:,}/{total_releases:,} releases, "
f"{rate:.0f} artists/s{eta_str}) "
f"| {elapsed:.1f}s elapsed "
f"| KEEP={len(keep_ids):,} PRUNE={len(prune_ids):,} "
f"REVIEW={len(review_ids):,}"
)
finally:
_pool_index = None
_pool_matcher = None

elapsed = time.monotonic() - start_time
logger.info(
Expand Down
114 changes: 113 additions & 1 deletion tests/unit/test_verify_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import importlib.util
import json
import multiprocessing
import sys
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -651,13 +653,15 @@ async def test_empty_results(self):
classify_all_releases = _vc.classify_all_releases
classify_artist_fuzzy = _vc.classify_artist_fuzzy
classify_fuzzy_batch = _vc.classify_fuzzy_batch
_init_fuzzy_worker = _vc._init_fuzzy_worker
_classify_fuzzy_chunk = _vc._classify_fuzzy_chunk
prune_releases_copy_swap = _vc.prune_releases_copy_swap

parse_args = _vc.parse_args


# ---------------------------------------------------------------------------
# Step 8.5: Parallel Fuzzy Matching
# Step 8.5: Process Pool Fuzzy Matching
# ---------------------------------------------------------------------------


Expand Down Expand Up @@ -814,6 +818,114 @@ def test_parallel_matches_serial(self, sample_index):
assert {4, 5} <= report.prune_ids


class TestProcessPoolFuzzyClassification:
"""Verify fuzzy classification works correctly via ProcessPoolExecutor."""

def test_worker_produces_same_results_as_direct_call(self, sample_index):
"""ProcessPoolExecutor worker gives same results as direct classify_fuzzy_batch."""
matcher = MultiIndexMatcher(sample_index)
by_artist = {
"radioheed": [(999, "Radioheed", "OK Computer")],
"zzyzx unknownband": [(888, "Zzyzx Unknownband", "Nonexistent Album")],
}
artists = list(by_artist.keys())

direct_result = classify_fuzzy_batch(artists, by_artist, sample_index, matcher)

ctx = multiprocessing.get_context("fork")
with ProcessPoolExecutor(
max_workers=1,
mp_context=ctx,
initializer=_init_fuzzy_worker,
initargs=(sample_index, matcher),
) as executor:
future = executor.submit(_classify_fuzzy_chunk, (artists, by_artist))
pool_result = future.result()

assert pool_result[0] == direct_result[0] # keep_ids
assert pool_result[1] == direct_result[1] # prune_ids
assert pool_result[2] == direct_result[2] # review_ids

def test_multiple_chunks_aggregate_correctly(self, sample_index):
"""Results from multiple process pool chunks aggregate to match a single batch."""
matcher = MultiIndexMatcher(sample_index)
by_artist = {
"radioheed": [(101, "Radioheed", "OK Computer")],
"joye division": [(102, "Joye Division", "Unknown Pleasures")],
"zzyzx unknownband": [(103, "Zzyzx Unknownband", "Fake Album")],
"aphex twins": [(104, "Aphex Twins", "Selected Ambient Works 85-92")],
}
all_artists = list(by_artist.keys())

single_result = classify_fuzzy_batch(all_artists, by_artist, sample_index, matcher)

chunk1 = all_artists[:2]
chunk2 = all_artists[2:]
chunk1_by = {a: by_artist[a] for a in chunk1}
chunk2_by = {a: by_artist[a] for a in chunk2}

ctx = multiprocessing.get_context("fork")
with ProcessPoolExecutor(
max_workers=2,
mp_context=ctx,
initializer=_init_fuzzy_worker,
initargs=(sample_index, matcher),
) as executor:
f1 = executor.submit(_classify_fuzzy_chunk, (chunk1, chunk1_by))
f2 = executor.submit(_classify_fuzzy_chunk, (chunk2, chunk2_by))
r1 = f1.result()
r2 = f2.result()

agg_keep = r1[0] | r2[0]
agg_prune = r1[1] | r2[1]
agg_review = r1[2] | r2[2]

assert agg_keep == single_result[0]
assert agg_prune == single_result[1]
assert agg_review == single_result[2]

def test_worker_init_sets_module_globals(self):
"""_init_fuzzy_worker stores index and matcher in module globals."""
rows = [("Autechre", "Confield")]
index = LibraryIndex.from_rows(rows)
matcher = MultiIndexMatcher(index)

_init_fuzzy_worker(index, matcher)

assert _vc._pool_index is index
assert _vc._pool_matcher is matcher

# Clean up
_vc._pool_index = None
_vc._pool_matcher = None


class TestPhase4Logging:
"""Verify Phase 4 logs throughput and ETA."""

def test_phase4_logs_throughput_and_eta(self, sample_index, caplog):
"""classify_all_releases Phase 4 logs include throughput and ETA."""
releases = [
(1, "Radiohead", "OK Computer"),
(2, "Joy Division", "Unknown Pleasures"),
# Include artists that require fuzzy matching (not exact matches)
(4, "Radioheed", "OK Computer"),
(5, "Joye Division", "Unknown Pleasures"),
]
matcher = MultiIndexMatcher(sample_index)

import logging

with caplog.at_level(logging.INFO, logger="verify_cache"):
classify_all_releases(releases, sample_index, matcher)

# Check for throughput info in chunk progress logs
chunk_logs = [r.message for r in caplog.records if "Chunk " in r.message]
if chunk_logs:
# At least one chunk log should have artists/sec throughput
assert any("artists/s" in msg for msg in chunk_logs)


class TestParseArgsCopyTo:
"""Test --copy-to argument parsing and mutual exclusivity with --prune."""

Expand Down
Loading