From 2ddbb8422a4ba5728d7481ce2dac2a16c390df98 Mon Sep 17 00:00:00 2001 From: Keith James Date: Mon, 9 Feb 2026 10:00:56 +0000 Subject: [PATCH 1/6] Add MLWH cache hack Add a local SQLite cache of sample and study data hashes so that we can tell when certain column values have changed in the MLWH. There are columns in these tables in the MLWH that advertise change timestamps, but they often wrongly reflect the last time a row was replaced without changing its data. This can be removed once this issue is addressed upstream. --- src/npg_irods/cli/locate_data_objects.py | 156 ++++++- src/npg_irods/db/mlwh.py | 8 +- src/npg_irods/db/mlwh_cache.py | 469 +++++++++++++++++++++ src/npg_irods/illumina.py | 63 ++- src/npg_irods/ont.py | 50 ++- src/npg_irods/pacbio.py | 47 ++- tests/illumina/conftest.py | 4 + tests/illumina/test_locate_data_objects.py | 29 +- 8 files changed, 778 insertions(+), 48 deletions(-) create mode 100644 src/npg_irods/db/mlwh_cache.py diff --git a/src/npg_irods/cli/locate_data_objects.py b/src/npg_irods/cli/locate_data_objects.py index adcf53fa..87000124 100644 --- a/src/npg_irods/cli/locate_data_objects.py +++ b/src/npg_irods/cli/locate_data_objects.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023, 2024 Genome Research Ltd. All rights reserved. +# Copyright © 2023, 2024, 2026 Genome Research Ltd. All rights reserved. 
# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,6 +21,7 @@ import argparse import sys from datetime import datetime +from pathlib import Path from typing import Any, Iterator import sqlalchemy @@ -51,6 +52,7 @@ find_updated_samples, find_updated_studies, ) +from npg_irods.db.mlwh_cache import MlwhChangeCache from npg_irods.exception import CollectionNotFound from npg_irods.illumina import find_qc_collection from npg_irods.metadata import infinium @@ -162,6 +164,7 @@ def consent_withdrawn(cli_args: argparse.ArgumentParser): def illumina_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Illumina data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -171,10 +174,19 @@ def illumina_updates_cli(cli_args: argparse.ArgumentParser): skip_absent_runs = cli_args.skip_absent_runs json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = illumina_updates( - sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone + sess, + since, + until, + skip_absent_runs=skip_absent_runs, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -188,6 +200,8 @@ def illumina_updates( skip_absent_runs: int = None, json: bool = False, zone: str = None, + cache_path: Path | None = None, + prime_cache: bool = False, ) -> tuple[int, int]: """Find recently updated Illumina data in the ML warehouse, locate corresponding data objects in iRODS and print their paths. @@ -200,10 +214,13 @@ def illumina_updates( this number of attempts. json: Print output in JSON format. zone: iRODS zone to query. 
+ cache_path: Path to local content-hash cache for MLWH change detection. + prime_cache: Prime the cache with all samples and studies in the MLWH. Returns: The number of ML warehouse records processed, the number of errors encountered. """ + num_processed = num_errors = 0 attempts = successes = 0 to_print = set() @@ -211,8 +228,18 @@ def illumina_updates( if skip_absent_runs is not None: log.info("Skipping absent runs after n attempts", n=skip_absent_runs) + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for prev, curr in with_previous( - illumina.find_updated_components(sess, since=since, until=until) + illumina.find_updated_components( + sess, + since=since, + until=until, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, + ) ): if curr is None: # Last item when this is reached continue @@ -276,6 +303,7 @@ def illumina_updates( def ont_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding ONT data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -285,10 +313,19 @@ def ont_updates_cli(cli_args: argparse.ArgumentParser): report_tags = cli_args.report_tags json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = ont_updates( - sess, since, until, report_tags=report_tags, json=json, zone=zone + sess, + since, + until, + report_tags=report_tags, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -302,12 +339,23 @@ def ont_updates( report_tags: bool = False, json: bool = False, zone: str = None, + cache_path: Path | None = None, + prime_cache: bool = False, ) -> tuple[int, int]: num_processed = 
num_errors = 0 + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for i, c in enumerate( ont.find_updated_components( - sess, since=since, until=until, include_tags=report_tags + sess, + since=since, + until=until, + include_tags=report_tags, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, ) ): num_processed += 1 @@ -346,6 +394,7 @@ def ont_updates( def ont_run_collections_created_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding ONT runfolder collections selected on the time they were created in iRODS, and execute the command.""" + since = cli_args.begin_date until = cli_args.end_date json = cli_args.report_json @@ -387,6 +436,7 @@ def ont_run_collections_created( def pacbio_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding PacBio data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -396,10 +446,19 @@ def pacbio_updates_cli(cli_args: argparse.ArgumentParser): skip_absent_runs = cli_args.skip_absent_runs json = cli_args.report_json zone = cli_args.zone + cache_path = cli_args.mlwh_cache + prime_cache = cli_args.prime_mlwh_cache with Session(engine) as sess: num_proc, num_errors = pacbio_updates( - sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone + sess, + since, + until, + skip_absent_runs=skip_absent_runs, + json=json, + zone=zone, + cache_path=Path(cache_path), + prime_cache=prime_cache, ) if num_errors: @@ -413,7 +472,9 @@ def pacbio_updates( skip_absent_runs: int = None, json: bool = False, zone: str = None, -) -> (int, int): + cache_path: Path | None = None, + prime_cache: bool = False, +) -> tuple[int, int]: num_processed = num_errors = 0 attempts = successes = 0 to_print = set() @@ 
-421,8 +482,18 @@ def pacbio_updates( if skip_absent_runs is not None: log.info("Skipping absent runs after n attempts", n=skip_absent_runs) + changed_sample_ids, changed_study_ids = _load_mlwh_change_ids( + sess, since, until, cache_path, prime_cache=prime_cache + ) + for prev, curr in with_previous( - pacbio.find_updated_components(sess, since=since, until=until) + pacbio.find_updated_components( + sess, + since=since, + until=until, + changed_sample_ids=changed_sample_ids, + changed_study_ids=changed_study_ids, + ) ): if curr is None: # Last item when this is reached continue @@ -481,6 +552,7 @@ def pacbio_updates( def infinium_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Infinium microarray data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -519,6 +591,7 @@ def infinium_microarray_updates( def sequenom_updates_cli(cli_args: argparse.ArgumentParser): """Process the command line arguments for finding Sequenom genotype data objects and execute the command.""" + dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro") engine = sqlalchemy.create_engine( dbconfig.url, pool_pre_ping=True, pool_recycle=3600 @@ -547,6 +620,33 @@ def sequenom_genotype_updates( return num_processed, num_errors +def _load_mlwh_change_ids( + sess: Session, + since: datetime, + until: datetime, + cache_path: Path | None, + prime_cache: bool = False, +) -> tuple[set[str] | None, set[str] | None]: + if cache_path is None: + if prime_cache: + log.warning("MLWH cache priming requested without cache path") + + return None, None + + with MlwhChangeCache(cache_path, prime_cache=prime_cache) as cache: + sample_ids = cache.changed_sample_ids(sess, since, until) + study_ids = cache.changed_study_ids(sess, since, until) + + log.info( + "Filtering MLWH updates using cache", + 
cache=str(cache_path), + samples=len(sample_ids), + studies=len(study_ids), + ) + + return sample_ids, study_ids + + def _print_data_objects_updated_in_mlwh( sess: Session, query: list[AVU], @@ -576,7 +676,7 @@ def _print_data_objects_updated_in_mlwh( def _find_and_print_data_objects( attr: Any, - values: Iterator[int], + values: Iterator[str], query: list[AVU], since: datetime, until: datetime, @@ -697,6 +797,18 @@ def main(): help="Print output in JSON format.", action="store_true", ) + ilup_parser.add_argument( + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite cache used to filter Sample/Study updates by content.", + type=str, + ) + ilup_parser.add_argument( + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering.", + action="store_true", + ) ilup_parser.set_defaults(func=illumina_updates_cli) ontcre_parser = subparsers.add_parser( @@ -731,6 +843,18 @@ def main(): help="Print output in JSON format.", action="store_true", ) + ontup_parser.add_argument( + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite cache used to filter Sample/Study updates by content.", + type=str, + ) + ontup_parser.add_argument( + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering.", + action="store_true", + ) ontup_parser.set_defaults(func=ont_updates_cli) pbup_parser = subparsers.add_parser( @@ -756,6 +880,20 @@ def main(): help="Print output in JSON format.", action="store_true", ) + pbup_parser.add_argument( + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite cache used to filter Sample/Study updates by content.", + type=str, + ) + pbup_parser.add_argument( + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering. This " + "is only useful when the cache is stale or empty. 
Do not use this option when " + "you want to detect updates.", + action="store_true", + ) pbup_parser.set_defaults(func=pacbio_updates_cli) imup_parser = subparsers.add_parser( diff --git a/src/npg_irods/db/mlwh.py b/src/npg_irods/db/mlwh.py index 7a91288d..6ee7e99c 100644 --- a/src/npg_irods/db/mlwh.py +++ b/src/npg_irods/db/mlwh.py @@ -22,7 +22,7 @@ import enum from contextlib import contextmanager from datetime import datetime, timedelta -from typing import Iterator, Type +from typing import Any, Generator, Iterator, Type import structlog from sqlalchemy import ( @@ -364,7 +364,7 @@ class SeqProductIrodsLocations(Base): @contextmanager -def session_context(engine: Engine) -> Session: +def session_context(engine: Engine) -> Generator[Session, Any, None]: """Yield a session and close, or rollback on error. This context manager does not handle exceptions and will raise them to the caller.""" @@ -426,7 +426,7 @@ def find_sample_by_sample_id(sess: Session, sample_id: str) -> Sample: def find_updated_samples( sess: Session, since: datetime, until: datetime -) -> Iterator[int]: +) -> Iterator[str]: """Return IDs of Samples that have been updated in the ML warehouse. Args: @@ -454,7 +454,7 @@ def find_updated_samples( def find_updated_studies( sess: Session, since: datetime, until: datetime -) -> Iterator[int]: +) -> Iterator[str]: """Return IDs of Studies that have been updated in the ML warehouse. Args: diff --git a/src/npg_irods/db/mlwh_cache.py b/src/npg_irods/db/mlwh_cache.py new file mode 100644 index 00000000..61e9e57c --- /dev/null +++ b/src/npg_irods/db/mlwh_cache.py @@ -0,0 +1,469 @@ +# -*- coding: utf-8 -*- +# +# Copyright © 2026 Genome Research Ltd. All rights reserved. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +"""SQLite-backed cache of ML warehouse content hashes.""" + +import hashlib +import json +import sqlite3 +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterable, Sequence + +from sqlalchemy import asc, select +from sqlalchemy.orm import Session +from structlog import get_logger + +from npg_irods.db.mlwh import Sample, Study, find_updated_samples, find_updated_studies + + +def logger(): + """Return a logger for this module.""" + + return get_logger(__name__) + + +CACHE_SCHEMA_VERSION = 1 +SQLITE_BUSY_TIMEOUT_MS = 5000 +CACHE_CHUNK_SIZE = ( + 500 # Don't make this bigger than 32,766 (maximum size of an SQLite IN clause) +) + +SAMPLE_HASH_FIELDS = ( + "id_lims", + "id_sample_lims", + "consent_withdrawn", + "name", + "organism", + "accession_number", + "common_name", + "cohort", + "sanger_sample_id", + "supplier_name", + "public_name", + "donor_id", + "date_of_consent_withdrawn", + "marked_as_consent_withdrawn_by", + "uuid_sample_lims", +) + +STUDY_HASH_FIELDS = ( + "id_lims", + "id_study_lims", + "name", + "accession_number", + "description", + "contains_human_dna", + "contaminated_human_dna", + "remove_x_and_autosomes", + "separate_y_chromosome_data", + "ena_project_id", + "study_title", + "study_visibility", + "ega_dac_accession_number", + "data_access_group", +) + + +SAMPLE_CACHE_CREATE_SQL = ( + "CREATE TABLE IF NOT EXISTS sample_cache (" + "id_sample_lims TEXT PRIMARY KEY, " + "content_hash TEXT NOT NULL, " + "hash_schema_version INTEGER NOT NULL, " + "last_changed_at TEXT NOT NULL)" +) +STUDY_CACHE_CREATE_SQL = ( + "CREATE 
TABLE IF NOT EXISTS study_cache (" + "id_study_lims TEXT PRIMARY KEY, " + "content_hash TEXT NOT NULL, " + "hash_schema_version INTEGER NOT NULL, " + "last_changed_at TEXT NOT NULL)" +) + +SAMPLE_CACHE_UPSERT_SQL = ( + "INSERT INTO sample_cache (id_sample_lims, content_hash, hash_schema_version, " + "last_changed_at) VALUES (?, ?, ?, ?) " + "ON CONFLICT(id_sample_lims) DO UPDATE SET " + "content_hash=excluded.content_hash, " + "hash_schema_version=excluded.hash_schema_version, " + "last_changed_at=excluded.last_changed_at " + "WHERE content_hash != excluded.content_hash " + "OR hash_schema_version != excluded.hash_schema_version" +) +STUDY_CACHE_UPSERT_SQL = ( + "INSERT INTO study_cache (id_study_lims, content_hash, hash_schema_version, last_changed_at) " + "VALUES (?, ?, ?, ?) " + "ON CONFLICT(id_study_lims) DO UPDATE SET " + "content_hash=excluded.content_hash, " + "hash_schema_version=excluded.hash_schema_version, " + "last_changed_at=excluded.last_changed_at " + "WHERE content_hash != excluded.content_hash " + "OR hash_schema_version != excluded.hash_schema_version" +) + + +@dataclass +class MlwhChangeCache: + """Cache for detecting ML warehouse Sample/Study content changes. + + Uses an SQLite database to store content hashes for rows in the ML warehouse so + that timestamp-only updates can be filtered out. + + The timestamp columns in the study and sample tables are not sufficient for us + to tell whether or not row data values have changed. This is because the timestamp + columns are updated whenever a row is modified, even if no values have changed. + (Possibly related to the MLWH update mechanism which deletes and inserts new rows + rather than updating existing rows.) + + Therefore, we need to use content hashes to detect changes in the actual data. + + Args: + path: Filesystem path to the SQLite cache file. + hash_schema_version: Version number for the hashing schema. + busy_timeout_ms: SQLite busy timeout in milliseconds. 
+ prime_cache: When True, populate the cache with all rows before filtering + for changes. + """ + + path: Path + hash_schema_version: int = CACHE_SCHEMA_VERSION + busy_timeout_ms: int = SQLITE_BUSY_TIMEOUT_MS + prime_cache: bool = False + + _conn: sqlite3.Connection | None = None + + def __enter__(self): + """Open the cache and ensure required tables exist. + + Returns: + The open cache instance. + """ + + path = self.path.resolve() + path.parent.mkdir(parents=True, exist_ok=True) + + self._conn = sqlite3.connect( + path.as_posix(), timeout=self.busy_timeout_ms / 1000 + ) + self._conn.execute("PRAGMA journal_mode=WAL") + self._conn.execute(f"PRAGMA busy_timeout = {self.busy_timeout_ms}") + + _ensure_schema(self._conn) + + return self + + def __exit__(self, err_type, err, traceback): + """Close the cache connection. + + Args: + err_type: Exception type raised within the context, if any. + err: Exception raised within the context, if any. + traceback: Traceback for the exception, if any. + """ + + if self._conn is not None: + self._conn.close() + self._conn = None + + def changed_sample_ids( + self, sess: Session, since: datetime, until: datetime + ) -> set[str]: + """Return Sample IDs with content changes in the given time range. + + Args: + sess: Open SQLAlchemy session for the ML warehouse. + since: Start of the recorded_at time window. + until: End of the recorded_at time window. + + Returns: + Set of sample IDs whose content has changed since the last cache run. + """ + + if self.prime_cache: + self.prime_samples(sess) + + return _filter_changed_rows( + sess, + self._active_conn(), + Sample, + "id_sample_lims", + SAMPLE_HASH_FIELDS, + find_updated_samples(sess, since, until), + self.hash_schema_version, + _load_sample_cache, + _upsert_sample_cache, + ) + + def changed_study_ids( + self, sess: Session, since: datetime, until: datetime + ) -> set[str]: + """Return Study IDs with content changes in the given time range. 
+ + Args: + sess: Open SQLAlchemy session for the ML warehouse. + since: Start of the recorded_at time window. + until: End of the recorded_at time window. + + Returns: + Set of study IDs whose content has changed since the last cache run. + """ + + if self.prime_cache: + self.prime_studies(sess) + + return _filter_changed_rows( + sess, + self._active_conn(), + Study, + "id_study_lims", + STUDY_HASH_FIELDS, + find_updated_studies(sess, since, until), + self.hash_schema_version, + _load_study_cache, + _upsert_study_cache, + ) + + def prime_samples(self, sess: Session) -> int: + """Populate the sample cache with hashes for all MLWH Sample rows.""" + + total = _prime_cache( + sess, + self._active_conn(), + Sample, + "id_sample_lims", + SAMPLE_HASH_FIELDS, + self.hash_schema_version, + _upsert_sample_cache, + ) + logger().info("Primed sample cache", cache=self.path.as_posix(), rows=total) + + return total + + def prime_studies(self, sess: Session) -> int: + """Populate the study cache with hashes for all MLWH Study rows.""" + + total = _prime_cache( + sess, + self._active_conn(), + Study, + "id_study_lims", + STUDY_HASH_FIELDS, + self.hash_schema_version, + _upsert_study_cache, + ) + logger().info("Primed study cache", cache=self.path.as_posix(), rows=total) + + return total + + def _active_conn(self) -> sqlite3.Connection: + """Return the active SQLite connection or raise if not open.""" + + if self._conn is None: + raise RuntimeError("Cache is not open") + + return self._conn + + +def _filter_changed_rows( + sess: Session, + conn: sqlite3.Connection, + model, + id_attr: str, + hash_fields: Sequence[str], + candidate_ids: Iterable[str], + hash_schema_version: int, + load_cache, + upsert_cache, +) -> set[str]: + """Return IDs whose cached hashes differ from the current content.""" + + changed: set[str] = set() + + for chunk in _chunked(candidate_ids, CACHE_CHUNK_SIZE): + ids = list(dict.fromkeys(chunk)) + if not ids: + continue + + rows = ( + 
sess.execute(select(model).where(getattr(model, id_attr).in_(ids))) + .scalars() + .all() + ) + if not rows: + continue + + cache_map = load_cache(conn, [getattr(row, id_attr) for row in rows]) + updates: list[tuple[str, str, int, str]] = [] + now = datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() + + for row in rows: + row_id = getattr(row, id_attr) + content_hash = _stable_hash(_payload(row, hash_fields)) + cached = cache_map.get(row_id) + if cached is None: + changed.add(row_id) + updates.append((row_id, content_hash, hash_schema_version, now)) + elif cached[0] != content_hash or cached[1] != hash_schema_version: + changed.add(row_id) + updates.append((row_id, content_hash, hash_schema_version, now)) + + upsert_cache(conn, updates) + + return changed + + +def _prime_cache( + sess: Session, + conn: sqlite3.Connection, + model, + id_attr: str, + hash_fields: Sequence[str], + hash_schema_version: int, + upsert_cache, +) -> int: + """Insert hashes for all rows in the model into the cache.""" + + updates: list[tuple[str, str, int, str]] = [] + total = 0 + query = sess.query(model).order_by(asc(getattr(model, id_attr))) + + for row in query.yield_per(CACHE_CHUNK_SIZE): + row_id = getattr(row, id_attr) + content_hash = _stable_hash(_payload(row, hash_fields)) + now = datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat() + updates.append((row_id, content_hash, hash_schema_version, now)) + + if len(updates) >= CACHE_CHUNK_SIZE: + upsert_cache(conn, updates) + total += len(updates) + updates.clear() + + if updates: + upsert_cache(conn, updates) + total += len(updates) + + return total + + +def _load_sample_cache(conn: sqlite3.Connection, ids: list[str]) -> dict: + """Load cached sample hashes for the given IDs.""" + + if not ids: + return {} + + placeholders = ",".join("?" 
for _ in ids) + query = ( + "SELECT id_sample_lims, content_hash, hash_schema_version " + f"FROM sample_cache WHERE id_sample_lims IN ({placeholders})" + ) + rows = conn.execute(query, ids).fetchall() + logger().debug( + "Loaded sample cache rows", num_requested=len(ids), num_loaded=len(rows) + ) + + return {row[0]: (row[1], row[2]) for row in rows} + + +def _load_study_cache(conn: sqlite3.Connection, ids: list[str]) -> dict: + """Load cached study hashes for the given IDs.""" + + if not ids: + return {} + + placeholders = ",".join("?" for _ in ids) + query = ( + "SELECT id_study_lims, content_hash, hash_schema_version " + "FROM study_cache " + f"WHERE id_study_lims IN ({placeholders})" + ) + rows = conn.execute(query, ids).fetchall() + logger().debug( + "Loaded study cache rows", num_requested=len(ids), num_loaded=len(rows) + ) + + return {row[0]: (row[1], row[2]) for row in rows} + + +def _upsert_sample_cache( + conn: sqlite3.Connection, updates: list[tuple[str, str, int, str]] +) -> None: + """Insert or update sample cache rows.""" + + if not updates: + return + + conn.executemany(SAMPLE_CACHE_UPSERT_SQL, updates) + conn.commit() + logger().debug("Upserted new sample cache rows", n=len(updates)) + + +def _upsert_study_cache( + conn: sqlite3.Connection, updates: list[tuple[str, str, int, str]] +) -> None: + """Insert or update study cache rows.""" + + if not updates: + return + + conn.executemany(STUDY_CACHE_UPSERT_SQL, updates) + conn.commit() + logger().debug("Upserted new study cache rows", n=len(updates)) + + +def _ensure_schema(conn: sqlite3.Connection) -> None: + """Create cache tables if missing.""" + + conn.execute(SAMPLE_CACHE_CREATE_SQL) + conn.execute(STUDY_CACHE_CREATE_SQL) + conn.commit() + + +def _chunked(values: Iterable[str], size: int) -> Iterable[list[str]]: + """Yield lists of values in fixed-size chunks.""" + + chunk: list[str] = [] + for value in values: + chunk.append(value) + if len(chunk) >= size: + yield chunk + chunk = [] + + if chunk: + 
yield chunk + + +def _payload(row, fields: Sequence[str]) -> dict: + """Build a dict payload of selected attributes for hashing.""" + + def _normalise_value(value): + """Return a JSON-safe representation for hashing.""" + + if isinstance(value, datetime): + return value.isoformat() + + return value + + return {field: _normalise_value(getattr(row, field)) for field in fields} + + +def _stable_hash(payload: dict) -> str: + """Return a stable SHA-256 hash of the payload.""" + + s = json.dumps(payload, sort_keys=True, separators=(",", ":"), default=str) + return hashlib.sha256(s.encode("utf-8")).hexdigest() diff --git a/src/npg_irods/illumina.py b/src/npg_irods/illumina.py index 3bf41bf8..b635a7fe 100644 --- a/src/npg_irods/illumina.py +++ b/src/npg_irods/illumina.py @@ -24,11 +24,11 @@ from enum import Enum, unique from functools import lru_cache from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Iterator, Optional, Type from partisan.irods import AVU, Collection, DataObject from partisan.metadata import AsValueEnum -from sqlalchemy import asc, not_ +from sqlalchemy import and_, asc, not_, or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -488,8 +488,12 @@ def find_flowcells_by_component( def find_updated_components( - sess: Session, since: datetime, until: datetime -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Find in the ML warehouse any Illumina sequence components whose tracking metadata has been changed within the given time range. @@ -504,19 +508,58 @@ def find_updated_components( the number of rows returned by the query by a factor of 10. Args: - sess: An open ML warehouse session. + sess: An open ML warehouse session. since: The start of the time range. until: The end of the time range. 
+ changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over Components whose tracking metadata have changed. + A generator over Components whose tracking metadata have changed. """ # Test that the created date is at least 1 day before the recorded date because we # want to avoid rows that have had their recorded_at timestamp changed simply # because they were recently created. + recent_creation = since - timedelta(days=1) + # When cache-based filtering is enabled, an empty set means “no content changes”. + # In that case, do not fall back to time-window-based filtering. + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [IseqFlowcell.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append( + and_( + Sample.recorded_at.between(since, until), + not_(Sample.created.between(recent_creation, since)), + ) + ) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append( + and_( + Study.recorded_at.between(since, until), + not_(Study.created.between(recent_creation, since)), + ) + ) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query( IseqProductMetrics.id_run, IseqFlowcell.position, IseqFlowcell.tag_index @@ -525,13 +568,7 @@ def find_updated_components( .join(IseqFlowcell.sample) .join(IseqFlowcell.study) .join(IseqFlowcell.iseq_product_metrics) - .filter( - Sample.recorded_at.between(since, until) - & not_(Sample.created.between(recent_creation, since)) - | 
Study.recorded_at.between(since, until) - & not_(Study.created.between(recent_creation, since)) - | IseqFlowcell.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .order_by( asc(IseqProductMetrics.id_run), asc(IseqFlowcell.position), diff --git a/src/npg_irods/ont.py b/src/npg_irods/ont.py index ade027c3..97c16a2c 100644 --- a/src/npg_irods/ont.py +++ b/src/npg_irods/ont.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # -# Copyright © 2021, 2022, 2023, 2024 Genome Research Ltd. All rights reserved. +# Copyright © 2021, 2022, 2023, 2024, 2026 Genome Research Ltd. All +# rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,12 +26,12 @@ from datetime import datetime from os import PathLike from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Optional, Type from partisan.exception import RodsError from partisan.icommands import iquest from partisan.irods import AVU, Collection, DataObject, query_metadata -from sqlalchemy import asc, distinct +from sqlalchemy import asc, distinct, or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -359,8 +360,13 @@ def find_recent_expt(sess: Session, since: datetime) -> list[str]: def find_updated_components( - sess: Session, since: datetime, until: datetime, include_tags=True -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + include_tags=True, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Return the components of runs whose ML warehouse metadata has been updated at or since the given date and time. @@ -370,25 +376,47 @@ def find_updated_components( until: A datetime. include_tags: Resolve the components to the granularity of individual tags, rather than as whole runs. 
Optional, defaults to True. + changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over the matching components. + A generator over the matching components. """ columns = [OseqFlowcell.experiment_name, OseqFlowcell.instrument_slot] if include_tags: columns.append(OseqFlowcell.tag_identifier) + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [OseqFlowcell.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append(Sample.recorded_at.between(since, until)) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append(Study.recorded_at.between(since, until)) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query(*columns) .distinct() .join(OseqFlowcell.sample) .join(OseqFlowcell.study) - .filter( - Sample.recorded_at.between(since, until) - | Study.recorded_at.between(since, until) - | OseqFlowcell.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .group_by(*columns) ) diff --git a/src/npg_irods/pacbio.py b/src/npg_irods/pacbio.py index 7186803d..68f503e3 100644 --- a/src/npg_irods/pacbio.py +++ b/src/npg_irods/pacbio.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023, 2024 Genome Research Ltd. All rights reserved. +# Copyright © 2023, 2024, 2026 Genome Research Ltd. All rights reserved. 
# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,9 +22,10 @@ from dataclasses import dataclass from datetime import datetime from pathlib import PurePath -from typing import Iterator, Optional, Type +from typing import Any, Generator, Iterable, Iterator, Optional, Type from partisan.irods import AVU, Collection, DataObject +from sqlalchemy import or_ from sqlalchemy.orm import Session from structlog import get_logger @@ -248,8 +249,12 @@ def find_runs_by_component( def find_updated_components( - sess: Session, since: datetime, until: datetime -) -> Iterator[Component]: + sess: Session, + since: datetime, + until: datetime, + changed_sample_ids: Optional[Iterable[str]] = None, + changed_study_ids: Optional[Iterable[str]] = None, +) -> Generator[Component, Any, None]: """Find in the ML warehouse any PacBio sequence components whose tracking metadata has been changed within the given time range. @@ -260,11 +265,37 @@ def find_updated_components( sess: An open ML warehouse session. since: The start of the time range. until: The end of the time range. + changed_sample_ids: Optional set of Sample IDs whose content has changed + within the time range. If provided, Sample updates are filtered to + these IDs rather than using recorded_at. + changed_study_ids: Optional set of Study IDs whose content has changed + within the time range. If provided, Study updates are filtered to + these IDs rather than using recorded_at. Returns: - An iterator over Components whose tracking metadata have changed. + A generator over Components whose tracking metadata have changed. 
""" + if ( + changed_sample_ids is not None + and changed_study_ids is not None + and not changed_sample_ids + and not changed_study_ids + ): + return + + filters = [PacBioRun.recorded_at.between(since, until)] + + if changed_sample_ids is None: + filters.append(Sample.recorded_at.between(since, until)) + elif changed_sample_ids: + filters.append(Sample.id_sample_lims.in_(changed_sample_ids)) + + if changed_study_ids is None: + filters.append(Study.recorded_at.between(since, until)) + elif changed_study_ids: + filters.append(Study.id_study_lims.in_(changed_study_ids)) + query = ( sess.query( PacBioRun.pac_bio_run_name, @@ -275,11 +306,7 @@ def find_updated_components( .distinct() .join(PacBioRun.sample) .join(PacBioRun.study) - .filter( - Sample.recorded_at.between(since, until) - | Study.recorded_at.between(since, until) - | PacBioRun.recorded_at.between(since, until) - ) + .filter(or_(*filters)) .order_by( PacBioRun.pac_bio_run_name, PacBioRun.well_label, diff --git a/tests/illumina/conftest.py b/tests/illumina/conftest.py index c7650888..75c31fb7 100644 --- a/tests/illumina/conftest.py +++ b/tests/illumina/conftest.py @@ -43,6 +43,7 @@ def initialize_mlwh_illumina_synthetic(session: Session): This is represented by the files in ./tests/data/illumina/synthetic """ + default_timestamps = { "created": CREATED, "last_updated": BEGIN, @@ -196,6 +197,7 @@ def make_sample(n): def initialize_mlwh_illumina_backfill(sess: Session): """Insert ML warehouse test data for Illumina product iRODS paths.""" + changed_study = Study( id_lims="LIMS_05", id_study_lims="4000", @@ -383,6 +385,7 @@ def initialize_mlwh_illumina_backfill(sess: Session): @pytest.fixture(scope="function") def illumina_synthetic_mlwh(mlwh_session) -> Generator[Session, Any, None]: """An ML warehouse database fixture populated with Illumina-related records.""" + initialize_mlwh_illumina_synthetic(mlwh_session) yield mlwh_session @@ -391,6 +394,7 @@ def illumina_synthetic_mlwh(mlwh_session) -> 
Generator[Session, Any, None]: def illumina_backfill_mlwh(mlwh_session) -> Generator[Session, Any, None]: """An ML warehouse database fixture populated with Illumina iRODS path backfill records.""" + initialize_mlwh_illumina_backfill(mlwh_session) yield mlwh_session diff --git a/tests/illumina/test_locate_data_objects.py b/tests/illumina/test_locate_data_objects.py index c3231a47..13a18410 100644 --- a/tests/illumina/test_locate_data_objects.py +++ b/tests/illumina/test_locate_data_objects.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright © 2023 Genome Research Ltd. All rights reserved. +# Copyright © 2023, 2026 Genome Research Ltd. All rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -59,3 +59,30 @@ def test_illumina_updates( assert np == 7, "Number of MLWH records processed" assert ne == 0, "Number of errors" assert stdout_lines == expected + + @m.context("When MLWH updates are timestamp-only for samples and studies") + @m.it("Should return fewer updates after priming the MLWH cache") + def test_illumina_updates_cache_reduces_updates( + self, capsys, tmp_path, illumina_synthetic_irods, illumina_synthetic_mlwh + ): + np_uncached, ne_uncached = illumina_updates( + illumina_synthetic_mlwh, BEGIN, LATEST + ) + + assert np_uncached == 7, "Number of MLWH records processed without cache" + assert ne_uncached == 0, "Number of errors without cache" + capsys.readouterr() # Clear output from the uncached run + + cache_path = tmp_path / "mlwh_cache.sqlite" + np_cached, ne_cached = illumina_updates( + illumina_synthetic_mlwh, + BEGIN, + LATEST, + cache_path=cache_path, + prime_cache=True, + ) + stdout_lines = [line for line in capsys.readouterr().out.split("\n") if line] + + assert np_cached == 0, "Number of MLWH records processed with cache" + assert ne_cached == 0, "Number of errors with cache" + assert stdout_lines == [] From 
9e567774a583c9050e78791facf80e13438bbd3c Mon Sep 17 00:00:00 2001 From: Keith James Date: Tue, 24 Feb 2026 15:45:07 +0000 Subject: [PATCH 2/6] Extend caching tests --- tests/illumina/test_locate_data_objects.py | 37 +++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/tests/illumina/test_locate_data_objects.py b/tests/illumina/test_locate_data_objects.py index 13a18410..e27ff38b 100644 --- a/tests/illumina/test_locate_data_objects.py +++ b/tests/illumina/test_locate_data_objects.py @@ -19,8 +19,9 @@ from pytest import mark as m -from conftest import BEGIN, LATEST +from conftest import BEGIN, LATE, LATEST from npg_irods.cli.locate_data_objects import illumina_updates +from npg_irods.db.mlwh import Sample @m.describe("Locating data objects in iRODS") @@ -86,3 +87,37 @@ def test_illumina_updates_cache_reduces_updates( assert np_cached == 0, "Number of MLWH records processed with cache" assert ne_cached == 0, "Number of errors with cache" assert stdout_lines == [] + + @m.context("When a cached MLWH sample has content changes in the time window") + @m.it("Should process only the affected components") + def test_illumina_updates_cache_processes_changed_samples( + self, capsys, tmp_path, illumina_synthetic_irods, illumina_synthetic_mlwh + ): + cache_path = tmp_path / "mlwh_cache.sqlite" + illumina_updates( + illumina_synthetic_mlwh, + BEGIN, + LATEST, + cache_path=cache_path, + prime_cache=True, + ) + capsys.readouterr() # Clear output from the priming run + + sample = ( + illumina_synthetic_mlwh.query(Sample) + .filter_by(id_sample_lims="id_sample_lims2") + .one() + ) + sample.supplier_name = "supplier_name2_updated" + sample.recorded_at = LATEST + illumina_synthetic_mlwh.commit() + + np_cached, ne_cached = illumina_updates( + illumina_synthetic_mlwh, + LATE, + LATEST, + cache_path=cache_path, + ) + + assert np_cached == 2, "Number of MLWH records processed with cache" + assert ne_cached == 0, "Number of errors with cache" From 
74be012466f8eb46f1d9393df38e6c6a79f50a8b Mon Sep 17 00:00:00 2001 From: Keith James Date: Tue, 24 Feb 2026 15:46:42 +0000 Subject: [PATCH 3/6] Update iRODS and Python versions used in local tests --- Dockerfile.dev | 11 ++++++----- README.md | 6 +++--- docker-compose.yml | 6 ++---- docker/install_pyenv.sh | 2 +- pyproject.toml | 2 +- requirements.txt | 4 ++-- 6 files changed, 15 insertions(+), 16 deletions(-) diff --git a/Dockerfile.dev b/Dockerfile.dev index 9d1e5548..1d69b75f 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -1,6 +1,6 @@ -FROM ghcr.io/wtsi-npg/ub-18.04-baton-irods-4.2.11:latest +FROM ghcr.io/wtsi-npg/ub-22.04-baton-irods-4.3.4:latest -ARG PYTHON_VERSION=3.12 +ARG PYTHON_VERSION=3.14 ENV DEBIAN_FRONTEND=noninteractive @@ -18,6 +18,7 @@ RUN apt-get update && \ make \ libbz2-dev \ libncurses-dev \ + libsqlite3-dev \ libreadline-dev \ libssl-dev \ zlib1g-dev @@ -28,7 +29,7 @@ RUN echo "deb [arch=amd64] https://packages.irods.org/apt/ $(lsb_release -sc) ma tee /etc/apt/sources.list.d/renci-irods.list && \ apt-get update && \ apt-get install -q -y --no-install-recommends \ - irods-icommands="4.2.11-1~$(lsb_release -sc)" + irods-icommands="4.3.4-0~$(lsb_release -sc)" WORKDIR /app @@ -53,7 +54,7 @@ COPY requirements.txt test-requirements.txt /app/ RUN pip install --no-cache-dir -r requirements.txt && \ pip install --no-cache-dir -r test-requirements.txt -COPY . /app/ +COPY . /app RUN pip install --no-cache-dir . && \ git status && \ @@ -65,4 +66,4 @@ USER appuser ENTRYPOINT ["/app/docker/entrypoint.sh"] -CMD ["/bin/bash", "-c", "sleep infinity"] +CMD ["/bin/bash"] diff --git a/README.md b/README.md index 9d528e46..aa36dae3 100644 --- a/README.md +++ b/README.md @@ -62,12 +62,12 @@ To run the tests in a container, you will need to have Docker installed. 
With this in place, you can run the tests with the following command: - docker-compose run app pytest --it + docker compose run app pytest --it There will be a delay the first time this is run because the Docker image will be built. To pre-build the image, you can run: - docker-compose build + docker compose build ## Creating a release @@ -148,4 +148,4 @@ string. ## Architecture -- `publish-directory` removes public permissions unless explicitly specified by `--group public` to be able to publish privately whilst having iRODS inheritance enabled on sequencing runs collections ([ADR 1](/docs/decisions/adr-01-publish-directory-removes-public-permissions-by-default.md)) \ No newline at end of file +- `publish-directory` removes public permissions unless explicitly specified by `--group public` to be able to publish privately whilst having iRODS inheritance enabled on sequencing runs collections ([ADR 1](/docs/decisions/adr-01-publish-directory-removes-public-permissions-by-default.md)) diff --git a/docker-compose.yml b/docker-compose.yml index 2022890a..75c6d5e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: irods-server: platform: linux/amd64 container_name: irods-server - image: "ghcr.io/wtsi-npg/ub-16.04-irods-4.2.7:latest" + image: "ghcr.io/wtsi-npg/ub-22.04-irods-4.3.4:latest" ports: - "127.0.0.1:1247:1247" - "127.0.0.1:20000-20199:20000-20199" @@ -37,10 +37,8 @@ services: context: . 
dockerfile: Dockerfile.dev restart: always - volumes: - - "./tests/.irods:/home/appuser/.irods/" environment: - IRODS_ENVIRONMENT_FILE: "/home/appuser/.irods/irods_environment.json" + IRODS_ENVIRONMENT_FILE: "/app/tests/.irods/irods_environment.json" IRODS_PASSWORD: "irods" depends_on: irods-server: diff --git a/docker/install_pyenv.sh b/docker/install_pyenv.sh index 677f1bab..97762639 100755 --- a/docker/install_pyenv.sh +++ b/docker/install_pyenv.sh @@ -2,7 +2,7 @@ set -ex -PYENV_RELEASE_VERSION=${PYENV_RELEASE_VERSION:="2.4.16"} +PYENV_RELEASE_VERSION=${PYENV_RELEASE_VERSION:="2.6.16"} export PYENV_GIT_TAG="v${PYENV_RELEASE_VERSION}" PYENV_ROOT=${PYENV_ROOT:-"$HOME/.pyenv"} diff --git a/pyproject.toml b/pyproject.toml index a2e8c089..ebf92ec8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dynamic = ["version"] dependencies = [ "npg-python-lib >= 1.0.0,<2", "npg_id_generation >=5.0.1", - "partisan >=4.0.2,<5", + "partisan >=4.1,<5", "pymysql >=1.1.1", "python-dateutil >=2.9.0,<3", "rich >=13.6.0", diff --git a/requirements.txt b/requirements.txt index c8e3fcee..e4ce53a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ cryptography==46.0.3 npg_id_generation@https://github.com/wtsi-npg/npg_id_generation/releases/download/5.0.1/npg_id_generation-5.0.1.tar.gz -npg-python-lib@https://github.com/wtsi-npg/npg-python-lib/releases/download/1.1.0/npg_python_lib-1.1.0.tar.gz -partisan@https://github.com/wtsi-npg/partisan/releases/download/4.0.2/partisan-4.0.2.tar.gz +npg-python-lib@https://github.com/wtsi-npg/npg-python-lib/releases/download/1.2.0/npg_python_lib-1.2.0.tar.gz +partisan@https://github.com/wtsi-npg/partisan/releases/download/4.1.2/partisan-4.1.2.tar.gz pymysql==1.1.2 python-dateutil==2.9.0.post0 rich==14.0.0 From 620df6ea46d909c7947790c58440759c04d65336 Mon Sep 17 00:00:00 2001 From: Keith James Date: Tue, 24 Feb 2026 15:47:26 +0000 Subject: [PATCH 4/6] Refactor CLI setup --- src/npg_irods/cli/locate_data_objects.py | 82 
+++++++++++------------- 1 file changed, 37 insertions(+), 45 deletions(-) diff --git a/src/npg_irods/cli/locate_data_objects.py b/src/npg_irods/cli/locate_data_objects.py index 87000124..91f38be5 100644 --- a/src/npg_irods/cli/locate_data_objects.py +++ b/src/npg_irods/cli/locate_data_objects.py @@ -620,6 +620,20 @@ def sequenom_genotype_updates( return num_processed, num_errors +def _add_mlwh_cache_arguments(parser: argparse.ArgumentParser): + parser.add_argument( + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite cache used to filter Sample/Study updates by content.", + type=str, + ) + parser.add_argument( + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering.", + action="store_true", + ) + def _load_mlwh_change_ids( sess: Session, since: datetime, @@ -740,27 +754,28 @@ def _print_batch(items: set[RodsItem], json: bool = False): def main(): - parser = argparse.ArgumentParser( + root_parser = argparse.ArgumentParser( description=description, formatter_class=argparse.RawDescriptionHelpFormatter ) - add_logging_arguments(parser) - add_db_config_arguments(parser) + add_logging_arguments(root_parser) + add_db_config_arguments(root_parser) - parser.add_argument( + root_parser.add_argument( "--zone", help="Specify a federated iRODS zone in which to find data objects to check. 
" "This is not required if the target collections are in the local zone.", type=str, ) - parser.add_argument( + root_parser.add_argument( "--version", help="Print the version and exit.", action="version", version=version(), ) - subparsers = parser.add_subparsers(title="Sub-commands", required=True) + subparsers = root_parser.add_subparsers(title="Sub-commands", required=True) + # CLI to locate data with consent withdrawn metadata updates cwdr_parser = subparsers.add_parser( "consent-withdrawn", help="Find data objects related to samples whose consent for data use has " @@ -774,12 +789,15 @@ def main(): ) cwdr_parser.set_defaults(func=consent_withdrawn) + # CLI to find Illumina run metadata updates ilup_parser = subparsers.add_parser( "illumina-updates", help="Find data objects, which are components of Illumina runs, whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(ilup_parser) + _add_mlwh_cache_arguments(ilup_parser) + ilup_parser.add_argument( "--skip-absent-runs", "--skip_absent_runs", @@ -797,26 +815,16 @@ def main(): help="Print output in JSON format.", action="store_true", ) - ilup_parser.add_argument( - "--mlwh-cache", - "--mlwh_cache", - help="Path to a SQLite cache used to filter Sample/Study updates by content.", - type=str, - ) - ilup_parser.add_argument( - "--prime-mlwh-cache", - "--prime_mlwh_cache", - help="Prime the MLWH cache with all Sample/Study rows before filtering.", - action="store_true", - ) ilup_parser.set_defaults(func=illumina_updates_cli) + # CLI to find ONT runfolder collections created within a specified time range ontcre_parser = subparsers.add_parser( "ont-run-creation", help="Find ONT runfolder collections created in iRODS within a specified time " "range.", ) add_date_range_arguments(ontcre_parser) + ontcre_parser.add_argument( "--report-json", "--report_json", @@ -825,12 +833,15 @@ def main(): ) ontcre_parser.set_defaults(func=ont_run_collections_created_cli) + # CLI to 
find ONT run metadata updates ontup_parser = subparsers.add_parser( "ont-updates", help="Find collections, containing data objects for ONT runs, whose tracking" "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(ontup_parser) + _add_mlwh_cache_arguments(ontup_parser) + ontup_parser.add_argument( "--report-tags", "--report_tags", @@ -843,26 +854,17 @@ def main(): help="Print output in JSON format.", action="store_true", ) - ontup_parser.add_argument( - "--mlwh-cache", - "--mlwh_cache", - help="Path to a SQLite cache used to filter Sample/Study updates by content.", - type=str, - ) - ontup_parser.add_argument( - "--prime-mlwh-cache", - "--prime_mlwh_cache", - help="Prime the MLWH cache with all Sample/Study rows before filtering.", - action="store_true", - ) ontup_parser.set_defaults(func=ont_updates_cli) + # CLI to find PacBio run metadata updates pbup_parser = subparsers.add_parser( "pacbio-updates", help="Find data objects, which are components of PacBio runs, whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(pbup_parser) + _add_mlwh_cache_arguments(pbup_parser) + pbup_parser.add_argument( "--skip-absent-runs", "--skip_absent_runs", @@ -880,28 +882,16 @@ def main(): help="Print output in JSON format.", action="store_true", ) - pbup_parser.add_argument( - "--mlwh-cache", - "--mlwh_cache", - help="Path to a SQLite cache used to filter Sample/Study updates by content.", - type=str, - ) - pbup_parser.add_argument( - "--prime-mlwh-cache", - "--prime_mlwh_cache", - help="Prime the MLWH cache with all Sample/Study rows before filtering. This " - "is only useful when the cache is stale or empty. 
Do not use this option when " - "you want to detect updates.", - action="store_true", - ) pbup_parser.set_defaults(func=pacbio_updates_cli) + # CLI to find Infinium microarray run metadata updates imup_parser = subparsers.add_parser( "infinium-updates", help="Find data objects related to Infinium microarray samples whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(imup_parser) + imup_parser.add_argument( "--report-json", "--report_json", @@ -910,12 +900,14 @@ def main(): ) imup_parser.set_defaults(func=infinium_updates_cli) + # CLI to find Sequenom genotype run metadata updates squp_parser = subparsers.add_parser( "sequenom-updates", help="Find data objects related to Sequenom genotype samples whose tracking " "metadata in the ML warehouse have changed since a specified time.", ) add_date_range_arguments(squp_parser) + squp_parser.add_argument( "--report-json", "--report_json", @@ -924,7 +916,7 @@ def main(): ) squp_parser.set_defaults(func=sequenom_updates_cli) - args = parser.parse_args() + args = root_parser.parse_args() configure_structlog( config_file=args.log_config, debug=args.debug, From d8ad2f0e9239f5bc07fec12bf2dd884f6853ac03 Mon Sep 17 00:00:00 2001 From: Keith James Date: Tue, 24 Feb 2026 16:38:02 +0000 Subject: [PATCH 5/6] Black format --- src/npg_irods/cli/locate_data_objects.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/npg_irods/cli/locate_data_objects.py b/src/npg_irods/cli/locate_data_objects.py index 91f38be5..dccac24f 100644 --- a/src/npg_irods/cli/locate_data_objects.py +++ b/src/npg_irods/cli/locate_data_objects.py @@ -622,18 +622,19 @@ def sequenom_genotype_updates( def _add_mlwh_cache_arguments(parser: argparse.ArgumentParser): parser.add_argument( - "--mlwh-cache", - "--mlwh_cache", - help="Path to a SQLite cache used to filter Sample/Study updates by content.", - type=str, + "--mlwh-cache", + "--mlwh_cache", + help="Path to a SQLite 
cache used to filter Sample/Study updates by content.", + type=str, ) parser.add_argument( - "--prime-mlwh-cache", - "--prime_mlwh_cache", - help="Prime the MLWH cache with all Sample/Study rows before filtering.", - action="store_true", + "--prime-mlwh-cache", + "--prime_mlwh_cache", + help="Prime the MLWH cache with all Sample/Study rows before filtering.", + action="store_true", ) + def _load_mlwh_change_ids( sess: Session, since: datetime, From 0eaed84d23b15b122279a9209f0a458bb9625933 Mon Sep 17 00:00:00 2001 From: Keith James Date: Tue, 24 Feb 2026 16:53:20 +0000 Subject: [PATCH 6/6] Fix imports --- tests/illumina/test_locate_data_objects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/illumina/test_locate_data_objects.py b/tests/illumina/test_locate_data_objects.py index e27ff38b..c1fa4f05 100644 --- a/tests/illumina/test_locate_data_objects.py +++ b/tests/illumina/test_locate_data_objects.py @@ -19,7 +19,7 @@ from pytest import mark as m -from conftest import BEGIN, LATE, LATEST +from helpers import BEGIN, LATE, LATEST from npg_irods.cli.locate_data_objects import illumina_updates from npg_irods.db.mlwh import Sample