2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "mavedb"
version = "2026.2.0"
version = "2026.2.1"
description = "API for MaveDB, the database of Multiplexed Assays of Variant Effect."
license = "AGPL-3.0-only"
readme = "README.md"
2 changes: 1 addition & 1 deletion src/mavedb/__init__.py
@@ -6,7 +6,7 @@
logger = module_logging.getLogger(__name__)

__project__ = "mavedb-api"
__version__ = "2026.2.0"
__version__ = "2026.2.1"

logger.info(f"MaveDB {__version__}")

2 changes: 1 addition & 1 deletion src/mavedb/lib/clingen/constants.py
@@ -14,7 +14,7 @@
LDH_ENTITY_ENDPOINT = "maveDb" # for some reason, not the same :/

DEFAULT_LDH_SUBMISSION_BATCH_SIZE = 100
CLINGEN_CACHE_WARMING_CONCURRENCY = 5
CLINGEN_CACHE_WARMING_CONCURRENCY = int(os.getenv("CLINGEN_CACHE_WARMING_CONCURRENCY", "5"))
"""Maximum number of concurrent requests to make to the ClinGen API when pre-warming the cache for mapped variants."""
LDH_SUBMISSION_ENDPOINT = f"https://genboree.org/mq/brdg/pulsar/{CLIN_GEN_TENANT}/ldh/submissions/{LDH_ENTITY_ENDPOINT}"
LDH_ACCESS_ENDPOINT = os.getenv("LDH_ACCESS_ENDPOINT", "https://genboree.org/ldh")
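The new constant simply wraps the raw environment value in `int()`, so a malformed value would raise at import time. A minimal sketch of the same override pattern with a defensive fallback (the helper name and fallback behaviour are illustrative, not part of this PR):

```python
import os


def int_from_env(name: str, default: int) -> int:
    """Read an integer setting from the environment, falling back to the default
    when the variable is unset or not a valid integer."""
    raw = os.getenv(name, str(default))
    try:
        return int(raw)
    except ValueError:
        return default


# Equivalent override point to the constant above, but tolerant of bad input.
CLINGEN_CACHE_WARMING_CONCURRENCY = int_from_env("CLINGEN_CACHE_WARMING_CONCURRENCY", 5)
```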
38 changes: 22 additions & 16 deletions src/mavedb/lib/variant_translations.py
@@ -5,7 +5,10 @@
ClinGen IDs.
"""

from sqlalchemy import select
from typing import cast

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session

from mavedb.models.variant_translation import VariantTranslation
@@ -14,22 +17,25 @@
def upsert_variant_translations(db: Session, translations: list[tuple[str, str]]) -> tuple[int, int]:
"""Insert VariantTranslation rows for (aa, nt) pairs that don't already exist.

    Uses INSERT ... ON CONFLICT DO NOTHING so that neither race conditions between
    concurrent jobs nor duplicate pairs accumulating within a single session
    before a commit can raise a uniqueness violation.

Returns (created, existing) counts.
"""
created = 0
existing = 0
for aa_clingen_id, nt_clingen_id in translations:
found = db.scalars(
select(VariantTranslation).where(
VariantTranslation.aa_clingen_id == aa_clingen_id,
VariantTranslation.nt_clingen_id == nt_clingen_id,
)
).one_or_none()

if found:
existing += 1
else:
db.add(VariantTranslation(aa_clingen_id=aa_clingen_id, nt_clingen_id=nt_clingen_id))
created += 1
if not translations:
return 0, 0

unique = list({(aa, nt) for aa, nt in translations})
rows = [{"aa_clingen_id": aa, "nt_clingen_id": nt} for aa, nt in unique]

stmt = (
insert(VariantTranslation)
.values(rows)
.on_conflict_do_nothing(index_elements=["aa_clingen_id", "nt_clingen_id"])
)
result = cast(CursorResult, db.execute(stmt))

created = result.rowcount
existing = len(unique) - created
return created, existing
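For readers unfamiliar with the counting trick above: with ON CONFLICT DO NOTHING, the statement's rowcount reflects only the rows that were actually inserted, so the already-present rows can be derived by subtraction. A self-contained sketch of the same semantics, run against in-memory SQLite (whose dialect also supports ON CONFLICT DO NOTHING) with an illustrative table rather than MaveDB's models:

```python
from sqlalchemy import Column, Integer, String, UniqueConstraint, create_engine
from sqlalchemy.dialects.sqlite import insert  # the production code uses the postgresql dialect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Translation(Base):
    __tablename__ = "translation"
    id = Column(Integer, primary_key=True)
    aa = Column(String, nullable=False)
    nt = Column(String, nullable=False)
    __table_args__ = (UniqueConstraint("aa", "nt"),)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    pairs = [("PA1", "CA1"), ("PA1", "CA1"), ("PA1", "CA2")]  # note the in-batch duplicate
    unique = list({p for p in pairs})
    stmt = (
        insert(Translation)
        .values([{"aa": aa, "nt": nt} for aa, nt in unique])
        .on_conflict_do_nothing(index_elements=["aa", "nt"])
    )
    created = session.execute(stmt).rowcount  # rows actually written
    existing = len(unique) - created          # rows silently skipped by ON CONFLICT
    print(created, existing)                  # 2 0 on a fresh database
    session.commit()
```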
5 changes: 3 additions & 2 deletions src/mavedb/lib/vep.py
@@ -3,13 +3,14 @@
import asyncio
import functools
import logging
import os
from typing import Optional, Sequence

from mavedb.lib.utils import request_with_backoff

logger = logging.getLogger(__name__)

ENSEMBL_API_URL = "https://rest.ensembl.org"
ENSEMBL_API_URL = os.environ.get("ENSEMBL_API_URL", "https://rest.ensembl.org")

# List of all possible VEP consequences, in order from most to least severe
VEP_CONSEQUENCES = [
@@ -95,7 +96,7 @@ async def run_variant_recoder(missing_hgvs: Sequence[str]) -> dict[str, list[str
url=f"{ENSEMBL_API_URL}/variant_recoder/human",
headers=headers,
json={"ids": list(missing_hgvs)},
timeout=300, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed
timeout=600, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed
),
)
hgvs_to_genomic: dict[str, list[str]] = {}
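For context, the call being given the longer timeout is Ensembl's Variant Recoder REST endpoint, which the diff shows as a POST of HGVS identifiers. A stripped-down sketch of that request using `requests` directly (the real code routes it through `request_with_backoff` and asyncio; the headers shown are an assumption):

```python
import os

import requests

ENSEMBL_API_URL = os.environ.get("ENSEMBL_API_URL", "https://rest.ensembl.org")


def recode(hgvs_ids: list[str]):
    """POST a batch of HGVS strings to Variant Recoder and return the parsed JSON."""
    response = requests.post(
        f"{ENSEMBL_API_URL}/variant_recoder/human",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        json={"ids": hgvs_ids},
        timeout=600,  # large batches routinely take minutes; 504s still rely on the retry layer
    )
    response.raise_for_status()
    return response.json()
```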
5 changes: 3 additions & 2 deletions src/mavedb/worker/jobs/external_services/vep.py
@@ -9,6 +9,7 @@

import asyncio
import logging
import os
from datetime import date

from sqlalchemy import select
@@ -29,8 +30,8 @@
logger = logging.getLogger(__name__)

_VEP_BATCH_SIZE = 200
_RECODER_BATCH_SIZE = 25
_RECODER_CONCURRENCY = 5
_RECODER_BATCH_SIZE = int(os.getenv("RECODER_BATCH_SIZE", "25"))
_RECODER_CONCURRENCY = int(os.getenv("RECODER_CONCURRENCY", "5"))


@with_pipeline_management
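The two constants pair a batch size with a concurrency cap. How they combine inside the job is not shown in this diff, but the usual pattern (sketched here as an assumption, reusing run_variant_recoder from mavedb.lib.vep) is to chunk the identifiers and gate the chunks with a semaphore:

```python
import asyncio
from typing import Sequence

from mavedb.lib.vep import run_variant_recoder


async def recode_in_batches(ids: Sequence[str], batch_size: int = 25, concurrency: int = 5):
    """Recode ids in fixed-size batches, with at most `concurrency` requests in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def recode_one(batch: Sequence[str]):
        async with semaphore:
            return await run_variant_recoder(batch)

    batches = [ids[i : i + batch_size] for i in range(0, len(ids), batch_size)]
    return await asyncio.gather(*(recode_one(batch) for batch in batches))
```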
2 changes: 1 addition & 1 deletion src/mavedb/worker/jobs/system/cleanup.py
@@ -39,7 +39,7 @@
# Timeout thresholds for detecting stalled jobs (in minutes).
# RUNNING_TIMEOUT_MINUTES must stay below ArqWorkerSettings.job_timeout (currently 2 hours)
# to avoid marking legitimately running jobs as stalled.
RUNNING_TIMEOUT_MINUTES = 90 # RUNNING jobs should complete within 90 min (30 min buffer under ARQ timeout)
RUNNING_TIMEOUT_MINUTES = 150 # RUNNING jobs should complete within 150 min (30 min buffer under ARQ timeout)
PENDING_TIMEOUT_MINUTES = 5 # PENDING jobs which are actionable within pipelines should be enqueued within 5 minutes
PIPELINE_STUCK_TIMEOUT_MINUTES = (
5 # Pipelines in non-terminal states with no active jobs should resolve within 5 minutes
2 changes: 1 addition & 1 deletion src/mavedb/worker/settings/worker.py
@@ -25,7 +25,7 @@
# driver, enabling incremental migration of job functions without touching
# the FastAPI layer. Once all jobs use async sessions, raise MAX_JOBS to 10+.
MAX_JOBS = 2
JOB_TIMEOUT_SECONDS = 2 * 60 * 60 # 2 hours — matches RUNNING_TIMEOUT_MINUTES (90 min) with buffer
JOB_TIMEOUT_SECONDS = 3 * 60 * 60 # 3 hours — matches RUNNING_TIMEOUT_MINUTES (150 min) with buffer


class ArqWorkerSettings:
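The two timeout bumps are meant to stay in lockstep: the ARQ job timeout (3 h = 180 min) must exceed the stalled-job threshold (150 min) by the 30-minute buffer the comments describe. A trivial consistency check, illustrative only:

```python
JOB_TIMEOUT_SECONDS = 3 * 60 * 60   # worker settings: 10800 s = 180 min
RUNNING_TIMEOUT_MINUTES = 150       # cleanup job: stalled-RUNNING threshold

assert JOB_TIMEOUT_SECONDS // 60 - RUNNING_TIMEOUT_MINUTES == 30  # the intended buffer
```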
93 changes: 93 additions & 0 deletions tests/lib/test_variant_translations.py
@@ -0,0 +1,93 @@
# ruff: noqa: E402

import pytest

pytest.importorskip("psycopg2")

from sqlalchemy import select

from mavedb.lib.variant_translations import upsert_variant_translations
from mavedb.models.variant_translation import VariantTranslation


@pytest.mark.unit
class TestUpsertVariantTranslations:
"""Unit tests for upsert_variant_translations.

Focuses on the INSERT ... ON CONFLICT DO NOTHING semantics: correct
created/existing counts, deduplication within a batch, and idempotency
across successive calls within the same transaction.
"""

def test_inserts_new_pairs(self, session):
created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])

assert created == 2
assert existing == 0
rows = session.scalars(select(VariantTranslation)).all()
assert len(rows) == 2

def test_returns_existing_count_for_committed_rows(self, session):
session.add(VariantTranslation(aa_clingen_id="PA1", nt_clingen_id="CA1"))
session.commit()

created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])

assert created == 1
assert existing == 1
rows = session.scalars(select(VariantTranslation)).all()
assert len(rows) == 2

def test_empty_input_returns_zeros(self, session):
created, existing = upsert_variant_translations(session, [])

assert created == 0
assert existing == 0

def test_deduplicates_duplicate_pairs_within_batch(self, session):
# Same pair appears twice in the input — should only insert once.
created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA1")])

assert created == 1
assert existing == 0
rows = session.scalars(select(VariantTranslation)).all()
assert len(rows) == 1

def test_different_nt_under_same_aa_are_distinct_rows(self, session):
# (PA1, CA1) and (PA1, CA2) share aa_clingen_id but are different rows —
# ON CONFLICT only fires on an exact composite-key match, so both insert.
created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2"), ("PA1", "CA3")])

assert created == 3
assert existing == 0

def test_idempotent_across_calls_without_intermediate_commit(self, session):
# This is the exact scenario that caused the UniqueViolation crash.
# Two separate calls within the same transaction share overlapping pairs.
# The second call must succeed without error (ON CONFLICT DO NOTHING)
# rather than trying to INSERT a duplicate and blowing up at commit time.
created1, existing1 = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
assert created1 == 2
assert existing1 == 0

# Overlapping call — CA1 already exists in this transaction, CA3 is new.
created2, existing2 = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA3")])
assert created2 == 1
assert existing2 == 1

# Commit must succeed — no UniqueViolation.
session.commit()

rows = session.scalars(select(VariantTranslation)).all()
assert len(rows) == 3

def test_fully_overlapping_second_call_inserts_nothing(self, session):
upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])

created, existing = upsert_variant_translations(session, [("PA1", "CA1"), ("PA1", "CA2")])
assert created == 0
assert existing == 2

session.commit()
rows = session.scalars(select(VariantTranslation)).all()
assert len(rows) == 2
68 changes: 68 additions & 0 deletions tests/worker/jobs/external_services/test_variant_translation.py
@@ -10,6 +10,8 @@

from mavedb.lib.types.workflow import JobExecutionOutcome
from mavedb.models.enums.job_pipeline import FailureCategory, JobStatus, PipelineStatus
from mavedb.models.mapped_variant import MappedVariant
from mavedb.models.variant import Variant
from mavedb.models.variant_annotation_status import VariantAnnotationStatus
from mavedb.models.variant_translation import VariantTranslation
from mavedb.worker.jobs.external_services.variant_translation import populate_variant_translations_for_score_set
@@ -347,6 +349,72 @@ async def test_propagates_exceptions(

assert str(exc_info.value) == "Test exception"

async def test_multiple_alleles_sharing_pa_no_duplicate_error(
self,
session,
with_populated_domain_data,
with_populate_variant_translations_job,
mock_worker_ctx,
sample_populate_variant_translations_run,
setup_sample_variants_with_caid_for_translation,
):
"""Test that two CA alleles mapping to the same PA don't cause a UniqueViolation.

This is a regression test for a bug where the SELECT-then-INSERT upsert pattern
failed to detect in-session duplicates: both alleles' iterations called
upsert_variant_translations with overlapping (PA, CA) pairs, the SELECT found
no committed row, both staged db.add() for the same pair, and the subsequent
update_progress commit raised a UniqueViolation.
"""
# Add a second variant with a different CA allele under the same score set.
score_set_id = sample_populate_variant_translations_run.job_params["score_set_id"]

variant2 = Variant(
urn="urn:variant:test-second-ca-allele",
score_set_id=score_set_id,
hgvs_nt="NM_000000.1:c.2T>G",
hgvs_pro="NP_000000.1:p.Val2Gly",
data={},
)
session.add(variant2)
session.commit()
mapped_variant2 = MappedVariant(
variant_id=variant2.id,
clingen_allele_id="CA_SECOND",
current=True,
mapped_date="2024-01-01T00:00:00Z",
mapping_api_version="1.0.0",
)
session.add(mapped_variant2)
session.commit()

# Both CA alleles resolve to the same PA. The PA then returns the same set of
# registered CAs for both iterations, producing fully overlapping translation pairs.
with (
patch(
"mavedb.worker.jobs.external_services.variant_translation.get_canonical_pa_ids",
return_value=["PA_SHARED"],
),
patch(
"mavedb.worker.jobs.external_services.variant_translation.get_matching_registered_ca_ids",
return_value=["CA9765210", "CA_SECOND"],
),
):
result = await populate_variant_translations_for_score_set(
mock_worker_ctx,
1,
JobManager(session, mock_worker_ctx["redis"], sample_populate_variant_translations_run.id),
)

assert result.status == JobStatus.SUCCEEDED

translations = session.scalars(select(VariantTranslation)).all()
pairs = {(t.aa_clingen_id, t.nt_clingen_id) for t in translations}
# PA_SHARED paired with each CA: CA9765210 (original from allele 1),
# CA_SECOND (original from allele 2), plus both as registered CAs.
assert ("PA_SHARED", "CA9765210") in pairs
assert ("PA_SHARED", "CA_SECOND") in pairs

async def test_total_api_failure_returns_failed(
self,
session,