.dlt/config.toml: 10 changes (9 additions, 1 deletion)
@@ -16,4 +16,12 @@ bucket_url = "/output_dir"

[destination.minio]
destination_type = "filesystem"
bucket_url = "s3://cdm-lake/tenant-general-warehouse/kbase/datasets/uniprot/"
bucket_url = "s3://cts/io/ialarmedalien/datasets/cts_output/bloooooop"

+[data_writer]
+# set buffer size for extract and normalize stages
+buffer_max_items = 100000
+# files will rotate after 100_000_000 items have been written
+file_max_items = 100000000
+# files will rotate when file size is greater than 1_000 MB
+file_max_bytes = 1000000000
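For orientation: dlt resolves these `data_writer` settings from `.dlt/config.toml` at run time, buffering up to `buffer_max_items` rows in memory and rotating intermediate files once either the item or byte limit is hit. A minimal sketch of inspecting the resolved values, assuming the script runs from the repo root so dlt can find the config file (the key names are taken from the TOML above):

```python
import dlt

# dlt looks up .dlt/config.toml relative to the current working directory.
print(dlt.config["data_writer.buffer_max_items"])  # 100000
print(dlt.config["data_writer.file_max_items"])    # 100000000
print(dlt.config["data_writer.file_max_bytes"])    # 1000000000 (~1 GB)
```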
README.md: 6 changes (6 additions, 0 deletions)
@@ -12,6 +12,7 @@ Repo for CDM input data loading and wrangling
- [Loading genomes, contigs, and features](#loading-genomes-contigs-and-features)
- [Running bbmap stats and checkm2 on genome or contigset files](#running-bbmap-stats-and-checkm2-on-genome-or-contigset-files)
- [Changelog](#changelog)
+- [v0.1.4](#v014)
- [v0.1.3](#v013)
- [v0.1.2](#v012)
- [v0.1.1](#v011)
@@ -168,6 +169,11 @@ where `path/to/genome_paths_file.json` specifies the path to the genome paths fi

## Changelog

+### v0.1.4
+
+- Add in NCBI REST API interface.


### v0.1.3

- Add in file batcher for use with file-based importers.
pyproject.toml: 16 changes (8 additions, 8 deletions)
@@ -1,6 +1,6 @@
[project]
name = "cdm-data-loaders"
version = "0.1.3"
version = "0.1.4"
description = "Data loaders and wranglers for the CDM."
requires-python = ">= 3.13"
readme = "README.md"
@@ -9,7 +9,7 @@ authors = [
]

dependencies = [
"bioregistry>=0.13.20",
"bioregistry>=0.13.31",
"boto3[crt]>=1.42.55",
"click>=8.3.1",
"defusedxml>=0.7.1",
@@ -18,11 +18,10 @@ dependencies = [
"lxml>=6.0.2",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
"thefuzz>=0.22.1",
]

[project.scripts]
-idmapping = "cdm_data_loaders.parsers.uniprot.idmapping:cli"
+# idmapping = "cdm_data_loaders.parsers.uniprot.idmapping:cli"
uniprot = "cdm_data_loaders.pipelines.uniprot_kb:cli"
uniref = "cdm_data_loaders.pipelines.uniref:cli"
ncbi_rest_api = "cdm_data_loaders.pipelines.ncbi_rest_api:cli"
@@ -32,9 +31,10 @@ dev = [
"berdl-notebook-utils",
"pytest>=9.0.2",
"pytest-asyncio>=1.3.0",
"pytest-cov>=7.0.0",
"pytest-env>=1.5.0",
"ruff>=0.15.0",
"pytest-cov>=7.1.0",
"pytest-env>=1.6.0",
"pytest-recording>=0.13.4",
"ruff>=0.15.4",
]
models = [
"genson>=1.3.0",
@@ -181,7 +181,7 @@ max-complexity = 15
convention = "google"

[build-system]
requires = ["uv_build>=0.9.9,<0.11.0"]
requires = ["uv_build>=0.9.9,<0.20.0"]
build-backend = "uv_build"

[tool.pytest]
scripts/entrypoint.sh: 8 changes (4 additions, 4 deletions)
@@ -23,10 +23,10 @@ case "$cmd" in
# Run the uniprot pipeline with any additional arguments
exec /usr/bin/tini -- uv run --no-sync uniprot "$@"
;;
-# ncbi_rest_api)
-# # Run the NCBI datasets API importer
-# exec /usr/bin/tini -- uv run --no-sync ncbi_rest_api "$@"
-# ;;
+ncbi_rest_api)
+# Run the NCBI datasets API importer
+exec /usr/bin/tini -- uv run --no-sync ncbi_rest_api "$@"
+;;
test)
# run the tests
exec /usr/bin/tini -- uv run --no-sync pytest -m "not requires_spark"
scripts/run_tests.sh: 2 changes (1 addition, 1 deletion)
@@ -8,7 +8,7 @@ cd "$SCRIPT_DIR"

# use the system packages in this virtual environment
uv venv --system-site-packages
-source $(SCRIPT_DIR)/.venv/bin/activate
+# source $(SCRIPT_DIR)/.venv/bin/activate

# run the tests using the active venv and with the dev dependencies installed.
uv run --active --frozen --group dev pytest --cov=src --cov-report=xml
src/cdm_data_loaders/parsers/uniprot/uniref.py: 4 changes (2 additions, 2 deletions)
@@ -21,7 +21,7 @@
from cdm_data_loaders.utils.cdm_logger import get_cdm_logger

UNIREF_URL = "http://uniprot.org/uniref"
-UNIREF_URL_BRKT = f"{{{UNIREF_URL}}}"
+ENTRY_XML_TAG = f"{{{UNIREF_URL}}}entry"
NS = "ns"
UNIREF_NS = {NS: UNIREF_URL, "": UNIREF_URL}
UNIREF = "UniRef"
@@ -126,7 +126,7 @@ def extract_cross_refs(


def parse_uniref_entry(
-entry: Element, timestamp: datetime.datetime, uniref_variant: str, file_path: str | Path
+entry: Element, timestamp: datetime.datetime, file_path: str | Path, uniref_variant: str
) -> dict[str, list[dict[str, str]]]:
"""Parse a single UniRef <entry> element into CDM-friendly row tuples."""
# Cluster basic info
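The signature reorder moves `uniref_variant` after `file_path`, so that the shared generator in `pipelines/core.py`, which invokes `parse_fn(entry=..., timestamp=..., file_path=...)`, can use a parser with the variant bound in advance. A sketch of one way to do that binding, assuming `functools.partial` is used (the actual binding site is not shown in this diff, and "UniRef90" is just an illustrative value):

```python
from functools import partial

from cdm_data_loaders.parsers.uniprot.uniref import parse_uniref_entry

# Pre-bind the pipeline-specific argument; the resulting callable matches
# the parse_fn(entry=..., timestamp=..., file_path=...) contract expected
# by stream_xml_file_resource.
parse_fn = partial(parse_uniref_entry, uniref_variant="UniRef90")
```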
src/cdm_data_loaders/pipelines/core.py: 95 changes (95 additions, 0 deletions)
@@ -0,0 +1,95 @@
"""Common reusable pipeline elements."""

import datetime
from collections.abc import Callable, Generator
from typing import Any

import dlt
from dlt.extract.items import DataItemWithMeta
from pydantic import ValidationError
from pydantic_settings import BaseSettings, SettingsError

from cdm_data_loaders.pipelines.cts_defaults import DEFAULT_BATCH_SIZE, BatchedFileInputSettings, CtsDefaultSettings
from cdm_data_loaders.utils.cdm_logger import get_cdm_logger
from cdm_data_loaders.utils.file_system import BatchCursor
from cdm_data_loaders.utils.xml_utils import stream_xml_file

logger = get_cdm_logger()


def run_cli(settings_cls: type[BaseSettings], pipeline_fn: Callable[[Any], None]) -> None:
"""Generic CLI entry point for any pipeline.

:param settings_cls: the Settings class to instantiate
:param pipeline_fn: the run_pipeline function to call with the config
"""
try:
config = settings_cls()
except (SettingsError, ValidationError) as e:
print(f"Error initialising config: {e}")
raise
except Exception as e:
print(f"Unexpected error setting up config: {e}")
raise

pipeline_fn(config)


def run_pipeline(
config: CtsDefaultSettings,
resource: Any, # noqa: ANN401
destination_kwargs: dict[str, Any] | None = None,
pipeline_kwargs: dict[str, Any] | None = None,
pipeline_run_kwargs: dict[str, Any] | None = None,
) -> None:
"""Execute a dlt pipeline.

:param config: pipeline config with output and destination
:type config: CtsDefaultSettings
:param resource: dlt resource to run
:type resource: Any
:param destination_kwargs: keyword arguments for the dlt destination
:type destination_kwargs: dict[str, Any] | None
:param pipeline_kwargs: keyword arguments for the dlt pipeline
:type pipeline_kwargs: dict[str, Any] | None
:param pipeline_run_kwargs: keyword arguments for the dlt pipeline run
:type pipeline_run_kwargs: dict[str, Any] | None
"""
if config.output:
dlt.config[f"destination.{config.destination}.bucket_url"] = config.output

destination = dlt.destination(config.destination, **(destination_kwargs or {}))
pipeline = dlt.pipeline(destination=destination, **(pipeline_kwargs or {}))
load_info = pipeline.run(resource, **(pipeline_run_kwargs or {}))
logger.info(load_info)
logger.info("Work complete!")


def stream_xml_file_resource(
config: BatchedFileInputSettings,
xml_tag: str,
parse_fn: Callable,
log_interval: int = 1000,
) -> Generator[DataItemWithMeta, Any]:
"""Core generator shared by XML-based dlt pipeline resources.

:param config: pipeline config with input_dir and start_at
:param xml_tag: XML element tag to stream
:param parse_fn: callable(entry, timestamp, file_path) -> dict[str, rows]
:param log_interval: log a progress message every N entries
"""
timestamp = datetime.datetime.now(tz=datetime.UTC)
batch_params: dict[str, Any] = {}
if config.start_at:
batch_params["start_at"] = config.start_at

batcher = BatchCursor(config.input_dir, batch_size=DEFAULT_BATCH_SIZE, **batch_params)
while files := batcher.get_batch():
for file_path in files:
logger.info("Reading from %s", str(file_path))
for n_entries, entry in enumerate(stream_xml_file(file_path, xml_tag)):
parsed_entry = parse_fn(entry=entry, timestamp=timestamp, file_path=file_path)
for table, rows in parsed_entry.items():
yield dlt.mark.with_table_name(rows, table)
if (n_entries + 1) % log_interval == 0:
logger.info("Processed %d entries", n_entries + 1)
src/cdm_data_loaders/pipelines/cts_defaults.py: 58 changes (58 additions, 0 deletions)
@@ -1,4 +1,62 @@
"""Common defaults for running pipelines on the KBase CTS."""

from pydantic import AliasChoices, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

INPUT_MOUNT = "/input_dir"
OUTPUT_MOUNT = "/output_dir"

VALID_DESTINATIONS = ["local_fs", "minio"]
DEFAULT_BATCH_SIZE = 50


class CtsDefaultSettings(BaseSettings):
"""Configuration for running a basic import pipeline."""

model_config = SettingsConfigDict(
cli_parse_args=True,
cli_exit_on_error=False,
cli_ignore_unknown_args=True,
)
input_dir: str = Field(
default=INPUT_MOUNT,
description="Location of directory containing UniRef XML files to import",
# explicitly allow both kebab case and snake case
validation_alias=AliasChoices("i", "input_dir", "input-dir", "input_dir"),
)
destination: str = Field(
default="local_fs",
description=f"Destination configuration to use for data output. Choices: {VALID_DESTINATIONS}",
validation_alias=AliasChoices("d", "destination"),
)
output: str | None = Field(
default=None,
description="Location to save imported data to, if different from the default supplied by the destination config",
validation_alias=AliasChoices("o", "output"),
)

@field_validator("destination")
@classmethod
def validate_destination(cls, v: str) -> str:
"""Validate the destination against valid choices.

:param v: destination specified
:type v: str
:raises ValueError: if the destination is not valid
:return: valid destination
:rtype: str
"""
if v not in VALID_DESTINATIONS:
err_msg = f"destination must be one of {VALID_DESTINATIONS}, got '{v}'"
raise ValueError(err_msg)
return v


class BatchedFileInputSettings(CtsDefaultSettings):
"""Settings object for an importer that deals with batches of files."""

start_at: int = Field(
default=0,
description="File to start import at",
validation_alias=AliasChoices("s", "start", "start-at", "start_at"),
)
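As a rough illustration of the alias handling, pydantic-settings can be handed an explicit argv-style list via its `_cli_parse_args` override; the paths and values below are made up:

```python
from cdm_data_loaders.pipelines.cts_defaults import BatchedFileInputSettings

# Kebab-case, snake_case, and one-letter aliases all resolve to the same fields.
config = BatchedFileInputSettings(
    _cli_parse_args=["--input-dir", "/data/xml", "-d", "minio", "--start-at", "3"]
)
print(config.input_dir, config.destination, config.start_at)  # /data/xml minio 3
```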