diff --git a/pyproject.toml b/pyproject.toml
index b9cf6829..9e47bbd3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "mavedb"
-version = "2026.2.0"
+version = "2026.2.1"
 description = "API for MaveDB, the database of Multiplexed Assays of Variant Effect."
 license = "AGPL-3.0-only"
 readme = "README.md"
diff --git a/src/mavedb/__init__.py b/src/mavedb/__init__.py
index 5061f0c0..57f24491 100644
--- a/src/mavedb/__init__.py
+++ b/src/mavedb/__init__.py
@@ -6,7 +6,7 @@
 logger = module_logging.getLogger(__name__)
 
 __project__ = "mavedb-api"
-__version__ = "2026.2.0"
+__version__ = "2026.2.1"
 
 logger.info(f"MaveDB {__version__}")
diff --git a/src/mavedb/lib/clingen/constants.py b/src/mavedb/lib/clingen/constants.py
index 5787501f..2276ee18 100644
--- a/src/mavedb/lib/clingen/constants.py
+++ b/src/mavedb/lib/clingen/constants.py
@@ -14,7 +14,7 @@ LDH_ENTITY_ENDPOINT = "maveDb"  # for some reason, not the same :/
 
 DEFAULT_LDH_SUBMISSION_BATCH_SIZE = 100
-CLINGEN_CACHE_WARMING_CONCURRENCY = 5
+CLINGEN_CACHE_WARMING_CONCURRENCY = int(os.getenv("CLINGEN_CACHE_WARMING_CONCURRENCY", "5"))
 """Maximum number of concurrent requests to make to the ClinGen API when pre-warming the cache for mapped variants."""
 
 LDH_SUBMISSION_ENDPOINT = f"https://genboree.org/mq/brdg/pulsar/{CLIN_GEN_TENANT}/ldh/submissions/{LDH_ENTITY_ENDPOINT}"
 LDH_ACCESS_ENDPOINT = os.getenv("LDH_ACCESS_ENDPOINT", "https://genboree.org/ldh")
diff --git a/src/mavedb/lib/variant_translations.py b/src/mavedb/lib/variant_translations.py
index ec17cc9c..c24c5ed9 100644
--- a/src/mavedb/lib/variant_translations.py
+++ b/src/mavedb/lib/variant_translations.py
@@ -5,7 +5,10 @@ ClinGen IDs.
 """
 
-from sqlalchemy import select
+from typing import cast
+
+from sqlalchemy.dialects.postgresql import insert
+from sqlalchemy.engine import CursorResult
 from sqlalchemy.orm import Session
 
 from mavedb.models.variant_translation import VariantTranslation
 
@@ -14,22 +17,25 @@
 def upsert_variant_translations(db: Session, translations: list[tuple[str, str]]) -> tuple[int, int]:
     """Insert VariantTranslation rows for (aa, nt) pairs that don't already exist.
 
+    Uses INSERT ... ON CONFLICT DO NOTHING to avoid race conditions between
+    concurrent jobs and duplicate pairs accumulating within a single session
+    before a commit.
+
     Returns (created, existing) counts.
