diff --git a/Dockerfile.dev b/Dockerfile.dev index 9d1e5548..1d69b75f 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -1,6 +1,6 @@ -FROM ghcr.io/wtsi-npg/ub-18.04-baton-irods-4.2.11:latest +FROM ghcr.io/wtsi-npg/ub-22.04-baton-irods-4.3.4:latest -ARG PYTHON_VERSION=3.12 +ARG PYTHON_VERSION=3.14 ENV DEBIAN_FRONTEND=noninteractive @@ -18,6 +18,7 @@ RUN apt-get update && \ make \ libbz2-dev \ libncurses-dev \ + libsqlite3-dev \ libreadline-dev \ libssl-dev \ zlib1g-dev @@ -28,7 +29,7 @@ RUN echo "deb [arch=amd64] https://packages.irods.org/apt/ $(lsb_release -sc) ma tee /etc/apt/sources.list.d/renci-irods.list && \ apt-get update && \ apt-get install -q -y --no-install-recommends \ - irods-icommands="4.2.11-1~$(lsb_release -sc)" + irods-icommands="4.3.4-0~$(lsb_release -sc)" WORKDIR /app @@ -53,7 +54,7 @@ COPY requirements.txt test-requirements.txt /app/ RUN pip install --no-cache-dir -r requirements.txt && \ pip install --no-cache-dir -r test-requirements.txt -COPY . /app/ +COPY . /app RUN pip install --no-cache-dir . && \ git status && \ @@ -65,4 +66,4 @@ USER appuser ENTRYPOINT ["/app/docker/entrypoint.sh"] -CMD ["/bin/bash", "-c", "sleep infinity"] +CMD ["/bin/bash"] diff --git a/README.md b/README.md index 9d528e46..aa36dae3 100644 --- a/README.md +++ b/README.md @@ -62,12 +62,12 @@ To run the tests in a container, you will need to have Docker installed. With this in place, you can run the tests with the following command: - docker-compose run app pytest --it + docker compose run app pytest --it There will be a delay the first time this is run because the Docker image will be built. To pre-build the image, you can run: - docker-compose build + docker compose build ## Creating a release @@ -148,4 +148,4 @@ string. 
## Architecture -- `publish-directory` removes public permissions unless explicitly specified by `--group public` to be able to publish privately whilst having iRODS inheritance enabled on sequencing runs collections ([ADR 1](/docs/decisions/adr-01-publish-directory-removes-public-permissions-by-default.md)) \ No newline at end of file +- `publish-directory` removes public permissions unless explicitly specified by `--group public` to be able to publish privately whilst having iRODS inheritance enabled on sequencing runs collections ([ADR 1](/docs/decisions/adr-01-publish-directory-removes-public-permissions-by-default.md)) diff --git a/docker-compose.yml b/docker-compose.yml index 2022890a..75c6d5e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: irods-server: platform: linux/amd64 container_name: irods-server - image: "ghcr.io/wtsi-npg/ub-16.04-irods-4.2.7:latest" + image: "ghcr.io/wtsi-npg/ub-22.04-irods-4.3.4:latest" ports: - "127.0.0.1:1247:1247" - "127.0.0.1:20000-20199:20000-20199" @@ -37,10 +37,8 @@ services: context: . 
dockerfile: Dockerfile.dev restart: always - volumes: - - "./tests/.irods:/home/appuser/.irods/" environment: - IRODS_ENVIRONMENT_FILE: "/home/appuser/.irods/irods_environment.json" + IRODS_ENVIRONMENT_FILE: "/app/tests/.irods/irods_environment.json" IRODS_PASSWORD: "irods" depends_on: irods-server: diff --git a/docker/install_pyenv.sh b/docker/install_pyenv.sh index 677f1bab..97762639 100755 --- a/docker/install_pyenv.sh +++ b/docker/install_pyenv.sh @@ -2,7 +2,7 @@ set -ex -PYENV_RELEASE_VERSION=${PYENV_RELEASE_VERSION:="2.4.16"} +PYENV_RELEASE_VERSION=${PYENV_RELEASE_VERSION:="2.6.16"} export PYENV_GIT_TAG="v${PYENV_RELEASE_VERSION}" PYENV_ROOT=${PYENV_ROOT:-"$HOME/.pyenv"} diff --git a/pyproject.toml b/pyproject.toml index a2e8c089..ebf92ec8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dynamic = ["version"] dependencies = [ "npg-python-lib >= 1.0.0,<2", "npg_id_generation >=5.0.1", - "partisan >=4.0.2,<5", + "partisan >=4.1,<5", "pymysql >=1.1.1", "python-dateutil >=2.9.0,<3", "rich >=13.6.0", diff --git a/requirements.txt b/requirements.txt index 670741d4..065c96d9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ cryptography==46.0.5 npg_id_generation@https://github.com/wtsi-npg/npg_id_generation/releases/download/5.0.1/npg_id_generation-5.0.1.tar.gz -npg-python-lib@https://github.com/wtsi-npg/npg-python-lib/releases/download/1.1.0/npg_python_lib-1.1.0.tar.gz +npg-python-lib@https://github.com/wtsi-npg/npg-python-lib/releases/download/1.2.0/npg_python_lib-1.2.0.tar.gz partisan@https://github.com/wtsi-npg/partisan/releases/download/4.1.2/partisan-4.1.2.tar.gz pymysql==1.1.2 python-dateutil==2.9.0.post0 diff --git a/src/npg_irods/cli/locate_data_objects.py b/src/npg_irods/cli/locate_data_objects.py index adcf53fa..dccac24f 100644 --- a/src/npg_irods/cli/locate_data_objects.py +++ b/src/npg_irods/cli/locate_data_objects.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023, 2024 Genome Research Ltd. 
All rights reserved. +# Copyright © 2023, 2024, 2026 Genome Research Ltd. All rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,6 +21,7 @@ import argparse import sys from datetime import datetime +from pathlib import Path from typing import Any, Iterator import sqlalchemy @@ -51,6 +52,7 @@ find_updated_samples, find_updated_studies, ) +from npg_irods.db.mlwh_cache import MlwhChangeCache from npg_irods.exception import CollectionNotFound from npg_irods.illumina import find_qc_collection from npg_irods.metadata import infinium @@ -162,6 +164,7 @@ def consent_withdrawn(cli_args: argparse.ArgumentParser): def illumina_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Illumina data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -171,10 +174,19 @@ def illumina_updates_cli(cli_args: argparse.ArgumentParser): skip_absent_runs = cli_args.skip_absent_runs json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = illumina_updates( - sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone + sess, + since, + until, + skip_absent_runs=skip_absent_runs, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -188,6 +200,8 @@ def illumina_updates( skip_absent_runs: int = None, json: bool = False, zone: str = None, + cache_path: Path | None = None, + prime_cache: bool = False, ) -> tuple[int, int]: """Find recently updated Illumina data in the ML warehouse, locate corresponding data objects in iRODS and print their paths. 
@@ -200,10 +214,13 @@ def illumina_updates( this number of attempts. json: Print output in JSON format. zone: iRODS zone to query. + cache_path: Path to local content-hash cache for MLWH change detection. + prime_cache: Prime the cache with all samples and studies in the MLWH. Returns: The number of ML warehouse records processed, the number of errors encountered. """ + num_processed = num_errors = 0 attempts = successes = 0 to_print = set() @@ -211,8 +228,18 @@ def illumina_updates( if skip_absent_runs is not None: log.info("Skipping absent runs after n attempts", n=skip_absent_runs) + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for prev, curr in with_previous( - illumina.find_updated_components(sess, since=since, until=until) + illumina.find_updated_components( + sess, + since=since, + until=until, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, + ) ): if curr is None: # Last item when this is reached continue @@ -276,6 +303,7 @@ def illumina_updates( def ont_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding ONT data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -285,10 +313,19 @@ def ont_updates_cli(cli_args: argparse.ArgumentParser): report_tags = cli_args.report_tags json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = ont_updates( - sess, since, until, report_tags=report_tags, json=json, zone=zone + sess, + since, + until, + report_tags=report_tags, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -302,12 +339,23 @@ def ont_updates( report_tags: bool = False, json: 
bool = False, zone: str = None, + cache_path: Path | None = None, + prime_cache: bool = False, ) -> tuple[int, int]: num_processed = num_errors = 0 + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for i, c in enumerate( ont.find_updated_components( - sess, since=since, until=until, include_tags=report_tags + sess, + since=since, + until=until, + include_tags=report_tags, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, ) ): num_processed += 1 @@ -346,6 +394,7 @@ def ont_updates( def ont_run_collections_created_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding ONT runfolder collections selected on the time they were created in iRODS, and execute the command.""" + since = cli_args.begin_date until = cli_args.end_date json = cli_args.report_json @@ -387,6 +436,7 @@ def ont_run_collections_created( def pacbio_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding PacBio data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -396,10 +446,19 @@ def pacbio_updates_cli(cli_args: argparse.ArgumentParser): skip_absent_runs = cli_args.skip_absent_runs json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = pacbio_updates( - sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone + sess, + since, + until, + skip_absent_runs=skip_absent_runs, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -413,7 +472,9 @@ def pacbio_updates( skip_absent_runs: int = None, json: bool = False, zone: str = None, -) -> (int, int): + cache_path: Path | None = 
None, + prime_cache: bool = False, +) -> tuple[int, int]: num_processed = num_errors = 0 attempts = successes = 0 to_print = set() @@ -421,8 +482,18 @@ def pacbio_updates( if skip_absent_runs is not None: log.info("Skipping absent runs after n attempts", n=skip_absent_runs) + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for prev, curr in with_previous( - pacbio.find_updated_components(sess, since=since, until=until) + pacbio.find_updated_components( + sess, + since=since, + until=until, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, + ) ): if curr is None: # Last item when this is reached continue @@ -481,6 +552,7 @@ def pacbio_updates( def infinium_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Infinium microarray data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -519,6 +591,7 @@ def infinium_microarray_updates( def sequenom_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Sequenom genotype data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -547,6 +620,48 @@ def sequenom_genotype_updates( return num_processed, num_errors +def _add_mlwh_cache_arguments(parser: argparse.ArgumentParser): + parser.add_argument( + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite cache used to filter Sample/Study updates by content.", + type=str, + ) + parser.add_argument( + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering.", + action="store_true", + ) + + +def _load_mlwh_change_ids( + 
sess: Session, + since: datetime, + until: datetime, + cache_path: Path | None, + prime_cache: bool = False, +) -> tuple[set[str] | None, set[str] | None]: + if cache_path is None: + if prime_cache: + log.warning("MLWH cache priming requested without cache path") + + return None, None + + with MlwhChangeCache(cache_path, prime_cache=prime_cache) as cache: + sample_ids = cache.changed_sample_ids(sess, since, until) + study_ids = cache.changed_study_ids(sess, since, until) + + log.info( + "Filtering MLWH updates using cache", + cache=str(cache_path), + samples=len(sample_ids), + studies=len(study_ids), + ) + + return sample_ids, study_ids + + def _print_data_objects_updated_in_mlwh( sess: Session, query: list[AVU], @@ -576,7 +691,7 @@ def _print_data_objects_updated_in_mlwh( def _find_and_print_data_objects( attr: Any, - values: Iterator[int], + values: Iterator[str], query: list[AVU], since: datetime, until: datetime, @@ -640,27 +755,28 @@ def _print_batch(items: set[RodsItem], json: bool = False): def main(): - parser = argparse.ArgumentParser( + root_parser = argparse.ArgumentParser( description=description, formatter_class=argparse.RawDescriptionHelpFormatter ) - add_logging_arguments(parser) - add_db_config_arguments(parser) + add_logging_arguments(root_parser) + add_db_config_arguments(root_parser) - parser.add_argument( + root_parser.add_argument( "--zone", help="Specify a federated iRODS zone in which to find data objects to check. 
" "This is not required if the target collections are in the local zone.", type=str, ) - parser.add_argument( + root_parser.add_argument( "--version", help="Print the version and exit.", action="version", version=version(), ) - subparsers = parser.add_subparsers(title="Sub-commands", required=True) + subparsers = root_parser.add_subparsers(title="Sub-commands", required=True) + # CLI to locate data with consent withdrawn metadata updates cwdr_parser = subparsers.add_parser( "consent-withdrawn", help="Find data objects related to samples whose consent for data use has " @@ -674,12 +790,15 @@ def main(): ) cwdr_parser.set_defaults(func=consent_withdrawn) + # CLI to find Illumina run metadata updates ilup_parser = subparsers.add_parser( "illumina-updates", help="Find data objects, which are components of Illumina runs, whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(ilup_parser) + _add_mlwh_cache_arguments(ilup_parser) + ilup_parser.add_argument( "--skip-absent-runs", "--skip_absent_runs", @@ -699,12 +818,14 @@ def main(): ) ilup_parser.set_defaults(func=illumina_updates_cli) + # CLI to find ONT runfolder collections created within a specified time range ontcre_parser = subparsers.add_parser( "ont-run-creation", help="Find ONT runfolder collections created in iRODS within a specified time " "range.", ) add_date_range_arguments(ontcre_parser) + ontcre_parser.add_argument( "--report-json", "--report_json", @@ -713,12 +834,15 @@ def main(): ) ontcre_parser.set_defaults(func=ont_run_collections_created_cli) + # CLI to find ONT run metadata updates ontup_parser = subparsers.add_parser( "ont-updates", help="Find collections, containing data objects for ONT runs, whose tracking" "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(ontup_parser) + _add_mlwh_cache_arguments(ontup_parser) + ontup_parser.add_argument( "--report-tags", "--report_tags", @@ -733,12 +857,15 @@ 
def main(): ) ontup_parser.set_defaults(func=ont_updates_cli) + # CLI to find PacBio run metadata updates pbup_parser = subparsers.add_parser( "pacbio-updates", help="Find data objects, which are components of PacBio runs, whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(pbup_parser) + _add_mlwh_cache_arguments(pbup_parser) + pbup_parser.add_argument( "--skip-absent-runs", "--skip_absent_runs", @@ -758,12 +885,14 @@ def main(): ) pbup_parser.set_defaults(func=pacbio_updates_cli) + # CLI to find Infinium microarray run metadata updates imup_parser = subparsers.add_parser( "infinium-updates", help="Find data objects related to Infinium microarray samples whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(imup_parser) + imup_parser.add_argument( "--report-json", "--report_json", @@ -772,12 +901,14 @@ def main(): ) imup_parser.set_defaults(func=infinium_updates_cli) + # CLI to find Sequenom genotype run metadata updates squp_parser = subparsers.add_parser( "sequenom-updates", help="Find data objects related to Sequenom genotype samples whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(squp_parser) + squp_parser.add_argument( "--report-json", "--report_json", @@ -786,7 +917,7 @@ def main(): ) squp_parser.set_defaults(func=sequenom_updates_cli) - args = parser.parse_args() + args = root_parser.parse_args() configure_structlog( config_file=args.log_config, debug=args.debug, diff --git a/src/npg_irods/db/mlwh.py b/src/npg_irods/db/mlwh.py index 7a91288d..6ee7e99c 100644 --- a/src/npg_irods/db/mlwh.py +++ b/src/npg_irods/db/mlwh.py @@ -22,7 +22,7 @@ import enum from contextlib import contextmanager from datetime import datetime, timedelta -from typing import Iterator, Type +from typing import Any, Generator, Iterator, Type import structlog from sqlalchemy import ( @@ -364,7 +364,7 @@ 
class SeqProductIrodsLocations(Base): @contextmanager -def session_context(engine: Engine) -> Session: +def session_context(engine: Engine) -> Generator[Session, Any, None]: """Yield a session and close, or rollback on error. This context manager does not handle exceptions and will raise them to the caller.""" @@ -426,7 +426,7 @@ def find_sample_by_sample_id(sess: Session, sample_id: str) -> Sample: def find_updated_samples( sess: Session, since: datetime, until: datetime -) -> Iterator[int]: +) -> Iterator[str]: """Return IDs of Samples that have been updated in the ML warehouse. Args: @@ -454,7 +454,7 @@ def find_updated_samples( def find_updated_studies( sess: Session, since: datetime, until: datetime -) -> Iterator[int]: +) -> Iterator[str]: """Return IDs of Studies that have been updated in the ML warehouse. Args: diff --git a/src/npg_irods/db/mlwh_cache.py b/src/npg_irods/db/mlwh_cache.py new file mode 100644 index 00000000..61e9e57c --- /dev/null +++ b/src/npg_irods/db/mlwh_cache.py @@ -0,0 +1,469 @@ +# -*- coding: utf-8 -*- +# +# Copyright © 2026 Genome Research Ltd. All rights reserved. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# + +"""SQLite-backed cache of ML warehouse content hashes.""" + +import hashlib +import json +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, Sequence + +from sqlalchemy import asc, select +from sqlalchemy.orm import Session +from structlog import get_logger + +from npg_irods.db.mlwh import Sample, Study, find_updated_samples, find_updated_studies + + +def logger(): + """Return a logger for this module.""" + + return get_logger(__name__) + + +CACHE_SCHEMA_VERSION = 1 +SQLITE_BUSY_TIMEOUT_MS = 5000 +CACHE_CHUNK_SIZE = ( + 500 # Don't make this bigger than 32,766 (maximum size of an SQLite IN clause) +) + +SAMPLE_HASH_FIELDS = ( + "id_lims", + "id_sample_lims", + "consent_withdrawn", + "name", + "organism", + "accession_number", + "common_name", + "cohort", + "sanger_sample_id", + "supplier_name", + "public_name", + "donor_id", + "date_of_consent_withdrawn", + "marked_as_consent_withdrawn_by", + "uuid_sample_lims", +) + +STUDY_HASH_FIELDS = ( + "id_lims", + "id_study_lims", + "name", + "accession_number", + "description", + "contains_human_dna", + "contaminated_human_dna", + "remove_x_and_autosomes", + "separate_y_chromosome_data", + "ena_project_id", + "study_title", + "study_visibility", + "ega_dac_accession_number", + "data_access_group", +) + + +SAMPLE_CACHE_CREATE_SQL = ( + "CREATE TABLE IF NOT EXISTS sample_cache (" + "id_sample_lims TEXT PRIMARY KEY, " + "content_hash TEXT NOT NULL, " + "hash_schema_version INTEGER NOT NULL, " + "last_changed_at TEXT NOT NULL)" +) +STUDY_CACHE_CREATE_SQL = ( + "CREATE TABLE IF NOT EXISTS study_cache (" + "id_study_lims TEXT PRIMARY KEY, " + "content_hash TEXT NOT NULL, " + "hash_schema_version INTEGER NOT NULL, " + "last_changed_at TEXT NOT NULL)" +) + +SAMPLE_CACHE_UPSERT_SQL = ( + "INSERT INTO sample_cache (id_sample_lims, content_hash, hash_schema_version, " + "last_changed_at) VALUES (?, ?, ?, ?) 
" + "ON CONFLICT(id_sample_lims) DO UPDATE SET " + "content_hash=excluded.content_hash, " + "hash_schema_version=excluded.hash_schema_version, " + "last_changed_at=excluded.last_changed_at " + "WHERE content_hash != excluded.content_hash " + "OR hash_schema_version != excluded.hash_schema_version" +) +STUDY_CACHE_UPSERT_SQL = ( + "INSERT INTO study_cache (id_study_lims, content_hash, hash_schema_version, last_changed_at) " + "VALUES (?, ?, ?, ?) " + "ON CONFLICT(id_study_lims) DO UPDATE SET " + "content_hash=excluded.content_hash, " + "hash_schema_version=excluded.hash_schema_version, " + "last_changed_at=excluded.last_changed_at " + "WHERE content_hash != excluded.content_hash " + "OR hash_schema_version != excluded.hash_schema_version" +) + + +@dataclass +class MlwhChangeCache: + """Cache for detecting ML warehouse Sample/Study content changes. + + Uses an SQLite database to store content hashes for rows in the ML warehouse so + that timestamp-only updates can be filtered out. + + The timestamp columns in the study and sample tables are not sufficient to for us + to tell whether or not row data values have changed. This is because the timestamp + columns are updated whenever a row is modified, even if no values have changed. + (Possibly related to the MLWH update mechanism which deletes and inserts new rows + rather than updating existing rows.) + + Therefore, we need to use content hashes to detect changes in the actual data. + + Args: + path: Filesystem path to the SQLite cache file. + hash_schema_version: Version number for the hashing schema. + busy_timeout_ms: SQLite busy timeout in milliseconds. + prime_cache: When True, populate the cache with all rows before filtering + for changes. + """ + + path: Path + hash_schema_version: int = CACHE_SCHEMA_VERSION + busy_timeout_ms: int = SQLITE_BUSY_TIMEOUT_MS + prime_cache: bool = False + + _conn: sqlite3.Connection | None = None + + def __enter__(self): + """Open the cache and ensure required tables exist. 
+ + Returns: + The open cache instance. + """ + + path = self.path.resolve() + path.parent.mkdir(parents=True, exist_ok=True) + + self._conn = sqlite3.connect( + path.as_posix(), timeout=self.busy_timeout_ms / 1000 + ) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}") + + _ensure_schema(self._conn) + + return self + + def __exit__(self, err_type, err, traceback): + """Close the cache connection. + + Args: + err_type: Exception type raised within the context, if any. + err: Exception raised within the context, if any. + traceback: Traceback for the exception, if any. + """ + + if self._conn is not None: + self._conn.close() + self._conn = None + + def changed_sample_ids( + self, sess: Session, since: datetime, until: datetime + ) -> set[str]: + """Return Sample IDs with content changes in the given time range. + + Args: + sess: Open SQLAlchemy session for the ML warehouse. + since: Start of the recorded_at time window. + until: End of the recorded_at time window. + + Returns: + Set of sample IDs whose content has changed since the last cache run. + """ + + if self.prime_cache: + self.prime_samples(sess) + + return _filter_changed_rows( + sess, + self._active_conn(), + Sample, + "id_sample_lims", + SAMPLE_HASH_FIELDS, + find_updated_samples(sess, since, until), + self.hash_schema_version, + _load_sample_cache, + _upsert_sample_cache, + ) + + def changed_study_ids( + self, sess: Session, since: datetime, until: datetime + ) -> set[str]: + """Return Study IDs with content changes in the given time range. + + Args: + sess: Open SQLAlchemy session for the ML warehouse. + since: Start of the recorded_at time window. + until: End of the recorded_at time window. + + Returns: + Set of study IDs whose content has changed since the last cache run. 
+ """ + + if self.prime_cache: + self.prime_studies(sess) + + return _filter_changed_rows( + sess, + self._active_conn(), + Study, + "id_study_lims", + STUDY_HASH_FIELDS, + find_updated_studies(sess, since, until), + self.hash_schema_version, + _load_study_cache, + _upsert_study_cache, + ) + + def prime_samples(self, sess: Session) -> int: + """Populate the sample cache with hashes for all MLWH Sample rows.""" + + total = _prime_cache( + sess, + self._active_conn(), + Sample, + "id_sample_lims", + SAMPLE_HASH_FIELDS, + self.hash_schema_version, + _upsert_sample_cache, + ) + logger().info("Primed sample cache", cache=self.path.as_posix(), rows=total) + + return total + + def prime_studies(self, sess: Session) -> int: + """Populate the study cache with hashes for all MLWH Study rows.""" + + total = _prime_cache( + sess, + self._active_conn(), + Study, + "id_study_lims", + STUDY_HASH_FIELDS, + self.hash_schema_version, + _upsert_study_cache, + ) + logger().info("Primed study cache", cache=self.path.as_posix(), rows=total) + + return total + + def _active_conn(self) -> sqlite3.Connection: + """Return the active SQLite connection or raise if not open.""" + + if self._conn is None: + raise RuntimeError("Cache is not open") + + return self._conn + + +def _filter_changed_rows( + sess: Session, + conn: sqlite3.Connection, + model, + id_attr: str, + hash_fields: Sequence[str], + candidate_ids: Iterable[str], + hash_schema_version: int, + load_cache, + upsert_cache, +) -> set[str]: + """Return IDs whose cached hashes differ from the current content.""" + + changed: set[str] = set() + + for chunk in _chunked(candidate_ids, CACHE_CHUNK_SIZE): + ids = list(dict.fromkeys(chunk)) + if not ids: + continue + + rows = ( + sess.execute(select(model).where(getattr(model, id_attr).in_(ids))) + .scalars() + .all() + ) + if not rows: + continue + + cache_map = load_cache(conn, [getattr(row, id_attr) for row in rows]) + updates: list[tuple[str, str, int, str]] = [] + now = 
datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() + + for row in rows: + row_id = getattr(row, id_attr) + content_hash = _stable_hash(_payload(row, hash_fields)) + cached = cache_map.get(row_id) + if cached is None: + changed.add(row_id) + updates.append((row_id, content_hash, hash_schema_version, now)) + elif cached[0] != content_hash or cached[1] != hash_schema_version: + changed.add(row_id) + updates.append((row_id, content_hash, hash_schema_version, now)) + + upsert_cache(conn, updates) + + return changed + + +def _prime_cache( + sess: Session, + conn: sqlite3.Connection, + model, + id_attr: str, + hash_fields: Sequence[str], + hash_schema_version: int, + upsert_cache, +) -> int: + """Insert hashes for all rows in the model into the cache.""" + + updates: list[tuple[str, str, int, str]] = [] + total = 0 + query = sess.query(model).order_by(asc(getattr(model, id_attr))) + + for row in query.yield_per(CACHE_CHUNK_SIZE): + row_id = getattr(row, id_attr) + content_hash = _stable_hash(_payload(row, hash_fields)) + now = datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() + updates.append((row_id, content_hash, hash_schema_version, now)) + + if len(updates) >= CACHE_CHUNK_SIZE: + upsert_cache(conn, updates) + total += len(updates) + updates.clear() + + if updates: + upsert_cache(conn, updates) + total += len(updates) + + return total + + +def _load_sample_cache(conn: sqlite3.Connection, ids: list[str]) -> dict: + """Load cached sample hashes for the given IDs.""" + + if not ids: + return {} + + placeholders = ",".join("?" 
for _ in ids) + query = ( + "SELECT id_sample_lims, content_hash, hash_schema_version " + f"FROM sample_cache WHERE id_sample_lims IN ({placeholders})" + ) + rows = conn.execute(query, ids).fetchall() + logger().debug( + "Loaded sample cache rows", num_requested=len(ids), num_loaded=len(rows) + ) + + return {row[0]: (row[1], row[2]) for row in rows} + + +def _load_study_cache(conn: sqlite3.Connection, ids: list[str]) -> dict: + """Load cached study hashes for the given IDs.""" + + if not ids: + return {} + + placeholders = ",".join("?" for _ in ids) + query = ( + "SELECT id_study_lims, content_hash, hash_schema_version " + "FROM study_cache " + f"WHERE id_study_lims IN ({placeholders})" + ) + rows = conn.execute(query, ids).fetchall() + logger().debug( + "Loaded study cache rows", num_requested=len(ids), num_loaded=len(rows) + ) + + return {row[0]: (row[1], row[2]) for row in rows} + + +def _upsert_sample_cache( + conn: sqlite3.Connection, updates: list[tuple[str, str, int, str]] +) -> None: + """Insert or update sample cache rows.""" + + if not updates: + return + + conn.executemany(SAMPLE_CACHE_UPSERT_SQL, updates) + conn.commit() + logger().debug("Upserted new sample cache rows", n=len(updates)) + + +def _upsert_study_cache( + conn: sqlite3.Connection, updates: list[tuple[str, str, int, str]] +) -> None: + """Insert or update study cache rows.""" + + if not updates: + return + + conn.executemany(STUDY_CACHE_UPSERT_SQL, updates) + conn.commit() + logger().debug("Upserted new study cache rows", n=len(updates)) + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + """Create cache tables if missing.""" + + conn.execute(SAMPLE_CACHE_CREATE_SQL) + conn.execute(STUDY_CACHE_CREATE_SQL) + conn.commit() + + +def _chunked(values: Iterable[str], size: int) -> Iterable[list[str]]: + """Yield lists of values in fixed-size chunks.""" + + chunk: list[str] = [] + for value in values: + chunk.append(value) + if len(chunk) >= size: + yield chunk + chunk = [] + + if chunk: + 
yield chunk + + +def _payload(row, fields: Sequence[str]) -> dict: + """Build a dict payload of selected attributes for hashing.""" + + def _normalise_value(value): + """Return a JSON-safe representation for hashing.""" + + if isinstance(value, datetime): + return value.isoformat() + + return value + + return {field: _normalise_value(getattr(row, field)) for field in fields} + + +def _stable_hash(payload: dict) -> str: + """Return a stable SHA-256 hash of the payload.""" + + s = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(s.encode("utf-8")).hexdigest() diff --git a/src/npg_irods/illumina.py b/src/npg_irods/illumina.py index 3bf41bf8..b635a7fe 100644 --- a/src/npg_irods/illumina.py +++ b/src/npg_irods/illumina.py @@ -24,11 +24,11 @@ from enum import Enum, unique from functools import lru_cache from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Iterator, Optional, Type from partisan.irods import AVU, Collection, DataObject from partisan.metadata import AsValueEnum -from sqlalchemy import asc, not_ +from sqlalchemy import and_, asc, not_, or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -488,8 +488,12 @@ def find_flowcells_by_component( def find_updated_components( - sess: Session, since: datetime, until: datetime -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Find in the ML warehouse any Illumina sequence components whose tracking metadata has been changed within the given time range. @@ -504,19 +508,58 @@ def find_updated_components( the number of rows returned by the query by a factor of 10. Args: - sess: An open ML warehouse session. + sess: An open ML warehouse session. since: The start of the time range. until: The end of the time range. 
+ changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over Components whose tracking metadata have changed. + A generator over Components whose tracking metadata have changed. """ # Test that the created date is at least 1 day before the recorded date because we # want to avoid rows that have had their recorded_at timestamp changed simply # because they were recently created. + recent_creation = since - timedelta(days=1) + # When cache-based filtering is enabled, an empty set means “no content changes”. + # In that case, do not fall back to time-window-based filtering. + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [IseqFlowcell.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append( + and_( + Sample.recorded_at.between(since, until), + not_(Sample.created.between(recent_creation, since)), + ) + ) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append( + and_( + Study.recorded_at.between(since, until), + not_(Study.created.between(recent_creation, since)), + ) + ) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query( IseqProductMetrics.id_run, IseqFlowcell.position, IseqFlowcell.tag_index @@ -525,13 +568,7 @@ def find_updated_components( .join(IseqFlowcell.sample) .join(IseqFlowcell.study) .join(IseqFlowcell.iseq_product_metrics) - .filter( - Sample.recorded_at.between(since, until) - & not_(Sample.created.between(recent_creation, since)) - | 
Study.recorded_at.between(since, until) - & not_(Study.created.between(recent_creation, since)) - | IseqFlowcell.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .order_by( asc(IseqProductMetrics.id_run), asc(IseqFlowcell.position), diff --git a/src/npg_irods/ont.py b/src/npg_irods/ont.py index ade027c3..97c16a2c 100644 --- a/src/npg_irods/ont.py +++ b/src/npg_irods/ont.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright © 2021, 2022, 2023, 2024 Genome Research Ltd. All rights reserved. +# Copyright © 2021, 2022, 2023, 2024, 2026 Genome Research Ltd. All +# rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,12 +26,12 @@ from datetime import datetime from os import PathLike from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Optional, Type from partisan.exception import RodsError from partisan.icommands import iquest from partisan.irods import AVU, Collection, DataObject, query_metadata -from sqlalchemy import asc, distinct +from sqlalchemy import asc, distinct, or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -359,8 +360,13 @@ def find_recent_expt(sess: Session, since: datetime) -> list[str]: def find_updated_components( - sess: Session, since: datetime, until: datetime, include_tags=True -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + include_tags=True, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Return the components of runs whose ML warehouse metadata has been updated at or since the given date and time. @@ -370,25 +376,47 @@ def find_updated_components( until: A datetime. include_tags: Resolve the components to the granularity of individual tags, rather than as whole runs. 
Optional, defaults to True. + changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over the matching components. + A generator over the matching components. """ columns = [OseqFlowcell.experiment_name, OseqFlowcell.instrument_slot] if include_tags: columns.append(OseqFlowcell.tag_identifier) + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [OseqFlowcell.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append(Sample.recorded_at.between(since, until)) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append(Study.recorded_at.between(since, until)) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query(*columns) .distinct() .join(OseqFlowcell.sample) .join(OseqFlowcell.study) - .filter( - Sample.recorded_at.between(since, until) - | Study.recorded_at.between(since, until) - | OseqFlowcell.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .group_by(*columns) ) diff --git a/src/npg_irods/pacbio.py b/src/npg_irods/pacbio.py index 7186803d..68f503e3 100644 --- a/src/npg_irods/pacbio.py +++ b/src/npg_irods/pacbio.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023, 2024 Genome Research Ltd. All rights reserved. +# Copyright © 2023, 2024, 2026 Genome Research Ltd. All rights reserved. 
# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,9 +22,10 @@ from dataclasses import dataclass from datetime import datetime from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Iterator, Optional, Type from partisan.irods import AVU, Collection, DataObject +from sqlalchemy import or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -248,8 +249,12 @@ def find_runs_by_component( def find_updated_components( - sess: Session, since: datetime, until: datetime -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Find in the ML warehouse any PacBio sequence components whose tracking metadata has been changed within the given time range. @@ -260,11 +265,37 @@ def find_updated_components( sess: An open ML warehouse session. since: The start of the time range. until: The end of the time range. + changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over Components whose tracking metadata have changed. + A generator over Components whose tracking metadata have changed. 
""" + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [PacBioRun.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append(Sample.recorded_at.between(since, until)) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append(Study.recorded_at.between(since, until)) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query( PacBioRun.pac_bio_run_name, @@ -275,11 +306,7 @@ def find_updated_components( .distinct() .join(PacBioRun.sample) .join(PacBioRun.study) - .filter( - Sample.recorded_at.between(since, until) - | Study.recorded_at.between(since, until) - | PacBioRun.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .order_by( PacBioRun.pac_bio_run_name, PacBioRun.well_label, diff --git a/tests/illumina/conftest.py b/tests/illumina/conftest.py index c7650888..75c31fb7 100644 --- a/tests/illumina/conftest.py +++ b/tests/illumina/conftest.py @@ -43,6 +43,7 @@ def initialize_mlwh_illumina_synthetic(session: Session): This is represented by the files in ./tests/data/illumina/synthetic """ + default_timestamps = { "created": CREATED, "last_updated": BEGIN, @@ -196,6 +197,7 @@ def make_sample(n): def initialize_mlwh_illumina_backfill(sess: Session): """Insert ML warehouse test data for Illumina product iRODS paths.""" + changed_study = Study( id_lims="LIMS_05", id_study_lims="4000", @@ -383,6 +385,7 @@ def initialize_mlwh_illumina_backfill(sess: Session): @pytest.fixture(scope="function") def illumina_synthetic_mlwh(mlwh_session) -> Generator[Session, Any, None]: """An ML warehouse database fixture populated with Illumina-related records.""" + initialize_mlwh_illumina_synthetic(mlwh_session) yield mlwh_session @@ -391,6 +394,7 @@ def illumina_synthetic_mlwh(mlwh_session) -> 
Generator[Session, Any, None]: def illumina_backfill_mlwh(mlwh_session) -> Generator[Session, Any, None]: """An ML warehouse database fixture populated with Illumina iRODS path backfill records.""" + initialize_mlwh_illumina_backfill(mlwh_session) yield mlwh_session diff --git a/tests/illumina/test_locate_data_objects.py b/tests/illumina/test_locate_data_objects.py index c3231a47..c1fa4f05 100644 --- a/tests/illumina/test_locate_data_objects.py +++ b/tests/illumina/test_locate_data_objects.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023 Genome Research Ltd. All rights reserved. +# Copyright © 2023, 2026 Genome Research Ltd. All rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,8 +19,9 @@ from pytest import mark as m -from conftest import BEGIN, LATEST +from helpers import BEGIN, LATE, LATEST from npg_irods.cli.locate_data_objects import illumina_updates +from npg_irods.db.mlwh import Sample @m.describe("Locating data objects in iRODS") @@ -59,3 +60,64 @@ def test_illumina_updates( assert np == 7, "Number of MLWH records processed" assert ne == 0, "Number of errors" assert stdout_lines == expected + + @m.context("When MLWH updates are timestamp-only for samples and studies") + @m.it("Should return fewer updates after priming the MLWH cache") + def test_illumina_updates_cache_reduces_updates( + self, capsys, tmp_path, illumina_synthetic_irods, illumina_synthetic_mlwh + ): + np_uncached, ne_uncached = illumina_updates( + illumina_synthetic_mlwh, BEGIN, LATEST + ) + + assert np_uncached == 7, "Number of MLWH records processed without cache" + assert ne_uncached == 0, "Number of errors without cache" + capsys.readouterr() # Clear output from the uncached run + + cache_path = tmp_path / "mlwh_cache.sqlite" + np_cached, ne_cached = illumina_updates( + illumina_synthetic_mlwh, + BEGIN, + LATEST, + cache_path=cache_path, + prime_cache=True, 
+ ) + stdout_lines = [line for line in capsys.readouterr().out.split("\n") if line] + + assert np_cached == 0, "Number of MLWH records processed with cache" + assert ne_cached == 0, "Number of errors with cache" + assert stdout_lines == [] + + @m.context("When a cached MLWH sample has content changes in the time window") + @m.it("Should process only the affected components") + def test_illumina_updates_cache_processes_changed_samples( + self, capsys, tmp_path, illumina_synthetic_irods, illumina_synthetic_mlwh + ): + cache_path = tmp_path / "mlwh_cache.sqlite" + illumina_updates( + illumina_synthetic_mlwh, + BEGIN, + LATEST, + cache_path=cache_path, + prime_cache=True, + ) + capsys.readouterr() # Clear output from the priming run + + sample = ( + illumina_synthetic_mlwh.query(Sample) + .filter_by(id_sample_lims="id_sample_lims2") + .one() + ) + sample.supplier_name = "supplier_name2_updated" + sample.recorded_at = LATEST + illumina_synthetic_mlwh.commit() + + np_cached, ne_cached = illumina_updates( + illumina_synthetic_mlwh, + LATE, + LATEST, + cache_path=cache_path, + ) + + assert np_cached == 2, "Number of MLWH records processed with cache" + assert ne_cached == 0, "Number of errors with cache"