Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions tests/integration/test_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
copy_table = _dd.copy_table
swap_tables = _dd.swap_tables
add_base_constraints_and_indexes = _dd.add_base_constraints_and_indexes
add_track_constraints_and_indexes = _dd.add_track_constraints_and_indexes
add_constraints_and_indexes = _dd.add_constraints_and_indexes
load_library_labels = _dd.load_library_labels
load_label_hierarchy = _dd.load_label_hierarchy
Expand Down Expand Up @@ -695,3 +696,278 @@ def test_total_release_count_same(self) -> None:
count = cur.fetchone()[0]
conn.close()
assert count == 12


class TestEnsureDedupIdsAlreadyExists:
    """Verify ensure_dedup_ids returns existing count when table already populated.

    NOTE(review): the two test methods are order-dependent by design —
    ``test_table_not_recreated`` inserts a second row and expects the count to
    grow, so it must run after ``test_returns_existing_count`` (pytest executes
    methods in definition order within a class).
    """

    @pytest.fixture(autouse=True, scope="class")
    def _set_up(self, db_url):
        """Build a fresh schema with a pre-populated dedup_delete_ids table.

        Uses a context manager so the connection is closed even if schema
        creation or an INSERT raises (the original explicit close leaked it).
        """
        self.__class__._db_url = db_url
        with psycopg.connect(db_url, autocommit=True) as conn:
            _drop_all_tables(conn)
            with conn.cursor() as cur:
                cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
                # Insert releases with duplicate master_ids (would normally be deduped)
                cur.execute("INSERT INTO release (id, title, master_id) VALUES (1, 'A', 100)")
                cur.execute("INSERT INTO release (id, title, master_id) VALUES (2, 'B', 100)")
                # Pre-create the dedup_delete_ids table with some IDs
                cur.execute("""
                    CREATE UNLOGGED TABLE dedup_delete_ids (
                        release_id integer PRIMARY KEY
                    )
                """)
                cur.execute("INSERT INTO dedup_delete_ids (release_id) VALUES (1)")

    @pytest.fixture(autouse=True)
    def _store_url(self):
        """Expose the class-scoped DB URL on each test instance."""
        self.db_url = self.__class__._db_url

    def test_returns_existing_count(self) -> None:
        """When dedup_delete_ids already exists, returns its count without recreating."""
        with psycopg.connect(self.db_url, autocommit=True) as conn:
            count = ensure_dedup_ids(conn)
        assert count == 1

    def test_table_not_recreated(self) -> None:
        """The pre-existing dedup_delete_ids table is not dropped and recreated."""
        with psycopg.connect(self.db_url, autocommit=True) as conn:
            # Add a second ID to the existing table
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO dedup_delete_ids (release_id) VALUES (2) ON CONFLICT DO NOTHING"
                )
            count = ensure_dedup_ids(conn)
        # Should reflect the updated count (not recreate from ROW_NUMBER query)
        assert count == 2


