15 changes: 14 additions & 1 deletion .dlt/config.toml
@@ -6,6 +6,19 @@ http_show_error_body = "true"
log_level = "INFO"
log_format = "JSON"

# REST Client retries
# default: 60
request_timeout = 60
# default: 5
request_max_attempts = 2
# default: 1
request_backoff_factor = 1.5
# default: 300
request_max_retry_delay = 600

# do not raise an error if a job fails
load.raise_on_failed_jobs = "false"

[extract]
workers = 10

@@ -14,7 +14,7 @@ workers = 10
destination_type = "filesystem"
bucket_url = "/output_dir"

[destination.minio]
[destination.s3]
destination_type = "filesystem"
bucket_url = "s3://cts/io/ialarmedalien/datasets/cts_output/bloooooop"

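A rough sketch of the retry behaviour these settings configure, assuming dlt's usual exponential backoff (the exact formula lives in dlt's requests helper, so treat this as an approximation):

# assumed model: delay ~= request_backoff_factor * 2**attempt,
# capped at request_max_retry_delay seconds; values match the new config
request_max_attempts = 2
request_backoff_factor = 1.5
request_max_retry_delay = 600

for attempt in range(request_max_attempts):
    delay = min(request_backoff_factor * 2**attempt, request_max_retry_delay)
    print(f"attempt {attempt + 1}: wait up to {delay}s before retrying")
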
20 changes: 14 additions & 6 deletions pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"defusedxml>=0.7.1",
"delta-spark>=4.1.0",
"dlt[deltalake,duckdb,filesystem,parquet]>=1.22.2",
"frictionless[aws]>=5.18.1",
"lxml>=6.0.2",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
@@ -29,6 +30,7 @@ ncbi_rest_api = "cdm_data_loaders.pipelines.ncbi_rest_api:cli"
[dependency-groups]
dev = [
"berdl-notebook-utils",
"moto[s3]>=5.1.22",
"pytest>=9.0.2",
"pytest-asyncio>=1.3.0",
"pytest-cov>=7.1.0",
@@ -46,14 +48,12 @@ xml = [
]

[project.optional-dependencies]
# for minio interactions -- see utils/minio.py for more details
minio = [
# for s3 interactions -- see utils/s3.py for more details
s3 = [
"boto3[crt]>=1.42.0",
"tqdm>=4.67.3",
]

biopython = []

[tool.ruff]
line-length = 120
target-version = "py313"
@@ -170,7 +170,8 @@ ignore = [

[tool.ruff.lint.per-file-ignores]
"*.ipynb" = ["T201"] # ignore printing in notebooks
"tests/**/*.py" = ["S101", "T201", "FBT001", "FBT002"] # use of assert
"tests/**/*.py" = ["S101", "T201", "FBT001", "FBT002"] # use of assert, booleans
"tests/utils/test_s3.py" = ["ANN401"]
"**/__init__.py" = ["D104"]

[tool.ruff.lint.mccabe]
@@ -190,7 +191,7 @@ log_cli = true
log_cli_level = "INFO"
log_level = "INFO"
addopts = ["-v"]
markers = ["requires_spark: must be run in an environment where spark is available", "slow_test: does what it says on the tin"]
markers = ["requires_spark: must be run in an environment where spark is available", "s3: tests that mock s3 interactions", "slow_test: does what it says on the tin"]

# environment settings for running tests
[tool.pytest_env]
@@ -207,6 +208,13 @@ BERDL_HIVE_METASTORE_URI = "thrift://localhost:9083"
SPARK_CLUSTER_MANAGER_API_URL = "http://localhost:8000"
GOVERNANCE_API_URL = "http://localhost:8000"
DATALAKE_MCP_SERVER_URL = "http://localhost:8080"
AWS_ACCESS_KEY_ID = "whatever"
AWS_SECRET_ACCESS_KEY = "whatever"
AWS_SECURITY_TOKEN = "whatever"
AWS_SESSION_TOKEN = "whatever"
AWS_DEFAULT_REGION = "us-east-1"
AWS_ENDPOINT_URL = { unset = true }
AWS_ENDPOINT_URL_S3 = { unset = true }

[tool.uv.sources]
cdm-schema = { git = "https://github.com/kbase/cdm-schema.git" }
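The moto[s3] dev dependency, the new s3 pytest marker, and the dummy AWS credentials above exist to support tests that mock S3. A minimal sketch of the pattern (hypothetical test, not part of this PR):

import boto3
import pytest
from moto import mock_aws

@pytest.mark.s3
@mock_aws
def test_lists_objects_against_mocked_s3() -> None:
    # the fake credentials from [tool.pytest_env] keep boto3 happy, and
    # unsetting AWS_ENDPOINT_URL* stops boto3 from reaching a real MinIO
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="cdm-lake")
    s3.put_object(Bucket="cdm-lake", Key="datasets/idmapping/part_001.tsv.gz", Body=b"")
    listing = s3.list_objects_v2(Bucket="cdm-lake", Prefix="datasets/")
    assert [o["Key"] for o in listing["Contents"]] == ["datasets/idmapping/part_001.tsv.gz"]
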
4 changes: 2 additions & 2 deletions scripts/entrypoint.sh
@@ -3,7 +3,7 @@ set -euo pipefail

# Ensure at least one argument is provided
if [ "$#" -eq 0 ]; then
echo "Usage: $0 {uniref|uniprot|xml_split|test} [args...]"
echo "Usage: $0 {uniref|uniprot|ncbi_rest_api|xml_split|test} [args...]"
exit 1
fi

@@ -35,7 +35,7 @@ case "$cmd" in
exec /usr/bin/tini -- /bin/bash
;;
*)
echo "Error: unknown command '$cmd'; valid commands are 'uniref', 'uniprot', or 'xml_split'." >&2
echo "Error: unknown command '$cmd'; valid commands are 'uniref', 'uniprot', 'ncbi_rest_api', or 'xml_split'." >&2
exit 1
;;
esac
1 change: 0 additions & 1 deletion scripts/run_tests.sh
@@ -8,7 +8,6 @@ cd "$SCRIPT_DIR"

# use the system packages in this virtual environment
uv venv --system-site-packages
# source $(SCRIPT_DIR)/.venv/bin/activate

# run the tests using the active venv and with the dev dependencies installed.
uv run --active --frozen --group dev pytest --cov=src --cov-report=xml
6 changes: 3 additions & 3 deletions src/cdm_data_loaders/parsers/uniprot/idmapping.py
@@ -35,7 +35,7 @@
from cdm_data_loaders.core.pipeline_run import PipelineRun
from cdm_data_loaders.readers.dsv import read
from cdm_data_loaders.utils.cdm_logger import get_cdm_logger
from cdm_data_loaders.utils.minio import list_remote_dir_contents
from cdm_data_loaders.utils.s3 import list_matching_objects
from cdm_data_loaders.utils.spark_delta import APPEND, set_up_workspace, write_delta
from cdm_data_loaders.validation.dataframe_validator import DataFrameValidator, Validator
from cdm_data_loaders.validation.df_nullable_fields import validate as check_nullable_fields
@@ -117,7 +117,7 @@
@click.option(
"--source",
required=True,
help="Full path to the source directory containing ID mapping file(s). Files are assumed to be in the CDM s3 minio bucket, and the s3a://cdm-lake prefix may be omitted.",
help="Full path to the S3 source directory containing ID mapping file(s), including the bucket name but excluding the protocol (`s3a://` or `s3://`). For example: `cdm-lake/datasets/raw_data/idmapping/`.",
)
@click.option(
"--namespace",
@@ -143,7 +143,7 @@
(spark, delta_ns) = set_up_workspace(APP_NAME, namespace, tenant_name)

# TODO: other locations / local files?
bucket_list = list_remote_dir_contents(source.removeprefix("s3a://cdm-lake/"))
bucket_list = list_matching_objects(source)

[Codecov warning] Added line src/cdm_data_loaders/parsers/uniprot/idmapping.py#L146 was not covered by tests.
for file in bucket_list:
# file names are in the 'Key' value
# 'tenant-general-warehouse/kbase/datasets/uniprot/id_mapping/id_mapping_part_001.tsv.gz'
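Hypothetical usage of the renamed helper, with the signature and return shape inferred from this diff (each returned dict carries the object name under 'Key'):

from cdm_data_loaders.utils.s3 import list_matching_objects

# bucket name included, protocol prefix omitted, per the updated --source help
for obj in list_matching_objects("cdm-lake/datasets/raw_data/idmapping/"):
    print(obj["Key"])  # e.g. ".../id_mapping/id_mapping_part_001.tsv.gz"
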
19 changes: 18 additions & 1 deletion src/cdm_data_loaders/pipelines/core.py
@@ -5,6 +5,7 @@
from typing import Any

import dlt
from dlt.common.runtime.slack import send_slack_message
from dlt.extract.items import DataItemWithMeta
from pydantic import ValidationError
from pydantic_settings import BaseSettings, SettingsError
@@ -60,9 +61,25 @@ def run_pipeline(

destination = dlt.destination(config.destination, **(destination_kwargs or {}))
pipeline = dlt.pipeline(destination=destination, **(pipeline_kwargs or {}))
load_info = pipeline.run(resource, **(pipeline_run_kwargs or {}))

slack_hook: str | None = pipeline.runtime_config.slack_incoming_hook

if not slack_hook:
logger.info("Slack webhook not configured; no Slack alerts will be sent.")

try:
load_info = pipeline.run(resource, **(pipeline_run_kwargs or {}))
except Exception as e:
err_msg = f"Pipeline failed: {e!s}"
logger.exception(err_msg)
if slack_hook:
send_slack_message(slack_hook, err_msg)
return

logger.info(load_info)
logger.info("Work complete!")
if slack_hook:
send_slack_message(slack_hook, "Pipeline completed successfully!")


def stream_xml_file_resource(
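The Slack hook is resolved from dlt's runtime configuration; when it is unset, run_pipeline logs a notice and skips alerting. One way to enable it, assuming dlt's standard env-var spelling for runtime.slack_incoming_hook:

import os

# hypothetical webhook URL; dlt can also read this from .dlt/config.toml
# or .dlt/secrets.toml under [runtime] slack_incoming_hook
os.environ["RUNTIME__SLACK_INCOMING_HOOK"] = "https://hooks.slack.com/services/T000/B000/XXXX"

# run_pipeline(...) will now post the failure message if pipeline.run()
# raises, or "Pipeline completed successfully!" on success
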
2 changes: 1 addition & 1 deletion src/cdm_data_loaders/pipelines/cts_defaults.py
@@ -6,7 +6,7 @@
INPUT_MOUNT = "/input_dir"
OUTPUT_MOUNT = "/output_dir"

VALID_DESTINATIONS = ["local_fs", "minio"]
VALID_DESTINATIONS = ["local_fs", "s3"]
DEFAULT_BATCH_SIZE = 50


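Each entry in VALID_DESTINATIONS corresponds to a [destination.<name>] table in .dlt/config.toml, so this rename tracks the config change above. A sketch of the validation it presumably feeds (the check itself is assumed, not shown in this diff):

VALID_DESTINATIONS = ["local_fs", "s3"]

def check_destination(name: str) -> str:
    # maps onto [destination.local_fs] / [destination.s3] in .dlt/config.toml
    if name not in VALID_DESTINATIONS:
        msg = f"invalid destination {name!r}; choose one of {VALID_DESTINATIONS}"
        raise ValueError(msg)
    return name
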
72 changes: 69 additions & 3 deletions src/cdm_data_loaders/pipelines/ncbi_rest_api.py
@@ -19,6 +19,7 @@
)
from pydantic import AliasChoices, Field, model_validator
from pydantic_settings import CliSuppress, SettingsConfigDict
from requests.exceptions import HTTPError

from cdm_data_loaders.pipelines.core import (
run_cli,
@@ -38,6 +39,7 @@

DATASET = "dataset"
ANNOTATION = "annotation"
ERROR = "error"

dlt_logger = logging.getLogger("dlt")

@@ -143,6 +145,49 @@
)


def add_error(
error_list: list[dict[str, Any]],
error: Exception,
error_from: str,
assembly_id: str | None = None,
assembly_id_list: list[str] | None = None,
) -> None:
"""Add an error to the list of output errors.

:param error_list: running list of errors
:type error_list: list[dict[str, Any]]
:param error: the error object from the exception handler
:type error: Exception
:param error_from: what type of request was being made when the error occurred
:type error_from: str
:param assembly_id: ID of the assembly being fetched when the error occurred, defaults to None
:type assembly_id: str | None, optional
:param assembly_id_list: list of IDs being fetched when the error occurred, defaults to None
:type assembly_id_list: list[str] | None, optional
"""
err_args = {
"assembly_id": assembly_id or None,
"assembly_id_list": assembly_id_list or None,
"error_class": type(error).__name__,
"error_from": error_from,
"message": str(error),
"request_url": None,
"status": None,
"reason": None,
}

if isinstance(error, HTTPError):
# save the URL, status code, and error message
err_args = {
**err_args,
"request_url": error.request.url,
"status": error.response.status_code,
"reason": error.response.reason,
}

error_list.append(err_args)

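For illustration, a single entry appended for a failed annotation request might look like this (all values hypothetical):

{
    "assembly_id": "GCF_000005845.2",
    "assembly_id_list": None,
    "error_class": "HTTPError",
    "error_from": "annotation_report",
    "message": "404 Client Error: Not Found for url: ...",
    "request_url": "https://api.ncbi.nlm.nih.gov/datasets/v2/genome/...",
    "status": 404,
    "reason": "Not Found",
}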

def get_assembly_reports(assembly_id_list: list[str]) -> dict[str, Any]:
"""Retrieve dataset and annotation reports for a list of IDs from the NCBI datasets API.

@@ -154,14 +199,27 @@
if not assembly_id_list:
return {}

errors = []

# N.b. invalid IDs will not be present in dataset_reports
dataset_reports = get_dataset_reports(assembly_id_list)
annotation_reports = {assembly_id: get_annotation_report(assembly_id) for assembly_id in assembly_id_list}
dataset_reports = {}
try:
dataset_reports = get_dataset_reports(assembly_id_list)
except Exception as e: # noqa: BLE001
add_error(errors, e, "dataset_report", assembly_id_list=assembly_id_list)

annotation_reports: dict[str, Any] = {}
for assembly_id in assembly_id_list:
try:
annotation_reports[assembly_id] = get_annotation_report(assembly_id)
except Exception as e: # noqa: BLE001
add_error(errors, e, "annotation_report", assembly_id=assembly_id)

# ensure every assembly_id in the list has either the downloaded dataset_report or None
return {
DATASET: {assembly_id: dataset_reports.get(assembly_id) for assembly_id in assembly_id_list},
ANNOTATION: {assembly_id: annotation_reports.get(assembly_id) for assembly_id in assembly_id_list},
ERROR: errors,
}

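The returned structure therefore has the shape below (keys per the DATASET/ANNOTATION/ERROR constants above; report contents abbreviated and hypothetical):

{
    "dataset": {"GCF_000005845.2": {"accession": "GCF_000005845.2"}, "GCF_INVALID": None},
    "annotation": {"GCF_000005845.2": [{"assembly_accession": "GCF_000005845.2"}], "GCF_INVALID": None},
    "error": [],  # populated by add_error() when a fetch raises
}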

@@ -172,6 +230,7 @@

dlt_logger.info("fetching dataset reports for:\n%s", ", ".join(sorted(assembly_id_list)))
assembly_dataset_reports = []

for page in ncbi_genome_client.paginate(
f"{'%2C'.join(assembly_id_list)}/dataset_report",
params={
Expand All @@ -182,7 +241,7 @@
assembly_dataset_reports.extend(page)

# return dataset reports, indexed by assembly_id
# invalid IDs are silently dropped by the API
# invalid IDs are silently dropped by the NCBI REST API
datasets = {report.get("accession"): report for report in assembly_dataset_reports}
# fill in the missing gaps in assembly_id_list with None
return {assembly_id: datasets.get(assembly_id) for assembly_id in assembly_id_list}
@@ -192,6 +251,7 @@
"""Fetch the annotation report for an assembly from the NCBI datasets REST API."""
dlt_logger.info("fetching annotation report for %s", assembly_id)
page_data = []

for page in ncbi_genome_client.paginate(
f"{assembly_id}/annotation_report",
params={
Expand All @@ -200,6 +260,7 @@
hooks=REST_CLIENT_HOOKS, # type: ignore[reportArgumentType]
):
page_data.extend(page)

# page_data is empty if the ID is invalid
return page_data or None

@@ -239,6 +300,11 @@

dataset_reports: dict[str, dict[str, Any]] = assembly_reports.get(DATASET) # type: ignore[reportAssignmentType]
annotation_reports: dict[str, list[dict[str, Any]]] = assembly_reports.get(ANNOTATION) # type: ignore[reportAssignmentType]
error_reports: list[dict[str, Any]] = assembly_reports.get(ERROR) # type: ignore[reportAssignmentType]

[Codecov warning] Added line src/cdm_data_loaders/pipelines/ncbi_rest_api.py#L303 was not covered by tests.

if error_reports:
yield dlt.mark.with_table_name(error_reports, "ncbi_import_error")

[Codecov warning] Added lines src/cdm_data_loaders/pipelines/ncbi_rest_api.py#L305-L306 were not covered by tests.

# yield the raw data to save as tables
yield dlt.mark.with_table_name(
[{"assembly_id": assembly_id, **(report or {})} for assembly_id, report in dataset_reports.items()],
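The new error rows are routed to their own table via dlt.mark.with_table_name, which dispatches a batch of rows to a named table from within a single resource. A self-contained sketch of that mechanism (demo pipeline, not part of this PR):

import dlt

@dlt.resource
def reports_with_errors():
    # ordinary rows land in the resource's default table...
    yield [{"assembly_id": "GCF_000005845.2"}]
    # ...while flagged batches are diverted to ncbi_import_error
    yield dlt.mark.with_table_name([{"error_from": "dataset_report"}], "ncbi_import_error")

pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb")
pipeline.run(reports_with_errors())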