4 changes: 2 additions & 2 deletions README.md
@@ -59,8 +59,8 @@ The repo provides a Docker container that can be used to run several import pipe
Current endpoints include:

- `test`: run the unit tests that do _not_ require external dependencies like Spark
- `uniprot`: run the UniProtKB (UniProt protein database) import pipeline; see [the UniProtKB pipeline](src/cdm_data_loaders/pipelines/uniprot_kb_pipeline.py) for arguments
- `uniref`: run the UniRef import pipeline; the [the UniRef pipeline](src/cdm_data_loaders/pipelines/uniref_pipeline.py) for arguments
- `uniprot`: run the UniProtKB (UniProt protein database) import pipeline; see [the UniProtKB pipeline](src/cdm_data_loaders/pipelines/uniprot_kb.py) for arguments
- `uniref`: run the UniRef import pipeline; see [the UniRef pipeline](src/cdm_data_loaders/pipelines/uniref.py) for arguments
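
For example, a UniRef import might be launched along the following lines; the `cdm-data-loaders` image tag and the host paths are illustrative placeholders rather than values defined in this repo:

```sh
# Sketch of an endpoint invocation; the image tag and host paths are placeholders.
# Host directories are bound onto the container's default /input_dir and /output_dir mounts.
docker run --rm \
  -v /path/to/uniref/xml:/input_dir \
  -v /path/to/results:/output_dir \
  cdm-data-loaders uniref --uniref-variant 90
```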


## Development
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -14,7 +14,7 @@ dependencies = [
"click>=8.3.1",
"defusedxml>=0.7.1",
"delta-spark>=4.1.0",
"dlt[deltalake,filesystem,parquet]>=1.22.2",
"dlt[deltalake,duckdb,filesystem,parquet]>=1.22.2",
"lxml>=6.0.2",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
@@ -23,8 +23,9 @@ dependencies = [

[project.scripts]
idmapping = "cdm_data_loaders.parsers.uniprot.idmapping:cli"
uniprot_pipeline = "cdm_data_loaders.pipelines.uniprot_kb_pipeline:cli"
uniref_pipeline = "cdm_data_loaders.pipelines.uniref_pipeline:cli"
uniprot = "cdm_data_loaders.pipelines.uniprot_kb:cli"
uniref = "cdm_data_loaders.pipelines.uniref:cli"
ncbi_rest_api = "cdm_data_loaders.pipelines.ncbi_rest_api:cli"

[dependency-groups]
dev = [
16 changes: 10 additions & 6 deletions scripts/entrypoint.sh
@@ -3,7 +3,7 @@ set -euo pipefail

# Ensure at least one argument is provided
if [ "$#" -eq 0 ]; then
echo "Usage: $0 {uniref|uniprot|test|xml_split} [args...]"
echo "Usage: $0 {uniref|uniprot|xml_split|test} [args...]"
exit 1
fi

@@ -16,13 +16,17 @@ case "$cmd" in
exec /usr/bin/tini -- xml_file_splitter "$@"
;;
uniref)
# Run the uniref pipeline with any additional arguments via tini
exec /usr/bin/tini -- uv run --no-sync uniref_pipeline "$@"
# Run the uniref pipeline with any additional arguments
exec /usr/bin/tini -- uv run --no-sync uniref "$@"
;;
uniprot)
# Run the uniprot pipeline with any additional arguments via tini
exec /usr/bin/tini -- uv run --no-sync uniprot_pipeline "$@"
# Run the uniprot pipeline with any additional arguments
exec /usr/bin/tini -- uv run --no-sync uniprot "$@"
;;
# ncbi_rest_api)
# # Run the NCBI datasets API importer
# exec /usr/bin/tini -- uv run --no-sync ncbi_rest_api "$@"
# ;;
test)
# run the tests
exec /usr/bin/tini -- uv run --no-sync pytest -m "not requires_spark"
@@ -31,7 +35,7 @@ case "$cmd" in
exec /usr/bin/tini -- /bin/bash
;;
*)
echo "Error: unknown command '$cmd'; valid commands are 'uniref' or 'uniprot'." >&2
echo "Error: unknown command '$cmd'; valid commands are 'uniref', 'uniprot', or 'xml_split'." >&2
exit 1
;;
esac
2 changes: 1 addition & 1 deletion src/cdm_data_loaders/parsers/uniprot/uniref.py
@@ -25,7 +25,7 @@
NS = "ns"
UNIREF_NS = {NS: UNIREF_URL, "": UNIREF_URL}
UNIREF = "UniRef"
UNIREF_VARIANTS = [100, 90, 50]
UNIREF_VARIANTS = ["100", "90", "50"]
ENTITY_ID = "entity_id"

PREFIX_TRANSLATION = {
4 changes: 4 additions & 0 deletions src/cdm_data_loaders/pipelines/cts_defaults.py
@@ -0,0 +1,4 @@
"""Common defaults for running pipelines on the KBase CTS."""

INPUT_MOUNT = "/input_dir"
OUTPUT_MOUNT = "/output_dir"
src/cdm_data_loaders/pipelines/uniprot_kb_pipeline.py → src/cdm_data_loaders/pipelines/uniprot_kb.py
@@ -57,7 +57,7 @@ def run_pipeline(config: Settings) -> None:
:param config: config for running the pipeline.
:type config: Settings
"""
for uniprot_file in sorted(config.input_dir.glob("*.xml.gz")):
for uniprot_file in sorted(config.input_dir.glob("*.xml*")):
if config.start_at:
# get the integer part of the file name
f_int = uniprot_file.stem.replace("parts_", "")
153 changes: 153 additions & 0 deletions src/cdm_data_loaders/pipelines/uniref.py
@@ -0,0 +1,153 @@
"""DLT pipeline to import UniRef data."""

import datetime
from collections.abc import Generator
from typing import Any

import dlt
from dlt.extract.items import DataItemWithMeta
from pydantic import AliasChoices, Field, ValidationError, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict, SettingsError

from cdm_data_loaders.parsers.uniprot.uniref import UNIREF_URL, UNIREF_VARIANTS, parse_uniref_entry
from cdm_data_loaders.pipelines.cts_defaults import INPUT_MOUNT
from cdm_data_loaders.utils.cdm_logger import get_cdm_logger
from cdm_data_loaders.utils.file_system import BatchCursor
from cdm_data_loaders.utils.xml_utils import stream_xml_file

logger = get_cdm_logger()

APP_NAME = "uniref_importer"

VALID_DESTINATIONS = ["local_fs", "minio"]
DEFAULT_BATCH_SIZE = 50


class Settings(BaseSettings):
"""Configuration for running the UniRef import pipeline."""

model_config = SettingsConfigDict(
cli_parse_args=True,
cli_prog_name="uniref",
cli_exit_on_error=False,
cli_ignore_unknown_args=True,
)
input_dir: str = Field(
default=INPUT_MOUNT,
description="Location of directory containing UniRef XML files to import",
# explicitly allow both kebab case and snake case
validation_alias=AliasChoices("i", "input_dir", "input-dir", "input_dir"),
)
destination: str = Field(
default="local_fs",
description=f"Destination configuration to use for data output. Choices: {VALID_DESTINATIONS}",
validation_alias=AliasChoices("d", "destination"),
)
uniref_variant: str = Field(
description=f"Which UniRef variant to import. Choices: {UNIREF_VARIANTS}",
validation_alias=AliasChoices("u", "uniref", "uniref-variant", "uniref_variant"),
)
start_at: int = Field(
default=0,
description="File to start import at",
validation_alias=AliasChoices("s", "start", "start-at", "start_at"),
)
output: str | None = Field(
default=None,
description="Location to save imported data to, if different from the default supplied by the destination config",
validation_alias=AliasChoices("o", "output"),
)

@field_validator("uniref_variant")
@classmethod
def validate_uniref_variant(cls, v: str) -> str:
"""Validate the uniref variant against valid choices.

:param v: uniref variant specified
:type v: str
:raises ValueError: if the uniref variant is not valid
:return: valid uniref variant
:rtype: str
"""
if v not in UNIREF_VARIANTS:
err_msg = f"uniref_variant must be one of {UNIREF_VARIANTS}, got '{v}'"
raise ValueError(err_msg)
return v

@field_validator("destination")
@classmethod
def validate_destination(cls, v: str) -> str:
"""Validate the destination against valid choices.

:param v: destination specified
:type v: str
:raises ValueError: if the destination is not valid
:return: valid destination
:rtype: str
"""
if v not in VALID_DESTINATIONS:
err_msg = f"destination must be one of {VALID_DESTINATIONS}, got '{v}'"
raise ValueError(err_msg)
return v


@dlt.resource(name="parse_uniref", parallelized=True)
def parse_uniref(config: Settings) -> Generator[DataItemWithMeta, Any]:
"""Parse the information from UniRef files, batch by batch.

:param config: config for running the pipeline.
:type config: Settings
"""
timestamp = datetime.datetime.now(tz=datetime.UTC)
batch_params: dict[str, Any] = {}
if config.start_at:
batch_params["start_at"] = config.start_at

batcher = BatchCursor(config.input_dir, batch_size=DEFAULT_BATCH_SIZE, **batch_params)
while uniref_files := batcher.get_batch():
for file_path in uniref_files:
logger.info("Reading from %s", str(file_path))
for n_entries, entry in enumerate(stream_xml_file(file_path, f"{{{UNIREF_URL}}}entry")):
parsed_entry = parse_uniref_entry(entry, timestamp, f"UniRef {config.uniref_variant}", file_path)
for table, rows in parsed_entry.items():
yield dlt.mark.with_table_name(rows, table)
                if (n_entries + 1) % 10000 == 0:
logger.info("Processed %d entries", n_entries + 1)


def run_pipeline(config: Settings) -> None:
"""Execute the pipeline.

:param config: config for running the pipeline.
:type config: Settings
"""
# check whether there is a custom output location; if so, set it in the config
if config.output:
dlt.config[f"destination.{config.destination}.bucket_url"] = config.output

pipeline = dlt.pipeline(
pipeline_name=f"uniref_{config.uniref_variant}",
destination=dlt.destination(config.destination, max_table_nesting=0),
dataset_name="uniprot_kb",
)
load_info = pipeline.run(parse_uniref(config), table_format="delta")
logger.info(load_info)
logger.info("Work complete!")


def cli() -> None:
"""CLI interface for the UniRef importer pipeline.

See the ``Settings`` object for parameters.
"""
try:
config = Settings() # pyright: ignore[reportCallIssue]
except (Exception, SettingsError, ValidationError) as e:
print(f"Error initialising config: {e}")
raise

run_pipeline(config)


if __name__ == "__main__":
cli()
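
As a rough usage sketch, the `uniref` console script registered in `pyproject.toml` might be invoked with the kebab-case aliases declared on the `Settings` fields above; the exact flag handling depends on how pydantic-settings maps CLI aliases, and the argument values here are placeholders:

```sh
# Sketch only: flag names follow the AliasChoices entries on Settings;
# the paths, variant, and start index are illustrative placeholders.
uv run --no-sync uniref \
  --uniref-variant 90 \
  --input-dir /input_dir \
  --destination local_fs \
  --start-at 12 \
  --output /output_dir/uniref_90
```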
132 changes: 0 additions & 132 deletions src/cdm_data_loaders/pipelines/uniref_pipeline.py

This file was deleted.