class TestAddTrackConstraintsAndIndexes:
    """Verify add_track_constraints_and_indexes creates FK constraints and indexes."""

    @pytest.fixture(autouse=True, scope="class")
    def _set_up(self, db_url):
        """Create the schema, strip the track constraints/indexes it ships with,
        seed minimal data, then run add_track_constraints_and_indexes.

        A context manager guarantees the connection is closed even if any
        setup statement raises (the original explicit close leaked it).
        """
        self.__class__._db_url = db_url
        with psycopg.connect(db_url, autocommit=True) as conn:
            _drop_all_tables(conn)
            with conn.cursor() as cur:
                cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
                cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
                # Drop existing constraints (schema creates them, we want to test adding them)
                cur.execute(
                    "ALTER TABLE release_track DROP CONSTRAINT IF EXISTS release_track_release_id_fkey"
                )
                cur.execute(
                    "ALTER TABLE release_track_artist "
                    "DROP CONSTRAINT IF EXISTS release_track_artist_release_id_fkey"
                )
                for idx in (
                    "idx_release_track_release_id",
                    "idx_release_track_artist_release_id",
                    "idx_release_track_title_trgm",
                    "idx_release_track_artist_name_trgm",
                ):
                    cur.execute(f"DROP INDEX IF EXISTS {idx}")
                # Insert test data
                cur.execute("INSERT INTO release (id, title) VALUES (1, 'Test Album')")
                cur.execute(
                    "INSERT INTO release_track (release_id, sequence, title) VALUES (1, 1, 'Track 1')"
                )
                cur.execute(
                    "INSERT INTO release_track_artist (release_id, track_sequence, artist_name) "
                    "VALUES (1, 1, 'Test Artist')"
                )
            add_track_constraints_and_indexes(conn, db_url=db_url)

    @pytest.fixture(autouse=True)
    def _store_url(self):
        """Expose the class-scoped DB URL on each test instance."""
        self.db_url = self.__class__._db_url

    def _connect(self):
        """Open a fresh (non-autocommit) connection to the test database."""
        return psycopg.connect(self.db_url)

    def _fk_exists(self, table: str) -> bool:
        """Return True if *table* has at least one FOREIGN KEY constraint."""
        with self._connect() as conn, conn.cursor() as cur:
            cur.execute(
                """
                SELECT constraint_name FROM information_schema.table_constraints
                WHERE table_name = %s AND constraint_type = 'FOREIGN KEY'
                """,
                (table,),
            )
            return cur.fetchone() is not None

    def _index_exists(self, table: str, index: str) -> bool:
        """Return True if the named index exists on *table*."""
        with self._connect() as conn, conn.cursor() as cur:
            cur.execute(
                "SELECT indexname FROM pg_indexes WHERE tablename = %s AND indexname = %s",
                (table, index),
            )
            return cur.fetchone() is not None

    def test_release_track_fk_exists(self) -> None:
        """FK constraint on release_track referencing release exists."""
        assert self._fk_exists("release_track")

    def test_release_track_artist_fk_exists(self) -> None:
        """FK constraint on release_track_artist referencing release exists."""
        assert self._fk_exists("release_track_artist")

    def test_release_track_release_id_index_exists(self) -> None:
        """Index on release_track(release_id) exists."""
        assert self._index_exists("release_track", "idx_release_track_release_id")

    def test_release_track_artist_release_id_index_exists(self) -> None:
        """Index on release_track_artist(release_id) exists."""
        assert self._index_exists(
            "release_track_artist", "idx_release_track_artist_release_id"
        )

    def test_release_track_title_trgm_index_exists(self) -> None:
        """GIN trigram index on release_track(title) exists."""
        assert self._index_exists("release_track", "idx_release_track_title_trgm")

    def test_release_track_artist_name_trgm_index_exists(self) -> None:
        """GIN trigram index on release_track_artist(artist_name) exists."""
        assert self._index_exists(
            "release_track_artist", "idx_release_track_artist_name_trgm"
        )