""" - created = 0 - existing = 0 - for aa_clingen_id, nt_clingen_id in translations: - found = db.scalars( - select(VariantTranslation).where( - VariantTranslation.aa_clingen_id == aa_clingen_id, - VariantTranslation.nt_clingen_id == nt_clingen_id, - ) - ).one_or_none() - - if found: - existing += 1 - else: - db.add(VariantTranslation(aa_clingen_id=aa_clingen_id, nt_clingen_id=nt_clingen_id)) - created += 1 + if not translations: + return 0, 0 + + unique = list({(aa, nt) for aa, nt in translations}) + rows = [{"aa_clingen_id": aa, "nt_clingen_id": nt} for aa, nt in unique] + + stmt = ( + insert(VariantTranslation) + .values(rows) + .on_conflict_do_nothing(index_elements=["aa_clingen_id", "nt_clingen_id"]) + ) + result = cast(CursorResult, db.execute(stmt)) + created = result.rowcount + existing = len(unique) - created return created, existing diff --git a/src/mavedb/lib/vep.py b/src/mavedb/lib/vep.py index 0d61cd50..b2e2330b 100644 --- a/src/mavedb/lib/vep.py +++ b/src/mavedb/lib/vep.py @@ -3,13 +3,14 @@ import asyncio import functools import logging +import os from typing import Optional, Sequence from mavedb.lib.utils import request_with_backoff logger = logging.getLogger(__name__) -ENSEMBL_API_URL = "https://rest.ensembl.org" +ENSEMBL_API_URL = os.environ.get("ENSEMBL_API_URL", "https://rest.ensembl.org") # List of all possible VEP consequences, in order from most to least severe VEP_CONSEQUENCES = [ @@ -95,7 +96,7 @@ async def run_variant_recoder(missing_hgvs: Sequence[str]) -> dict[str, list[str url=f"{ENSEMBL_API_URL}/variant_recoder/human", headers=headers, json={"ids": list(missing_hgvs)}, - timeout=300, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed + timeout=600, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed ), ) hgvs_to_genomic: dict[str, list[str]] = {} diff --git a/src/mavedb/worker/jobs/external_services/vep.py b/src/mavedb/worker/jobs/external_services/vep.py index 5620b1e3..9e0905c6 100644 --- a/src/mavedb/worker/jobs/external_services/vep.py +++ b/src/mavedb/worker/jobs/external_services/vep.py @@ -9,6 +9,7 @@ import asyncio import logging +import os from datetime import date from sqlalchemy import select @@ -29,8 +30,8 @@ logger = logging.getLogger(__name__) _VEP_BATCH_SIZE = 200 -_RECODER_BATCH_SIZE = 25 -_RECODER_CONCURRENCY = 5 +_RECODER_BATCH_SIZE = int(os.getenv("RECODER_BATCH_SIZE", "25")) +_RECODER_CONCURRENCY = int(os.getenv("RECODER_CONCURRENCY", "5")) @with_pipeline_management diff --git a/src/mavedb/worker/jobs/system/cleanup.py b/src/mavedb/worker/jobs/system/cleanup.py index 65340431..89089828 100644 --- a/src/mavedb/worker/jobs/system/cleanup.py +++ b/src/mavedb/worker/jobs/system/cleanup.py @@ -39,7 +39,7 @@ # Timeout thresholds for detecting stalled jobs (in minutes). # RUNNING_TIMEOUT_MINUTES must stay below ArqWorkerSettings.job_timeout (currently 2 hours) # to avoid marking legitimately running jobs as stalled. 
-RUNNING_TIMEOUT_MINUTES = 90  # RUNNING jobs should complete within 90 min (30 min buffer under ARQ timeout)
+RUNNING_TIMEOUT_MINUTES = 150  # RUNNING jobs should complete within 150 min (30 min buffer under ARQ timeout)
 PENDING_TIMEOUT_MINUTES = 5  # PENDING jobs which are actionable within pipelines should be enqueued within 5 minutes
 PIPELINE_STUCK_TIMEOUT_MINUTES = (
     5  # Pipelines in non-terminal states with no active jobs should resolve within 5 minutes
diff --git a/src/mavedb/worker/settings/worker.py b/src/mavedb/worker/settings/worker.py
index e84b68c5..6a558e6b 100644
--- a/src/mavedb/worker/settings/worker.py
+++ b/src/mavedb/worker/settings/worker.py
@@ -25,7 +25,7 @@
 # driver, enabling incremental migration of job functions without touching
 # the FastAPI layer. Once all jobs use async sessions, raise MAX_JOBS to 10+.
 MAX_JOBS = 2
-JOB_TIMEOUT_SECONDS = 2 * 60 * 60  # 2 hours — matches RUNNING_TIMEOUT_MINUTES (90 min) with buffer
+JOB_TIMEOUT_SECONDS = 3 * 60 * 60  # 3 hours — matches RUNNING_TIMEOUT_MINUTES (150 min) with buffer
 
 
 class ArqWorkerSettings:
diff --git a/tests/lib/test_variant_translations.py b/tests/lib/test_variant_translations.py
new file mode 100644
index 00000000..832152c7
--- /dev/null
+++ b/tests/lib/test_variant_translations.py
@@ -0,0 +1,93 @@
+# ruff: noqa: E402
+
+import pytest
+
+pytest.importorskip("psycopg2")
+
+from sqlalchemy import select
+
+from mavedb.lib.variant_translations import upsert_variant_translations
+from mavedb.models.variant_translation import VariantTranslation
+
+
+@pytest.mark.unit
+class TestUpsertVariantTranslations:
+    """Unit tests for upsert_variant_translations.
+
+    Focuses on the INSERT ... ON CONFLICT DO NOTHING semantics: correct
+    created/existing counts, deduplication within a batch, and idempotency
+    across successive calls within the same transaction.
+    """
+
+    def test_inserts_new_pairs(self, session):
+        created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
+
+        assert created == 2
+        assert existing == 0
+        rows = session.scalars(select(VariantTranslation)).all()
+        assert len(rows) == 2
+
+    def test_returns_existing_count_for_committed_rows(self, session):
+        session.add(VariantTranslation(aa_clingen_id="PA1", nt_clingen_id="CA1"))
+        session.commit()
+
+        created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
+
+        assert created == 1
+        assert existing == 1
+        rows = session.scalars(select(VariantTranslation)).all()
+        assert len(rows) == 2
+
+    def test_empty_input_returns_zeros(self, session):
+        created, existing = upsert_variant_translations(session, [])
+
+        assert created == 0
+        assert existing == 0
+
+    def test_deduplicates_duplicate_pairs_within_batch(self, session):
+        # Same pair appears twice in the input — should only insert once.
+        created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA1")])
+
+        assert created == 1
+        assert existing == 0
+        rows = session.scalars(select(VariantTranslation)).all()
+        assert len(rows) == 1
+
+    def test_different_nt_under_same_aa_are_distinct_rows(self, session):
+        # (PA1, CA1), (PA1, CA2), and (PA1, CA3) share aa_clingen_id but are different rows —
+        # ON CONFLICT only fires on an exact composite-key match, so all three insert.
+        created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2"), ("PA1", "CA3")])
+
+        assert created == 3
+        assert existing == 0
+
+    def test_idempotent_across_calls_without_intermediate_commit(self, session):
+        # This is the exact scenario that caused the UniqueViolation crash.
+        # Two separate calls within the same transaction share overlapping pairs.
+        # The second call must succeed without error (ON CONFLICT DO NOTHING)
+        # rather than trying to INSERT a duplicate and blowing up at commit time.
+        created1, existing1 = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
+        assert created1 == 2
+        assert existing1 == 0
+
+        # Overlapping call — CA1 already exists in this transaction, CA3 is new.
+        created2, existing2 = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA3")])
+        assert created2 == 1
+        assert existing2 == 1
+
+        # Commit must succeed — no UniqueViolation.
+        session.commit()
+
+        rows = session.scalars(select(VariantTranslation)).all()
+        assert len(rows) == 3
+
+    def test_fully_overlapping_second_call_inserts_nothing(self, session):
+        upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
+
+        created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
+        assert created == 0
+        assert existing == 2
+
+        session.commit()
+        rows = session.scalars(select(VariantTranslation)).all()
+        assert len(rows) == 2
diff --git a/tests/worker/jobs/external_services/test_variant_translation.py b/tests/worker/jobs/external_services/test_variant_translation.py
index 0b1677df..3e1f364e 100644
--- a/tests/worker/jobs/external_services/test_variant_translation.py
+++ b/tests/worker/jobs/external_services/test_variant_translation.py
@@ -10,6 +10,8 @@ from mavedb.lib.types.workflow import JobExecutionOutcome
 from mavedb.models.enums.job_pipeline import FailureCategory, JobStatus, PipelineStatus
+from mavedb.models.mapped_variant import MappedVariant
+from mavedb.models.variant import Variant
 from mavedb.models.variant_annotation_status import VariantAnnotationStatus
 from mavedb.models.variant_translation import VariantTranslation
 from mavedb.worker.jobs.external_services.variant_translation import populate_variant_translations_for_score_set
 
 
@@ -347,6 +349,72 @@ async def test_propagates_exceptions(
         assert str(exc_info.value) == "Test exception"
 
+    async def test_multiple_alleles_sharing_pa_no_duplicate_error(
+        self,
+        session,
+        with_populated_domain_data,
+        with_populate_variant_translations_job,
+        mock_worker_ctx,
+        sample_populate_variant_translations_run,
+        setup_sample_variants_with_caid_for_translation,
+    ):
+        """Test that two CA alleles mapping to the same PA don't cause a UniqueViolation.
+
+        This is a regression test for a bug where the SELECT-then-INSERT upsert pattern
+        failed to detect in-session duplicates: both alleles' iterations called
+        upsert_variant_translations with overlapping (PA, CA) pairs, the SELECT found
+        no committed row, both staged db.add() for the same pair, and the subsequent
+        update_progress commit raised a UniqueViolation.
+        """
+        # Add a second variant with a different CA allele under the same score set.
+        score_set_id = sample_populate_variant_translations_run.job_params["score_set_id"]
+
+        variant2 = Variant(
+            urn="urn:variant:test-second-ca-allele",
+            score_set_id=score_set_id,
+            hgvs_nt="NM_000000.1:c.2T>G",
+            hgvs_pro="NP_000000.1:p.Val2Gly",
+            data={},
+        )
+        session.add(variant2)
+        session.commit()
+        mapped_variant2 = MappedVariant(
+            variant_id=variant2.id,
+            clingen_allele_id="CA_SECOND",
+            current=True,
+            mapped_date="2024-01-01T00:00:00Z",
+            mapping_api_version="1.0.0",
+        )
+        session.add(mapped_variant2)
+        session.commit()
+
+        # Both CA alleles resolve to the same PA. The PA then returns the same set of
+        # registered CAs for both iterations, producing fully overlapping translation pairs.
+        with (
+            patch(
+                "mavedb.worker.jobs.external_services.variant_translation.get_canonical_pa_ids",
+                return_value=["PA_SHARED"],
+            ),
+            patch(
+                "mavedb.worker.jobs.external_services.variant_translation.get_matching_registered_ca_ids",
+                return_value=["CA9765210", "CA_SECOND"],
+            ),
+        ):
+            result = await populate_variant_translations_for_score_set(
+                mock_worker_ctx,
+                1,
+                JobManager(session, mock_worker_ctx["redis"], sample_populate_variant_translations_run.id),
+            )
+
+        assert result.status == JobStatus.SUCCEEDED
+
+        translations = session.scalars(select(VariantTranslation)).all()
+        pairs = {(t.aa_clingen_id, t.nt_clingen_id) for t in translations}
+        # PA_SHARED paired with each CA: CA9765210 (original from allele 1),
+        # CA_SECOND (original from allele 2), plus both as registered CAs.
+        assert ("PA_SHARED", "CA9765210") in pairs
+        assert ("PA_SHARED", "CA_SECOND") in pairs
+
     async def test_total_api_failure_returns_failed(
         self,
         session,