diff --git a/poetry.lock b/poetry.lock index e341507f..8eb9bec6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1116,7 +1116,7 @@ files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] -markers = {main = "platform_system == \"Windows\" or extra == \"server\" and sys_platform == \"win32\"", dev = "sys_platform == \"win32\""} +markers = {main = "platform_system == \"Windows\" or sys_platform == \"win32\" and extra == \"server\"", dev = "sys_platform == \"win32\""} [[package]] name = "coloredlogs" @@ -3105,7 +3105,7 @@ description = "Pexpect allows easy control of interactive console applications." optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"server\" and sys_platform != \"win32\" and sys_platform != \"emscripten\"" +markers = "sys_platform != \"win32\" and sys_platform != \"emscripten\" and extra == \"server\"" files = [ {file = "pexpect-4.9.0-py2.py3-none-any.whl", hash = "sha256:7236d1e080e4936be2dc3e326cec0af72acf9212a7e1d060210e70a47e253523"}, {file = "pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f"}, @@ -3274,7 +3274,7 @@ description = "Run a subprocess in a pseudo terminal" optional = true python-versions = "*" groups = ["main"] -markers = "extra == \"server\" and sys_platform != \"win32\" and sys_platform != \"emscripten\"" +markers = "sys_platform != \"win32\" and sys_platform != \"emscripten\" and extra == \"server\"" files = [ {file = "ptyprocess-0.7.0-py2.py3-none-any.whl", hash = "sha256:4b41f3967fce3af57cc7e94b888626c18bf37a083e3651ca8feeb66d492fef35"}, {file = "ptyprocess-0.7.0.tar.gz", hash = "sha256:5c5d0a3b48ceee0b48485e0c26037c0acd7d29765ca3fbb5cb3831d347423220"}, @@ -3713,14 +3713,14 @@ testing = ["covdefaults (>=2.3)", "coverage (>=7.5.4)", "pytest (>=8.3.5)", 
"pyt [[package]] name = "python-dotenv" -version = "0.20.0" +version = "1.2.2" description = "Read key-value pairs from a .env file and set them as environment variables" optional = false -python-versions = ">=3.5" +python-versions = ">=3.10" groups = ["main"] files = [ - {file = "python-dotenv-0.20.0.tar.gz", hash = "sha256:b7e3b04a59693c42c36f9ab1cc2acc46fa5df8c78e178fc33a8d4cd05c8d498f"}, - {file = "python_dotenv-0.20.0-py3-none-any.whl", hash = "sha256:d92a187be61fe482e4fd675b6d52200e7be63a12b724abbf931a40ce4fa92938"}, + {file = "python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a"}, + {file = "python_dotenv-1.2.2.tar.gz", hash = "sha256:2c371a91fbd7ba082c2c1dc1f8bf89ca22564a087c2c287cd9b662adde799cf3"}, ] [package.extras] @@ -3910,25 +3910,25 @@ typing-extensions = {version = ">=4.4.0", markers = "python_version < \"3.13\""} [[package]] name = "requests" -version = "2.32.5" +version = "2.34.0" description = "Python HTTP for Humans." 
optional = false -python-versions = ">=3.9" +python-versions = ">=3.10" groups = ["main", "dev"] files = [ - {file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"}, - {file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"}, + {file = "requests-2.34.0-py3-none-any.whl", hash = "sha256:917520a21b767485ce7c588f4ebb917c436b24a31231b44228715eaeb5a52c60"}, + {file = "requests-2.34.0.tar.gz", hash = "sha256:7d62fe92f50eb82c529b0916bb445afa1531a566fc8f35ffdc64446e771b856a"}, ] [package.dependencies] -certifi = ">=2017.4.17" +certifi = ">=2023.5.7" charset_normalizer = ">=2,<4" idna = ">=2.5,<4" -urllib3 = ">=1.21.1,<3" +urllib3 = ">=1.26,<3" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] -use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +use-chardet-on-py3 = ["chardet (>=3.0.2,<8)"] [[package]] name = "requests-mock" @@ -4688,14 +4688,14 @@ markers = {dev = "sys_platform == \"win32\""} [[package]] name = "urllib3" -version = "2.6.3" +version = "2.7.0" description = "HTTP library with thread-safe connection pooling, file post, and more." 
optional = false -python-versions = ">=3.9" +python-versions = ">=3.10" groups = ["main", "dev"] files = [ - {file = "urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4"}, - {file = "urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed"}, + {file = "urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897"}, + {file = "urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c"}, ] [package.extras] @@ -4738,7 +4738,7 @@ description = "Fast implementation of asyncio event loop on top of libuv" optional = true python-versions = ">=3.8.1" groups = ["main"] -markers = "sys_platform != \"win32\" and sys_platform != \"cygwin\" and platform_python_implementation != \"PyPy\" and extra == \"server\"" +markers = "platform_python_implementation != \"PyPy\" and extra == \"server\" and sys_platform != \"win32\" and sys_platform != \"cygwin\"" files = [ {file = "uvloop-0.22.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:ef6f0d4cc8a9fa1f6a910230cd53545d9a14479311e87e3cb225495952eb672c"}, {file = "uvloop-0.22.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:7cd375a12b71d33d46af85a3343b35d98e8116134ba404bd657b3b1d15988792"}, @@ -5103,4 +5103,4 @@ server = ["aiocache", "alembic", "alembic-utils", "arq", "authlib", "biocommons" [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "fd68159dcc748768ad234a45a89da32151e9879a926554b887847c696f51cc87" +content-hash = "64ca4755b1a5c8930882a7ad3b25feee434f42ffb02ba2841f38c86de56cc948" diff --git a/pyproject.toml b/pyproject.toml index 9e47bbd3..fdf7b481 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "mavedb" -version = "2026.2.1" +version = "2026.2.2" description = "API for MaveDB, the database of Multiplexed Assays 
of Variant Effect." license = "AGPL-3.0-only" readme = "README.md" @@ -35,7 +35,7 @@ numpy = "~1.26" httpx = "~0.26.0" pandas = ">=2.2.0,<3.0.0" pydantic = "~2.10.0" -python-dotenv = "~0.20.0" +python-dotenv = "^1.2" python-json-logger = "~2.0.7" SQLAlchemy = "~2.0.29" ga4gh-va-spec = "~0.4.2" @@ -58,7 +58,7 @@ pyathena = { version = "~3.14.1", optional = true } psycopg2 = { version = "~2.9.3", optional = true } python-jose = { extras = ["cryptography"], version = "~3.5.0", optional = true } python-multipart = { version = "~0.0.22", optional = true } -requests = { version = "~2.32.2", optional = true } +requests = { version = "^2.33", optional = true } starlette = { version = "~0.49.0", optional = true } starlette-context = { version = "^0.3.6", optional = true } slack-sdk = { version = "~3.21.3", optional = true } diff --git a/src/mavedb/__init__.py b/src/mavedb/__init__.py index 57f24491..57dafd65 100644 --- a/src/mavedb/__init__.py +++ b/src/mavedb/__init__.py @@ -6,7 +6,7 @@ logger = module_logging.getLogger(__name__) __project__ = "mavedb-api" -__version__ = "2026.2.1" +__version__ = "2026.2.2" logger.info(f"MaveDB {__version__}") diff --git a/src/mavedb/lib/clingen/services.py b/src/mavedb/lib/clingen/services.py index 7bf7e854..085d9103 100644 --- a/src/mavedb/lib/clingen/services.py +++ b/src/mavedb/lib/clingen/services.py @@ -276,70 +276,3 @@ def _existing_jwt(self) -> Optional[str]: logger.debug(msg="Found existing but expired Genboree JWT.", extra=logging_context()) return None - - -def get_allele_registry_associations( - content_submissions: list[str], submission_response: list[Union[ClinGenAllele, ClinGenSubmissionError]] -) -> dict[str, str]: - """ - Links HGVS strings and ClinGen Canonoical Allele IDs (CAIDs) given a list of both. - - Args: - content_submissions (list[str]): A list of HGVS strings to check for associations in the ClinGen Allele Registry. 
- submission_response (list[ClinGenAllele]): The response from the ClinGen Allele Registry submission, - which contains the registered alleles and their associated CAIDs. - - Returns: - dict[str, str]: A dictionary mapping HGVS strings to their corresponding ClinGen Allele Registry IDs (CAIDs). - The keys are the HGVS strings, and the values are the CAIDs assigned by the ClinGen Allele Registry. - If no associations are found, an empty dictionary is returned. - """ - save_to_logging_context( - { - "num_hgvs_strings_to_associate": len(content_submissions), - "num_car_alleles_to_associate": len(submission_response), - } - ) - - if not content_submissions or not submission_response: - logger.warning( - msg="No content submissions or submission response provided for ClinGen Allele Registry association.", - extra=logging_context(), - ) - return {} - else: - logger.info( - msg="Attempting to associate ClinGen Allele Registry allees with MaveDB HGVS strings.", - extra=logging_context(), - ) - - allele_registry_associations: dict[str, str] = {} - for registration in submission_response: - if "errorType" in registration: - logger.warning( - msg=f"Skipping errored ClinGen Allele Registry HGVS {registration.get('hgvs', 'unknown')} ({registration.get('errorType', 'unknown')}): {registration.get('message', 'unknown error message')}", - extra=logging_context(), - ) - continue - - # Extract the CAID from the URL (e.g., "http://reg.test.genome.network/allele/CA2513066" -> "CA2513066") - caid = registration["@id"].split("/")[-1] - alleles = ( - registration.get("genomicAlleles", []) - + registration.get("transcriptAlleles", []) - + registration.get("aminoAcidAlleles", []) - ) - - for allele in alleles: - for hgvs_string in content_submissions: - if hgvs_string in allele["hgvs"]: - allele_registry_associations[hgvs_string] = caid - logger.debug( - msg=f"Found allele registry association for HGVS string '{hgvs_string}': {caid}", - extra=logging_context(), - ) - break - - 
save_to_logging_context({"num_hgvs_strings_associated_with_caid": len(allele_registry_associations)}) - logger.info(msg="Done associating ClinGen Allele Registry responses.", extra=logging_context()) - return allele_registry_associations diff --git a/src/mavedb/lib/types/clingen.py b/src/mavedb/lib/types/clingen.py index 451c827c..708b6c17 100644 --- a/src/mavedb/lib/types/clingen.py +++ b/src/mavedb/lib/types/clingen.py @@ -1,6 +1,6 @@ from typing import Any, Literal, Optional, TypedDict -from typing_extensions import NotRequired, TypeGuard +from typing_extensions import NotRequired # See: https://ldh.genome.network/docs/ldh/submit.html#content-submission-body @@ -164,7 +164,3 @@ class ClinGenAlleleDefinition(TypedDict): "position": str, }, ) - - -def is_car_submission_error(err: ClinGenAllele | ClinGenSubmissionError) -> TypeGuard[ClinGenSubmissionError]: - return "errorType" in err and "hgvs" in err diff --git a/src/mavedb/lib/utils.py b/src/mavedb/lib/utils.py index c4b13f3b..0ec20c9c 100644 --- a/src/mavedb/lib/utils.py +++ b/src/mavedb/lib/utils.py @@ -1,7 +1,7 @@ import logging -import requests import time +import requests logger = logging.getLogger(__name__) @@ -11,11 +11,22 @@ def request_with_backoff( ) -> requests.Response: attempt = 0 while attempt <= backoff_limit: - logger.debug(f"Attempting request to {url}. This is attempt {attempt+1}.") + logger.debug(f"Attempting request to {url}. This is attempt {attempt + 1}.") try: response = requests.request(method=method, url=url, **kwargs) response.raise_for_status() return response + except requests.exceptions.HTTPError as exc: + status = exc.response.status_code if exc.response is not None else None + # 429 means rate-limited — retrying with backoff is the correct response. + # Other 4xx errors are client mistakes that won't improve on retry. 
+ if status is not None and status != 429 and status < 500: + raise + logger.warning(f"Request to {url} failed with status {status}.", exc_info=exc) + backoff_time = backoff_wait * (2**attempt) + attempt += 1 + logger.info(f"Retrying request to {url} in {backoff_time} seconds.") + time.sleep(backoff_time) except requests.exceptions.RequestException as exc: logger.warning(f"Request to {url} failed.", exc_info=exc) backoff_time = backoff_wait * (2**attempt) diff --git a/src/mavedb/lib/vep.py b/src/mavedb/lib/vep.py index b2e2330b..a7d4e7b3 100644 --- a/src/mavedb/lib/vep.py +++ b/src/mavedb/lib/vep.py @@ -6,6 +6,8 @@ import os from typing import Optional, Sequence +import requests + from mavedb.lib.utils import request_with_backoff logger = logging.getLogger(__name__) @@ -80,44 +82,48 @@ async def run_variant_recoder(missing_hgvs: Sequence[str]) -> dict[str, list[str Returns: dict[str, list[str]]: Mapping of input HGVS to list of genomic HGVS strings (hgvsg). - - Raises: - VEPProcessingError: If the API request fails. + Returns an empty dict if Ensembl rejects the batch (e.g. 400 for + unrecognised identifiers) — callers treat missing entries as failures. """ headers = {"Content-Type": "application/json", "Accept": "application/json"} # request_with_backoff is synchronous (requests lib + time.sleep backoff); run_in_executor # keeps the event loop free during the full request + any retry wait time. 
loop = asyncio.get_running_loop() - response = await loop.run_in_executor( - None, - functools.partial( - request_with_backoff, - method="POST", - url=f"{ENSEMBL_API_URL}/variant_recoder/human", - headers=headers, - json={"ids": list(missing_hgvs)}, - timeout=600, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed - ), - ) - hgvs_to_genomic: dict[str, list[str]] = {} - # request_with_backoff handles http errors, so no need to check response status + try: + response = await loop.run_in_executor( + None, + functools.partial( + request_with_backoff, + method="POST", + url=f"{ENSEMBL_API_URL}/variant_recoder/human", + headers=headers, + json={"ids": list(missing_hgvs)}, + timeout=600, # Variant Recoder can be very slow for large batches and 504s are common; generous timeout and backoff retries are needed + ), + ) + except requests.exceptions.HTTPError as exc: + # A 4xx from Ensembl (e.g. 400 for an unrecognised identifier format) means the batch + # cannot be recoded. Return empty so callers can handle these missing entries. 
+ logger.warning( + f"Variant Recoder returned {exc.response.status_code if exc.response is not None else 'unknown'} " + f"for batch of {len(missing_hgvs)} HGVS strings — treating as no results.", + exc_info=exc, + ) + return {} + data = response.json() - for entry in data: - hgvs_string = entry.get("input") - if not hgvs_string: - continue - genomic_hgvs_list = [] - for variant, variant_data in entry.items(): - if variant == "input": + # request_with_backoff handles http errors, so no need to check response status + hgvs_to_genomic: dict[str, list[str]] = {} + for input_variant in data: + for variant_str, variant_data in input_variant.items(): + hgvs_string = variant_data.get("input") if isinstance(variant_data, dict) else None + if variant_str == "input" or not hgvs_string: continue genomic_strings = variant_data.get("hgvsg") if isinstance(variant_data, dict) else None if genomic_strings: for genomic_hgvs in genomic_strings: if genomic_hgvs.startswith("NC_"): - genomic_hgvs_list.append(genomic_hgvs) - if genomic_hgvs_list: - hgvs_to_genomic[hgvs_string] = genomic_hgvs_list - + hgvs_to_genomic.setdefault(hgvs_string, []).append(genomic_hgvs) return hgvs_to_genomic @@ -133,10 +139,10 @@ async def get_functional_consequence(hgvs_strings: Sequence[str]) -> dict[str, O Returns: dict[str, Optional[str]]: Mapping of HGVS string to functional consequence. - If no consequence found, maps to None. - - Raises: - VEPProcessingError: If VEP API processing fails critically. + If no consequence found, maps to None. Returns an empty dict + if Ensembl rejects the batch (e.g. 400 for unrecognised + identifiers) — callers treat missing entries as needing Recoder + fallback or as failures. 
""" if len(hgvs_strings) > 200: raise ValueError( @@ -149,19 +155,28 @@ async def get_functional_consequence(hgvs_strings: Sequence[str]) -> dict[str, O # request_with_backoff is synchronous (requests lib + time.sleep backoff); run_in_executor # keeps the event loop free during the full request + any retry wait time. loop = asyncio.get_running_loop() - response = await loop.run_in_executor( - None, - functools.partial( - request_with_backoff, - method="POST", - url=f"{ENSEMBL_API_URL}/vep/human/hgvs", - headers=headers, - json={"hgvs_notations": list(hgvs_strings)}, - timeout=60, # VEP can be slow for large batches. - ), - ) + try: + response = await loop.run_in_executor( + None, + functools.partial( + request_with_backoff, + method="POST", + url=f"{ENSEMBL_API_URL}/vep/human/hgvs", + headers=headers, + json={"hgvs_notations": list(hgvs_strings)}, + timeout=60, # VEP can be slow for large batches. + ), + ) + except requests.exceptions.HTTPError as exc: + # A 4xx from Ensembl (e.g. 400 for an unrecognised identifier) means the batch cannot + # be resolved. Return empty so the callers can handle these missing entries. 
+ logger.warning( + f"VEP returned {exc.response.status_code if exc.response is not None else 'unknown'} " + f"for batch of {len(hgvs_strings)} HGVS strings — treating as no results.", + exc_info=exc, + ) + return result - # request_with_backoff handles http errors, so no need to check response status data = response.json() for entry in data: hgvs = entry.get("input") diff --git a/src/mavedb/worker/jobs/external_services/clingen.py b/src/mavedb/worker/jobs/external_services/clingen.py index 50107708..639160ed 100644 --- a/src/mavedb/worker/jobs/external_services/clingen.py +++ b/src/mavedb/worker/jobs/external_services/clingen.py @@ -26,9 +26,7 @@ from mavedb.lib.clingen.services import ( ClinGenAlleleRegistryService, ClinGenLdhService, - get_allele_registry_associations, ) -from mavedb.lib.types.clingen import is_car_submission_error from mavedb.lib.types.workflow import JobExecutionOutcome from mavedb.lib.variants import get_hgvs_from_post_mapped from mavedb.models.enums.annotation_type import AnnotationType @@ -134,15 +132,35 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: ) # Build HGVS strings for submission. Don't do duplicate submissions-- store mapped variant IDs by HGVS. + # Variants that can't produce an HGVS string are annotated as failures immediately. + annotation_manager = AnnotationStatusManager(job_manager.db, job_run_id=job_manager.job_id) variant_post_mapped_hgvs: dict[str, list[int]] = {} + no_hgvs_count = 0 for mapped_variant_id, post_mapped in variant_post_mapped_objects: hgvs_for_post_mapped = get_hgvs_from_post_mapped(post_mapped) if not hgvs_for_post_mapped: + no_hgvs_count += 1 logger.warning( msg=f"Could not construct a valid HGVS string for mapped variant {mapped_variant_id}. 
Skipping submission of this variant.", extra=job_manager.logging_context(), ) + + mapped_variant = job_manager.db.scalars( + select(MappedVariant).where(MappedVariant.id == mapped_variant_id) + ).one() + annotation_manager.add_annotation( + variant_id=mapped_variant.variant_id, # type: ignore + annotation_type=AnnotationType.CLINGEN_ALLELE_ID, + version=None, + status=AnnotationStatus.FAILED, + failure_category=AnnotationFailureCategory.MISSING_IDENTIFIER, + annotation_data={ + "error_message": "Could not extract a valid HGVS string from post-mapped variant data.", + "annotation_metadata": {}, + }, + current=True, + ) continue if hgvs_for_post_mapped in variant_post_mapped_hgvs: @@ -155,143 +173,113 @@ async def submit_score_set_mappings_to_car(ctx: dict, job_id: int, job_manager: # Do submission car_service = ClinGenAlleleRegistryService(url=CAR_SUBMISSION_ENDPOINT) - registered_alleles = car_service.dispatch_submissions(list(variant_post_mapped_hgvs.keys())) + hgvs_list = list(variant_post_mapped_hgvs.keys()) + registered_alleles = car_service.dispatch_submissions(hgvs_list) job_manager.update_progress(60, 100, "Processing registered alleles from CAR.") - # Build a map of HGVS string -> CAR error details for every rejected submission. - # The CAR response intermixes successes (have "@id") and errors (have "errorType"). - car_errors_by_hgvs: dict[str, dict] = { - err["hgvs"]: { - "error_type": err.get("errorType"), - "message": err.get("message"), - } - for err in registered_alleles - if is_car_submission_error(err) - } - - # Build an inverse map so we can look up the HGVS string for any mapped_variant_id. 
- mapped_variant_id_to_hgvs: dict[int, str] = { - vid: hgvs for hgvs, vids in variant_post_mapped_hgvs.items() for vid in vids - } - - # Process registered alleles and update mapped variants - linked_alleles = get_allele_registry_associations(list(variant_post_mapped_hgvs.keys()), registered_alleles) - total = len(linked_alleles) - processed = 0 - # Setup annotation manager - annotation_manager = AnnotationStatusManager(job_manager.db, job_run_id=job_manager.job_id) - registered_mapped_variant_ids = [] - for hgvs_string, caid in linked_alleles.items(): + # CAR returns one response per submitted HGVS in the same order (see CAR API docs). + # Zip the submissions with the responses and annotate each based on success or error. + linked_count = 0 + error_count = 0 + + for hgvs_string, response in zip(hgvs_list, registered_alleles): mapped_variant_ids = variant_post_mapped_hgvs[hgvs_string] - registered_mapped_variant_ids.extend(mapped_variant_ids) mapped_variants = job_manager.db.scalars( select(MappedVariant).where(MappedVariant.id.in_(mapped_variant_ids)) ).all() - for mapped_variant in mapped_variants: - mapped_variant.clingen_allele_id = caid - job_manager.db.add(mapped_variant) + if "errorType" in response: + error_count += 1 + logger.warning( + msg=f"CAR rejected HGVS '{hgvs_string}' ({response.get('errorType', 'unknown')}): {response.get('message', 'unknown')}", + extra=job_manager.logging_context(), + ) + for mapped_variant in mapped_variants: + annotation_manager.add_annotation( + variant_id=mapped_variant.variant_id, # type: ignore + annotation_type=AnnotationType.CLINGEN_ALLELE_ID, + version=None, + status=AnnotationStatus.FAILED, + failure_category=AnnotationFailureCategory.EXTERNAL_SERVICE_REJECTED, + annotation_data={ + "error_message": "Failed to register variant with ClinGen Allele Registry.", + "annotation_metadata": { + "submitted_hgvs": hgvs_string, + "car_error_type": response.get("errorType"), + "car_error_message": response.get("message"), + }, + 
}, + current=True, + ) + + else: + linked_count += 1 + caid = response["@id"].split("/")[-1] + for mapped_variant in mapped_variants: + mapped_variant.clingen_allele_id = caid + job_manager.db.add(mapped_variant) + + annotation_manager.add_annotation( + variant_id=mapped_variant.variant_id, # type: ignore + annotation_type=AnnotationType.CLINGEN_ALLELE_ID, + version=None, + status=AnnotationStatus.SUCCESS, + annotation_data={"annotation_metadata": {"clingen_allele_id": caid}}, + current=True, + ) + + # Any HGVS strings CAR did not respond to (network drop, service-side omission). + # Use EXTERNAL_SERVICE_REJECTED for explicit CAR errors, EXTERNAL_API_ERROR for silent failures. + no_response_hgvs = hgvs_list[len(registered_alleles) :] + for hgvs_string in no_response_hgvs: + mapped_variants = job_manager.db.scalars( + select(MappedVariant).where(MappedVariant.id.in_(variant_post_mapped_hgvs[hgvs_string])) + ).all() + for mapped_variant in mapped_variants: annotation_manager.add_annotation( variant_id=mapped_variant.variant_id, # type: ignore annotation_type=AnnotationType.CLINGEN_ALLELE_ID, version=None, - status=AnnotationStatus.SUCCESS, + status=AnnotationStatus.FAILED, + failure_category=AnnotationFailureCategory.EXTERNAL_API_ERROR, annotation_data={ - "annotation_metadata": {"clingen_allele_id": caid}, + "error_message": "Failed to register variant with ClinGen Allele Registry.", + "annotation_metadata": {"submitted_hgvs": hgvs_string}, }, current=True, ) - processed += 1 - - # Calculate progress: 50% + (processed/total_mapped)*50, rounded to nearest 5% - if total % 20 == 0 or processed == total: - progress = 50 + round((processed / total) * 45 / 5) * 5 - job_manager.update_progress(progress, 100, f"Processed {processed} of {total} registered alleles.") - logger.info( - msg=f"Processed {processed}/{total} registered alleles from CAR.", - extra=job_manager.logging_context(), - ) - - # For mapped variants which did not get a CAID, log failure annotation - 
failed_submissions = set(obj[0] for obj in variant_post_mapped_objects) - set(registered_mapped_variant_ids) - for mapped_variant_id in failed_submissions: - mapped_variant = job_manager.db.scalars( - select(MappedVariant).where(MappedVariant.id == mapped_variant_id) - ).one() - - failed_variant_hgvs = mapped_variant_id_to_hgvs.get(mapped_variant_id) - car_error = car_errors_by_hgvs.get(failed_variant_hgvs) if failed_variant_hgvs else None - - annotation_metadata: dict = {"submitted_hgvs": failed_variant_hgvs} - if car_error: - annotation_metadata["car_error_type"] = car_error["error_type"] - annotation_metadata["car_error_message"] = car_error["message"] - - # Use EXTERNAL_SERVICE_REJECTED when CAR explicitly rejected the submission with an error - # response (e.g. InvalidHGVS), vs EXTERNAL_API_ERROR for silent failures where CAR returned - # no response at all (network drop, service-side omission, etc.). - failure_category = ( - AnnotationFailureCategory.EXTERNAL_SERVICE_REJECTED - if car_error - else AnnotationFailureCategory.EXTERNAL_API_ERROR - ) - - annotation_manager.add_annotation( - variant_id=mapped_variant.variant_id, # type: ignore - annotation_type=AnnotationType.CLINGEN_ALLELE_ID, - version=None, - status=AnnotationStatus.FAILED, - failure_category=failure_category, - annotation_data={ - "error_message": "Failed to register variant with ClinGen Allele Registry.", - "annotation_metadata": annotation_metadata, - }, - current=True, - ) - annotation_manager.flush() + failed_count = no_hgvs_count + error_count + len(no_response_hgvs) + # When all registrations fail we will not be able to render any annotations. Fail the job # to explicitly halt the pipeline. - if failed_submissions and not linked_alleles: - error_message = ( - f"CAR submission failed for all {len(failed_submissions)} variants in score set {score_set.urn}." 
- ) - logger.error( - msg=error_message, - extra=job_manager.logging_context(), - ) + if linked_count == 0: + error_message = f"CAR submission failed for all {len(hgvs_list)} variants in score set {score_set.urn}." + logger.error(msg=error_message, extra=job_manager.logging_context()) job_manager.db.flush() return JobExecutionOutcome.failed( reason=error_message, - data={ - "submitted_count": len(variant_post_mapped_hgvs), - "matched_count": 0, - "failed_count": len(failed_submissions), - }, + data={"submitted_count": len(hgvs_list), "matched_count": 0, "failed_count": failed_count}, failure_category=FailureCategory.DEPENDENCY_FAILURE, ) - if failed_submissions: + if failed_count > 0: # CAR rejections are typically per-variant data quality issues (e.g. invalid HGVS) rather than # systemic failures. Per-variant AnnotationStatus.FAILED records are already written above for # traceability. We continue the pipeline so that successfully registered variants still receive # downstream annotations (warm_clingen_cache, gnomAD, ClinVar, HGVS, translations). 
logger.warning( - msg=f"CAR submission failed for {len(failed_submissions)} of {len(variant_post_mapped_hgvs)} variants in score set {score_set.urn}.", + msg=f"CAR submission failed for {failed_count} of {len(hgvs_list)} variants in score set {score_set.urn}.", extra=job_manager.logging_context(), ) logger.info(msg="Completed CAR mapped resource submission", extra=job_manager.logging_context()) job_manager.db.flush() return JobExecutionOutcome.succeeded( - data={ - "submitted_count": len(variant_post_mapped_hgvs), - "matched_count": len(linked_alleles), - "failed_count": len(failed_submissions), - } + data={"submitted_count": len(hgvs_list), "matched_count": linked_count, "failed_count": failed_count} ) diff --git a/src/mavedb/worker/jobs/external_services/vep.py b/src/mavedb/worker/jobs/external_services/vep.py index 9e0905c6..ba577f23 100644 --- a/src/mavedb/worker/jobs/external_services/vep.py +++ b/src/mavedb/worker/jobs/external_services/vep.py @@ -135,9 +135,8 @@ async def populate_vep_for_score_set(ctx: dict, job_id: int, job_manager: JobMan ) # --- Phase 1: Initial VEP pass --- - all_consequences: dict[str, str | None] = {} + all_consequences: dict[str, str] = {} all_missing_hgvs: set[str] = set() - missing_hgvs_to_variant_ids: dict[str, list[int]] = {} for batch_idx, batch in enumerate(batches): logger.debug( @@ -153,14 +152,16 @@ async def populate_vep_for_score_set(ctx: dict, job_id: int, job_manager: JobMan extra=job_manager.logging_context(), ) - all_consequences.update(consequences) + # Only store variants where VEP returned an actual consequence string. A None value + # means VEP knew the variant but couldn't classify it — treat that the same as absent + # and route to Recoder so we have the best chance of getting a consequence. 
+ hit_consequences = {h: c for h, c in consequences.items() if c is not None} + all_consequences.update(hit_consequences) - missing_hgvs = set(hgvs_strings) - set(consequences.keys()) + missing_hgvs = set(hgvs_strings) - set(hit_consequences.keys()) for hgvs, mapped_variant_id in zip(hgvs_strings, mapped_variant_ids): if hgvs in missing_hgvs: all_missing_hgvs.add(hgvs) - mv = mapped_variants_by_id[mapped_variant_id] - missing_hgvs_to_variant_ids.setdefault(hgvs, []).append(mv.variant_id) # type: ignore progress_pct = int((batch_idx + 1) / len(batches) * 33) job_manager.save_to_context( @@ -255,7 +256,10 @@ async def _recoder_with_semaphore(batch: list[str], batch_idx: int, total: int) ) # --- Phase 3: VEP pass on the recoded genomic HGVS strings --- - recoded_vep_batch_list = list(batched(list(hgvs_to_genomic.values()), _VEP_BATCH_SIZE)) + # hgvs_to_genomic maps original HGVS → list[str]; flatten to a deduplicated list of + # genomic strings before batching with VEP. + all_recoded_genomic_hgvs = list({g for genomic_list in hgvs_to_genomic.values() for g in genomic_list}) + recoded_vep_batch_list = list(batched(all_recoded_genomic_hgvs, _VEP_BATCH_SIZE)) all_recoded_consequences: dict[str, str | None] = {} for recoded_vep_batch_idx, recoded_vep_batch in enumerate(recoded_vep_batch_list): diff --git a/tests/lib/clingen/test_services.py b/tests/lib/clingen/test_services.py index 74faed29..f0a4f510 100644 --- a/tests/lib/clingen/test_services.py +++ b/tests/lib/clingen/test_services.py @@ -15,7 +15,6 @@ from mavedb.lib.clingen.services import ( ClinGenAlleleRegistryService, ClinGenLdhService, - get_allele_registry_associations, ) from mavedb.lib.utils import batched @@ -265,69 +264,3 @@ def test_dispatch_submissions_failure(self, mock_auth_url, mock_put, car_service content_submissions = ["NM_0001:c.1A>G"] result = car_service.dispatch_submissions(content_submissions) assert result == [] - - -def test_get_allele_registry_associations_success(): - content_submissions = 
["NM_0001:c.1A>G", "NM_0002:c.2T>C", "NM_0003:c.3G>A"] - submission_response = [ - { - "@id": "http://reg.test.genome.network/allele/CA123", - "genomicAlleles": [{"hgvs": "NM_0001:c.1A>G"}], - "transcriptAlleles": [], - }, - { - "@id": "http://reg.test.genome.network/allele/CA456", - "genomicAlleles": [], - "transcriptAlleles": [{"hgvs": "NM_0002:c.2T>C"}], - }, - { - "@id": "http://reg.test.genome.network/allele/CA789", - "genomicAlleles": [], - "transcriptAlleles": [], - "aminoAcidAlleles": [{"hgvs": "NM_0003:c.3G>A"}], - }, - ] - result = get_allele_registry_associations(content_submissions, submission_response) - assert result == {"NM_0001:c.1A>G": "CA123", "NM_0002:c.2T>C": "CA456", "NM_0003:c.3G>A": "CA789"} - - -def test_get_allele_registry_associations_empty(): - result = get_allele_registry_associations([], []) - assert result == {} - - -def test_get_allele_registry_associations_no_match(): - content_submissions = ["NM_0001:c.1A>G"] - submission_response = [ - { - "@id": "http://reg.test.genome.network/allele/CA123", - "genomicAlleles": [{"hgvs": "NM_0002:c.2T>C"}], - "transcriptAlleles": [], - } - ] - result = get_allele_registry_associations(content_submissions, submission_response) - assert result == {} - - -def test_get_allele_registry_associations_mixed(): - content_submissions = ["NM_0001:c.1A>G", "NM_0002:c.2T>C", "NM_0003:c.3G>A"] - submission_response = [ - { - "@id": "http://reg.test.genome.network/allele/CA123", - "genomicAlleles": [{"hgvs": "NM_0001:c.1A>G"}], - "transcriptAlleles": [], - }, - { - "errorType": "InvalidHGVS", - "hgvs": "NM_0002:c.2T>C", - "message": "The HGVS string is invalid.", - }, - { - "@id": "http://reg.test.genome.network/allele/CA789", - "genomicAlleles": [], - "transcriptAlleles": [{"hgvs": "NM_0003:c.3G>A"}], - }, - ] - - result = get_allele_registry_associations(content_submissions, submission_response) - assert result == {"NM_0001:c.1A>G": "CA123", "NM_0003:c.3G>A": "CA789"} diff --git a/tests/lib/test_vep.py 
b/tests/lib/test_vep.py new file mode 100644 index 00000000..2e9e1fae --- /dev/null +++ b/tests/lib/test_vep.py @@ -0,0 +1,242 @@ +"""Unit tests for mavedb.lib.vep — covers response parsing for both Variant Recoder and VEP APIs. + +These tests mock the underlying HTTP call (request_with_backoff) and assert that the parsing +logic correctly handles the actual Ensembl REST API response shapes. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from mavedb.lib.vep import get_functional_consequence, run_variant_recoder + + +def _mock_response(data) -> MagicMock: + """Return a mock requests.Response whose .json() returns *data*.""" + mock = MagicMock() + mock.json.return_value = data + return mock + + +# --------------------------------------------------------------------------- +# Realistic Variant Recoder response payloads +# +# Ensembl Variant Recoder returns a list of single-key dicts. The key is the +# called allele (e.g. "T"). The allele dict contains input, hgvsg, hgvsc, etc. +# --------------------------------------------------------------------------- + +RECODER_RESPONSE_SINGLE = [ + { + "T": { + "hgvsc": [ + "NM_007294.4:c.5G>A", + ], + "spdi": ["NC_000017.11:43094691:C:T"], + "input": "NP_009225.1:p.Cys2Tyr", + "hgvsp": ["NP_009225.1:p.Cys2Tyr"], + "hgvsg": [ + "NC_000017.11:g.43094692C>T", + "LRG_292:g.100C>T", # non-NC_ entry — should be filtered out + ], + } + } +] + +RECODER_RESPONSE_MULTIPLE = [ + { + "A": { + "input": "NM_007294.4:c.5G>T", + "hgvsg": ["NC_000017.11:g.43094692C>A"], + } + }, + { + "T": { + "input": "NM_007294.4:c.5G>A", + "hgvsg": ["NC_000017.11:g.43094692C>T"], + } + }, +] + +RECODER_RESPONSE_MULTI_ALLELE = [ + { + # Both allele keys share the same "input" value; each has distinct NC_ hgvsg entries. 
+ "CAT": { + "input": "NP_009225.1:p.Val1696His", + "hgvsg": [ + "NC_000017.11:g.43063938_43063940delinsATG", + "LRG_292:g.154044_154046delinsCAT", + ], + "hgvsc": ["NM_007294.4:c.5086_5088delinsCAT"], + }, + "CAC": { + "input": "NP_009225.1:p.Val1696His", + "hgvsg": [ + "NC_000017.11:g.43063938_43063940inv", + "LRG_292:g.154044_154046inv", + ], + "hgvsc": ["NM_007294.4:c.5086_5088inv"], + }, + } +] + +RECODER_RESPONSE_NO_HGVSG = [ + { + "T": { + "input": "NP_009225.1:p.Xxx1Yyy", + "hgvsc": ["NM_007294.4:c.5G>A"], + # deliberately no "hgvsg" key + } + } +] + + +# --------------------------------------------------------------------------- +# Realistic VEP /vep/human/hgvs response payloads +# +# VEP returns a list of dicts where "input" and "most_severe_consequence" are +# *top-level* keys in each element (unlike Recoder where "input" is nested). +# --------------------------------------------------------------------------- + +VEP_RESPONSE_SINGLE = [ + { + "input": "NM_007294.4:c.5G>A", + "most_severe_consequence": "missense_variant", + "id": "rs80357382", + "seq_region_name": "17", + } +] + +VEP_RESPONSE_MULTIPLE = [ + { + "input": "NM_007294.4:c.5G>A", + "most_severe_consequence": "missense_variant", + }, + { + "input": "NM_007294.4:c.10C>T", + "most_severe_consequence": "synonymous_variant", + }, +] + + +# --------------------------------------------------------------------------- +# run_variant_recoder +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +@pytest.mark.unit +class TestRunVariantRecoder: + async def test_extracts_nc_genomic_hgvs_from_real_response_format(self): + """Correctly extracts NC_ genomic HGVS from the nested allele-key response structure.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_SINGLE)): + result = await run_variant_recoder(["NP_009225.1:p.Cys2Tyr"]) + + assert result == {"NP_009225.1:p.Cys2Tyr": ["NC_000017.11:g.43094692C>T"]} + 
+ async def test_filters_non_nc_genomic_hgvs(self): + """Non-NC_ entries in hgvsg (e.g. LRG accessions) are excluded from the result.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_SINGLE)): + result = await run_variant_recoder(["NP_009225.1:p.Cys2Tyr"]) + + returned_hgvsg = result.get("NP_009225.1:p.Cys2Tyr", []) + assert all(h.startswith("NC_") for h in returned_hgvsg) + assert not any("LRG" in h for h in returned_hgvsg) + + async def test_handles_multiple_inputs(self): + """Multiple input variants in one batch are all mapped correctly.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_MULTIPLE)): + result = await run_variant_recoder(["NM_007294.4:c.5G>T", "NM_007294.4:c.5G>A"]) + + assert result == { + "NM_007294.4:c.5G>T": ["NC_000017.11:g.43094692C>A"], + "NM_007294.4:c.5G>A": ["NC_000017.11:g.43094692C>T"], + } + + async def test_handles_multiple_allele_keys_per_element(self): + """A single response element with multiple allele keys (e.g. CAT, CAC) collects NC_ hgvsg from all of them. + + Variant Recoder can return multiple possible allele representations for the same input variant + in a single element dict. Each allele key is independent and may carry different hgvsg entries. 
+ """ + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_MULTI_ALLELE)): + result = await run_variant_recoder(["NP_009225.1:p.Val1696His"]) + + genomic_hgvsg = result.get("NP_009225.1:p.Val1696His", []) + assert "NC_000017.11:g.43063938_43063940delinsATG" in genomic_hgvsg + assert "NC_000017.11:g.43063938_43063940inv" in genomic_hgvsg + assert all(h.startswith("NC_") for h in genomic_hgvsg) + + async def test_returns_empty_for_variant_without_hgvsg(self): + """A variant whose allele dict has no hgvsg field is not included in the result.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_NO_HGVSG)): + result = await run_variant_recoder(["NP_009225.1:p.Xxx1Yyy"]) + + assert result == {} + + async def test_skips_non_dict_allele_values(self): + """If an allele value is not a dict (e.g. null in the response), it is skipped gracefully.""" + response_with_null_allele = [{"T": None}] + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(response_with_null_allele)): + result = await run_variant_recoder(["some_hgvs"]) + + assert result == {} + + async def test_input_field_at_allele_level_not_top_level(self): + """Regression: the 'input' field is nested inside the allele key, not at the top level of each list element. + + The old buggy implementation called entry.get("input") on the outer dict, + which always returned None and silently dropped all results. + """ + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(RECODER_RESPONSE_SINGLE)): + result = await run_variant_recoder(["NP_009225.1:p.Cys2Tyr"]) + + # If "input" were read from the wrong level, result would be empty. 
+ assert result != {} + assert "NP_009225.1:p.Cys2Tyr" in result + + +# --------------------------------------------------------------------------- +# get_functional_consequence +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +@pytest.mark.unit +class TestGetFunctionalConsequence: + async def test_extracts_most_severe_consequence(self): + """Parses input and most_severe_consequence from the top-level VEP response structure.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(VEP_RESPONSE_SINGLE)): + result = await get_functional_consequence(["NM_007294.4:c.5G>A"]) + + assert result == {"NM_007294.4:c.5G>A": "missense_variant"} + + async def test_maps_none_when_consequence_absent(self): + """When most_severe_consequence is missing from an entry, the HGVS maps to None.""" + response = [{"input": "NM_007294.4:c.5G>A"}] + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(response)): + result = await get_functional_consequence(["NM_007294.4:c.5G>A"]) + + assert result == {"NM_007294.4:c.5G>A": None} + + async def test_skips_entries_without_input_key(self): + """Entries that have no 'input' key are skipped entirely.""" + response = [{"most_severe_consequence": "missense_variant"}] + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(response)): + result = await get_functional_consequence(["NM_007294.4:c.5G>A"]) + + assert result == {} + + async def test_handles_multiple_variants(self): + """All variants in a batch response are extracted correctly.""" + with patch("mavedb.lib.vep.request_with_backoff", return_value=_mock_response(VEP_RESPONSE_MULTIPLE)): + result = await get_functional_consequence(["NM_007294.4:c.5G>A", "NM_007294.4:c.10C>T"]) + + assert result == { + "NM_007294.4:c.5G>A": "missense_variant", + "NM_007294.4:c.10C>T": "synonymous_variant", + } + + async def test_raises_if_more_than_200_variants(self): + """Passing 
more than 200 HGVS strings raises ValueError before any HTTP call.""" + with pytest.raises(ValueError, match="maximum of 200"): + await get_functional_consequence(["NM_007294.4:c.1A>T"] * 201) diff --git a/tests/worker/jobs/conftest.py b/tests/worker/jobs/conftest.py index 109595d7..de835a56 100644 --- a/tests/worker/jobs/conftest.py +++ b/tests/worker/jobs/conftest.py @@ -1230,9 +1230,9 @@ def setup_sample_variants_for_vep(session, with_populated_domain_data, mock_work variant = Variant( urn="urn:variant:test-variant-for-vep", score_set_id=score_set.id, - hgvs_nt="NM_007294.4:c.5G>A", + hgvs_nt="NM_007294.4:c.5A>G", hgvs_pro="NP_009225.1:p.Cys2Tyr", - data={"hgvs_c": "NM_007294.4:c.5G>A", "hgvs_p": "NP_009225.1:p.Cys2Tyr"}, + data={"hgvs_c": "NM_007294.4:c.5A>G", "hgvs_p": "NP_009225.1:p.Cys2Tyr"}, ) session.add(variant) session.commit() @@ -1241,8 +1241,40 @@ def setup_sample_variants_for_vep(session, with_populated_domain_data, mock_work current=True, mapped_date="2024-01-01T00:00:00Z", mapping_api_version="1.0.0", - post_mapped={"type": "Allele", "expressions": [{"value": "NM_007294.4:c.5G>A", "syntax": "hgvs.c"}]}, - hgvs_assay_level="NM_007294.4:c.5G>A", + post_mapped={"type": "Allele", "expressions": [{"value": "NM_007294.4:c.5A>G", "syntax": "hgvs.c"}]}, + hgvs_assay_level="NM_007294.4:c.5A>G", + ) + session.add(mapped_variant) + session.commit() + return variant, mapped_variant + + +@pytest.fixture +def setup_sample_protein_variant_for_vep(session, with_populated_domain_data, mock_worker_ctx, sample_populate_vep_run): + """Setup a protein HGVS variant (NP_ accession) that VEP cannot resolve directly. + + VEP's /vep/human/hgvs endpoint does not return results for protein HGVS strings like + NP_009225.1:p.Val1696His, so these must be recoded via Variant Recoder first. This fixture + exercises the recoder fallback path end-to-end. 
+ """ + score_set = session.get(ScoreSet, sample_populate_vep_run.job_params["score_set_id"]) + + variant = Variant( + urn="urn:variant:test-protein-variant-for-vep", + score_set_id=score_set.id, + hgvs_pro="NP_009225.1:p.Val1696His", + data={"hgvs_p": "NP_009225.1:p.Val1696His"}, + ) + session.add(variant) + session.commit() + + mapped_variant = MappedVariant( + variant_id=variant.id, + current=True, + mapped_date="2024-01-01T00:00:00Z", + mapping_api_version="1.0.0", + post_mapped={"type": "Allele", "expressions": [{"value": "NP_009225.1:p.Val1696His", "syntax": "hgvs.p"}]}, + hgvs_assay_level="NP_009225.1:p.Val1696His", ) session.add(mapped_variant) session.commit() diff --git a/tests/worker/jobs/external_services/network/test_vep.py b/tests/worker/jobs/external_services/network/test_vep.py index e53013c9..61071fc4 100644 --- a/tests/worker/jobs/external_services/network/test_vep.py +++ b/tests/worker/jobs/external_services/network/test_vep.py @@ -54,3 +54,43 @@ async def test_populate_vep_e2e( ) ).one() assert annotation.status == AnnotationStatus.SUCCESS + + async def test_populate_vep_e2e_with_recoder_path( + self, + session, + arq_redis, + arq_worker, + sample_populate_vep_run_pipeline, + sample_populate_vep_pipeline, + setup_sample_protein_variant_for_vep, + ): + """VEP job uses Variant Recoder for a protein HGVS (NP_ accession) that VEP cannot resolve directly. + + NP_009225.1:p.Val1696His is a BRCA1 protein variant that VEP's /vep/human/hgvs endpoint + does not return a consequence for. The job must fall back to Variant Recoder, recode it + to a genomic HGVS, and then re-query VEP with the recoded string. 
+ """ + _, mapped_variant = setup_sample_protein_variant_for_vep + + await arq_redis.enqueue_job("populate_vep_for_score_set", sample_populate_vep_run_pipeline.id) + await arq_worker.async_run() + await arq_worker.run_check() + + session.refresh(sample_populate_vep_run_pipeline) + assert sample_populate_vep_run_pipeline.status == JobStatus.SUCCEEDED + + session.refresh(sample_populate_vep_pipeline) + assert sample_populate_vep_pipeline.status == PipelineStatus.SUCCEEDED + + session.refresh(mapped_variant) + assert mapped_variant.vep_functional_consequence is not None + assert mapped_variant.vep_access_date is not None + + annotation = session.scalars( + select(VariantAnnotationStatus).where( + VariantAnnotationStatus.variant_id == mapped_variant.variant_id, + VariantAnnotationStatus.annotation_type == AnnotationType.VEP_FUNCTIONAL_CONSEQUENCE, + VariantAnnotationStatus.current.is_(True), + ) + ).one() + assert annotation.status == AnnotationStatus.SUCCESS diff --git a/tests/worker/jobs/external_services/test_clingen.py b/tests/worker/jobs/external_services/test_clingen.py index 57e2dcc5..616263df 100644 --- a/tests/worker/jobs/external_services/test_clingen.py +++ b/tests/worker/jobs/external_services/test_clingen.py @@ -160,7 +160,7 @@ async def test_submit_score_set_mappings_to_car_no_registered_alleles( assert ann.status == "failed" assert ann.annotation_type == "clingen_allele_id" - async def test_submit_score_set_mappings_to_car_no_linked_alleles( + async def test_submit_score_set_mappings_to_car_all_car_errors( self, mock_worker_ctx, session, @@ -184,10 +184,18 @@ async def test_submit_score_set_mappings_to_car_no_linked_alleles( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles that do not match submitted HGVS + # Build error responses for every submitted HGVS — CAR explicitly rejected all of them + mapped_variants = session.scalars(select(MappedVariant)).all() registered_alleles_mock = [ - {"@id": 
"CA123456", "type": "nucleotide", "genomicAlleles": [{"hgvs": "NC_000007.14:g.140453136A>C"}]}, - {"@id": "CA234567", "type": "nucleotide", "genomicAlleles": [{"hgvs": "NC_000007.14:g.140453136A>G"}]}, + { + "errorType": "InvalidHGVS", + "hgvs": get_hgvs_from_post_mapped(mv.post_mapped) or "", + "message": "Invalid HGVS expression.", + "description": "", + "inputLine": get_hgvs_from_post_mapped(mv.post_mapped) or "", + "position": str(i), + } + for i, mv in enumerate(mapped_variants) ] with ( @@ -892,10 +900,12 @@ async def test_submit_score_set_mappings_to_car_no_linked_alleles( dummy_variant_mapping_job_run, ) - # Patch ClinGenAlleleRegistryService to return registered alleles that do not match submitted HGVS + # Patch ClinGenAlleleRegistryService to return only errors with no linked alleles registered_alleles_mock = [ - {"@id": "CA123456", "type": "nucleotide", "genomicAlleles": [{"hgvs": "NC_000007.14:g.140453136A>C"}]}, - {"@id": "CA234567", "type": "nucleotide", "genomicAlleles": [{"hgvs": "NC_000007.14:g.140453136A>G"}]}, + {"errorType": "InvalidHGVS", "hgvs": "test"}, + {"errorType": "InvalidHGVS", "hgvs": "test2"}, + {"errorType": "InvalidHGVS", "hgvs": "test3"}, + {"errorType": "InvalidHGVS", "hgvs": "test4"}, ] with ( diff --git a/tests/worker/jobs/external_services/test_vep.py b/tests/worker/jobs/external_services/test_vep.py index 02db5f26..611c3389 100644 --- a/tests/worker/jobs/external_services/test_vep.py +++ b/tests/worker/jobs/external_services/test_vep.py @@ -11,6 +11,9 @@ from mavedb.lib.types.workflow import JobExecutionOutcome from mavedb.models.enums.annotation_type import AnnotationType from mavedb.models.enums.job_pipeline import AnnotationFailureCategory, AnnotationStatus, JobStatus +from mavedb.models.mapped_variant import MappedVariant +from mavedb.models.score_set import ScoreSet +from mavedb.models.variant import Variant from mavedb.models.variant_annotation_status import VariantAnnotationStatus from 
mavedb.worker.jobs.external_services.vep import populate_vep_for_score_set from mavedb.worker.lib.managers.job_manager import JobManager @@ -245,6 +248,143 @@ async def test_vep_failure_after_recoder_annotated_as_failed( assert annotation.status == AnnotationStatus.FAILED assert annotation.failure_category == AnnotationFailureCategory.EXTERNAL_REFERENCE_NOT_FOUND + async def test_vep_none_consequence_routes_to_recoder( + self, + session, + with_populated_domain_data, + with_populate_vep_job, + mock_worker_ctx, + sample_populate_vep_run, + setup_sample_variants_for_vep, + ): + """A None consequence from VEP Phase 1 is treated as a miss and routed through Recoder. + + VEP can return an entry with most_severe_consequence=None when it recognises a variant + but cannot classify it. This should not silently fall into the UNKNOWN outcome branch — + instead it should be treated identically to an absent entry and sent to Recoder. + """ + _, mapped_variant = setup_sample_variants_for_vep + hgvs = mapped_variant.hgvs_assay_level + genomic_hgvs = "NC_000017.11:g.43094692C>T" + + with ( + patch( + "mavedb.worker.jobs.external_services.vep.get_functional_consequence", + side_effect=[ + {hgvs: None}, # VEP knows the variant but returns no consequence + {genomic_hgvs: "missense_variant"}, # Phase 3 on recoded genomic string + ], + ), + patch( + "mavedb.worker.jobs.external_services.vep.run_variant_recoder", + return_value={hgvs: [genomic_hgvs]}, + ), + ): + result = await populate_vep_for_score_set( + mock_worker_ctx, + 1, + JobManager(session, mock_worker_ctx["redis"], sample_populate_vep_run.id), + ) + + assert result.status == JobStatus.SUCCEEDED + assert result.data["variants_with_consequences"] == 1 + + session.refresh(mapped_variant) + assert mapped_variant.vep_functional_consequence == "missense_variant" + + async def test_most_severe_consequence_selected_from_multiple_genomic_hgvs( + self, + session, + with_populated_domain_data, + with_populate_vep_job, + mock_worker_ctx, 
+ sample_populate_vep_run, + setup_sample_variants_for_vep, + ): + """When Recoder returns multiple genomic strings with different consequences, the most severe wins.""" + _, mapped_variant = setup_sample_variants_for_vep + hgvs = mapped_variant.hgvs_assay_level + genomic_less_severe = "NC_000017.11:g.43094692C>T" # → missense_variant + genomic_more_severe = "NC_000017.11:g.43094692C>A" # → stop_gained + + with ( + patch( + "mavedb.worker.jobs.external_services.vep.get_functional_consequence", + side_effect=[ + {}, # Phase 1: VEP misses + { + genomic_less_severe: "missense_variant", + genomic_more_severe: "stop_gained", + }, # Phase 3: two different consequences + ], + ), + patch( + "mavedb.worker.jobs.external_services.vep.run_variant_recoder", + return_value={hgvs: [genomic_less_severe, genomic_more_severe]}, + ), + ): + result = await populate_vep_for_score_set( + mock_worker_ctx, + 1, + JobManager(session, mock_worker_ctx["redis"], sample_populate_vep_run.id), + ) + + assert result.data["variants_with_consequences"] == 1 + + session.refresh(mapped_variant) + assert mapped_variant.vep_functional_consequence == "stop_gained" + + async def test_multiple_variants_sharing_hgvs_all_get_consequence( + self, + session, + with_populated_domain_data, + with_populate_vep_job, + mock_worker_ctx, + sample_populate_vep_run, + setup_sample_variants_for_vep, + ): + """All mapped variants that share an HGVS string each receive the consequence.""" + _, mapped_variant_1 = setup_sample_variants_for_vep + hgvs = mapped_variant_1.hgvs_assay_level + + score_set = session.get(ScoreSet, sample_populate_vep_run.job_params["score_set_id"]) + variant_2 = Variant( + urn="urn:variant:test-variant-for-vep-2", + score_set_id=score_set.id, + hgvs_nt=hgvs, + data={"hgvs_c": hgvs}, + ) + session.add(variant_2) + session.commit() + mapped_variant_2 = MappedVariant( + variant_id=variant_2.id, + current=True, + mapped_date="2024-01-01T00:00:00Z", + mapping_api_version="1.0.0", + 
post_mapped={"type": "Allele", "expressions": [{"value": hgvs, "syntax": "hgvs.c"}]}, + hgvs_assay_level=hgvs, + ) + session.add(mapped_variant_2) + session.commit() + + with patch( + "mavedb.worker.jobs.external_services.vep.get_functional_consequence", + return_value={hgvs: "missense_variant"}, + ): + result = await populate_vep_for_score_set( + mock_worker_ctx, + 1, + JobManager(session, mock_worker_ctx["redis"], sample_populate_vep_run.id), + ) + + assert result.data["variants_processed"] == 2 + assert result.data["variants_with_consequences"] == 2 + + session.refresh(mapped_variant_1) + session.refresh(mapped_variant_2) + assert mapped_variant_1.vep_functional_consequence == "missense_variant" + assert mapped_variant_2.vep_functional_consequence == "missense_variant" + async def test_vep_batch_api_exception_raises( self, session,