class TestAddConstraintsAndIndexes:
    """Verify add_constraints_and_indexes creates both base and track constraints."""

    @pytest.fixture(autouse=True, scope="class")
    def _set_up(self, db_url):
        """Create the schema, strip every constraint/index it ships with,
        seed one row per table, then run add_constraints_and_indexes.

        A context manager guarantees the connection is closed even if any
        setup statement raises (the original explicit close leaked it).
        """
        self.__class__._db_url = db_url
        with psycopg.connect(db_url, autocommit=True) as conn:
            _drop_all_tables(conn)
            with conn.cursor() as cur:
                cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())
                cur.execute(SCHEMA_DIR.joinpath("create_functions.sql").read_text())
                # Drop all FK constraints and indexes (schema creates them)
                for constraint, table in [
                    ("release_artist_release_id_fkey", "release_artist"),
                    ("release_label_release_id_fkey", "release_label"),
                    ("release_track_release_id_fkey", "release_track"),
                    ("release_track_artist_release_id_fkey", "release_track_artist"),
                    ("cache_metadata_release_id_fkey", "cache_metadata"),
                ]:
                    cur.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint}")
                # Drop the PK on release so add_constraints_and_indexes can recreate it
                cur.execute("ALTER TABLE release DROP CONSTRAINT IF EXISTS release_pkey")
                cur.execute(
                    "ALTER TABLE cache_metadata DROP CONSTRAINT IF EXISTS cache_metadata_pkey"
                )
                # Drop all indexes (FK, GIN trigram, cache metadata)
                for idx in [
                    "idx_release_artist_release_id",
                    "idx_release_label_release_id",
                    "idx_release_track_release_id",
                    "idx_release_track_artist_release_id",
                    "idx_release_artist_name_trgm",
                    "idx_release_title_trgm",
                    "idx_release_track_title_trgm",
                    "idx_release_track_artist_name_trgm",
                    "idx_cache_metadata_cached_at",
                    "idx_cache_metadata_source",
                    "idx_release_master_id",
                ]:
                    cur.execute(f"DROP INDEX IF EXISTS {idx}")
                # Insert test data: one row in release plus one row per child table
                cur.execute("INSERT INTO release (id, title) VALUES (1, 'Test Album')")
                cur.execute(
                    "INSERT INTO release_artist (release_id, artist_name) VALUES (1, 'Test Artist')"
                )
                cur.execute(
                    "INSERT INTO release_label (release_id, label_name) VALUES (1, 'Test Lbl')"
                )
                cur.execute(
                    "INSERT INTO release_track (release_id, sequence, title) VALUES (1, 1, 'Track 1')"
                )
                cur.execute(
                    "INSERT INTO release_track_artist (release_id, track_sequence, artist_name) "
                    "VALUES (1, 1, 'Track Artist')"
                )
                cur.execute(
                    "INSERT INTO cache_metadata (release_id, source) VALUES (1, 'bulk_import')"
                )
            add_constraints_and_indexes(conn, db_url=db_url)

    @pytest.fixture(autouse=True)
    def _store_url(self):
        """Expose the class-scoped DB URL on each test instance."""
        self.db_url = self.__class__._db_url

    def _connect(self):
        """Open a fresh (non-autocommit) connection to the test database."""
        return psycopg.connect(self.db_url)

    def test_release_pk_exists(self) -> None:
        """Primary key on release(id) exists."""
        with self._connect() as conn, conn.cursor() as cur:
            cur.execute("""
                SELECT constraint_name FROM information_schema.table_constraints
                WHERE table_name = 'release' AND constraint_type = 'PRIMARY KEY'
            """)
            result = cur.fetchone()
        assert result is not None

    def test_all_fk_constraints_exist(self) -> None:
        """FK constraints on all child tables exist."""
        with self._connect() as conn, conn.cursor() as cur:
            cur.execute("""
                SELECT tc.table_name
                FROM information_schema.table_constraints tc
                WHERE tc.constraint_type = 'FOREIGN KEY'
            """)
            fk_tables = {row[0] for row in cur.fetchall()}
        expected = {
            "release_artist",
            "release_label",
            "release_track",
            "release_track_artist",
            "cache_metadata",
        }
        assert expected.issubset(fk_tables)

    def test_base_and_track_indexes_exist(self) -> None:
        """Both base and track FK indexes exist."""
        with self._connect() as conn, conn.cursor() as cur:
            cur.execute("SELECT indexname FROM pg_indexes WHERE schemaname = 'public'")
            indexes = {row[0] for row in cur.fetchall()}
        expected_indexes = {
            "idx_release_artist_release_id",
            "idx_release_label_release_id",
            "idx_release_track_release_id",
            "idx_release_track_artist_release_id",
        }
        assert expected_indexes.issubset(indexes)
Loading