Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/import_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class TableConfig(TypedDict, total=False):
"db_columns": ["id", "title", "country", "release_year", "master_id"],
"required": ["id", "title"],
"transforms": {"released": extract_year},
"unique_key": ["id"],
},
{
"csv_file": "release_artist.csv",
Expand Down
54 changes: 54 additions & 0 deletions tests/integration/test_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,3 +413,57 @@ def test_total_track_count(self) -> None:
conn.close()
# 1002: 3, 3001: 2, 4001: 2 = 7
assert count == 7


class TestDuplicateReleaseIds:
    """Import a CSV with duplicate release IDs — first occurrence wins."""

    @pytest.fixture(autouse=True, scope="class")
    def _set_up_database(self, db_url):
        # A class-scoped fixture gets a throwaway `self`, so the URL is
        # stashed on the class and copied back per-test by _store_url.
        self.__class__._db_url = db_url
        _clean_db(db_url)
        # Context manager closes the connection even if the schema
        # script raises (plain .close() after execute leaked it).
        with psycopg.connect(db_url, autocommit=True) as conn:
            with conn.cursor() as cur:
                cur.execute(SCHEMA_DIR.joinpath("create_database.sql").read_text())

    @pytest.fixture(autouse=True)
    def _store_url(self):
        # Copy the class-level URL onto each test instance.
        self.db_url = self.__class__._db_url

    def _connect(self):
        """Open a fresh connection to the test database."""
        return psycopg.connect(self.db_url)

    def test_duplicate_release_ids_keep_first(self, tmp_path) -> None:
        """When a CSV has duplicate release IDs, only the first row is imported."""
        csv_path = tmp_path / "release.csv"
        csv_path.write_text(
            "id,status,title,country,released,notes,data_quality,master_id,format\n"
            "5001,Accepted,DOGA,AR,2024-05-10,,Correct,8001,LP\n"
            "5001,Accepted,Different Title,US,2025,,Correct,8002,CD\n"
            "5002,Accepted,Aluminum Tunes,UK,1998-09-01,,Correct,8002,CD\n"
        )

        release_config = next(t for t in BASE_TABLES if t["table"] == "release")
        # `with` guarantees the connection is released even when the
        # import or a later assertion fails, instead of leaking it.
        with psycopg.connect(self.db_url) as conn:
            count = import_csv_func(
                conn,
                csv_path,
                release_config["table"],
                release_config["csv_columns"],
                release_config["db_columns"],
                release_config["required"],
                release_config["transforms"],
                unique_key=release_config["unique_key"],
            )

        assert count == 2  # 2 unique IDs, not 3 rows

        with self._connect() as conn:
            with conn.cursor() as cur:
                cur.execute("SELECT title FROM release WHERE id = 5001")
                row = cur.fetchone()
        # Fail with a clear message rather than TypeError on None[0]
        # if the row was never imported.
        assert row is not None, "release 5001 was not imported"
        # First occurrence wins
        assert row[0] == "DOGA"
13 changes: 12 additions & 1 deletion tests/unit/test_import_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,20 @@ def test_all_tables_have_required_keys(self) -> None:
f"{required_keys - table_config.keys()}"
)

def test_release_table_has_unique_key_on_id(self) -> None:
"""The release table must dedup on id to handle duplicate releases in CSVs."""
release_config = next(t for t in TABLES if t["table"] == "release")
assert "unique_key" in release_config, "release table needs unique_key for dedup"
assert release_config["unique_key"] == ["id"]

def test_tables_with_unique_constraints_have_unique_key(self) -> None:
"""Tables with unique constraints must specify unique_key for dedup during import."""
tables_needing_dedup = {"release_artist", "release_label", "release_track_artist"}
tables_needing_dedup = {
"release",
"release_artist",
"release_label",
"release_track_artist",
}
for table_config in TABLES:
if table_config["table"] in tables_needing_dedup:
assert "unique_key" in table_config, (
Expand Down