
Commit 2ddbb84

Add MLWH cache hack
Add a local SQLite cache of sample and study data hashes so that we can tell when particular column values have actually changed in the MLWH. These tables have columns that advertise change timestamps, but they often wrongly reflect the last time a row was replaced without its data changing. This can be removed once the issue is addressed upstream.
1 parent 8e3bba0 commit 2ddbb84

8 files changed

Lines changed: 778 additions & 48 deletions
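
The cache itself is implemented in src/npg_irods/db/mlwh_cache.py, one of the changed files not reproduced below. A minimal sketch of the underlying idea, with a hypothetical table name (sample_hash), key column (id_sample_lims) and column list (not the committed code), looks like this:

import hashlib
import sqlite3

# Hypothetical column list: the real cache would hash whichever Sample/Study
# columns matter downstream, not necessarily these names.
SAMPLE_DATA_COLUMNS = ("name", "supplier_name", "donor_id", "consent_withdrawn")


def content_hash(row: dict) -> str:
    """Hash only the data columns, deliberately ignoring the MLWH change
    timestamps, which may move without the data changing."""
    digest = hashlib.sha256()
    for col in SAMPLE_DATA_COLUMNS:
        digest.update(repr(row.get(col)).encode())
    return digest.hexdigest()


def sample_really_changed(conn: sqlite3.Connection, sample_id: str, row: dict) -> bool:
    """Return True only when the row's content hash differs from the hash held
    in the local SQLite cache, updating the cache as a side effect."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sample_hash (id_sample_lims TEXT PRIMARY KEY, hash TEXT)"
    )
    new_hash = content_hash(row)
    found = conn.execute(
        "SELECT hash FROM sample_hash WHERE id_sample_lims = ?", (sample_id,)
    ).fetchone()
    if found is not None and found[0] == new_hash:
        return False  # The timestamp may have moved, but the data did not.

    conn.execute(
        "INSERT OR REPLACE INTO sample_hash (id_sample_lims, hash) VALUES (?, ?)",
        (sample_id, new_hash),
    )
    conn.commit()
    return True

Hashing only the data columns makes the comparison independent of the unreliable last-updated timestamps; studies would get the same treatment with their own table and column set.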


src/npg_irods/cli/locate_data_objects.py

Lines changed: 147 additions & 9 deletions
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 #
-# Copyright © 2023, 2024 Genome Research Ltd. All rights reserved.
+# Copyright © 2023, 2024, 2026 Genome Research Ltd. All rights reserved.
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -21,6 +21,7 @@
 import argparse
 import sys
 from datetime import datetime
+from pathlib import Path
 from typing import Any, Iterator
 
 import sqlalchemy
@@ -51,6 +52,7 @@
     find_updated_samples,
     find_updated_studies,
 )
+from npg_irods.db.mlwh_cache import MlwhChangeCache
 from npg_irods.exception import CollectionNotFound
 from npg_irods.illumina import find_qc_collection
 from npg_irods.metadata import infinium
@@ -162,6 +164,7 @@ def consent_withdrawn(cli_args: argparse.ArgumentParser):
 def illumina_updates_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding Illumina data objects and execute
     the command."""
+
     dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro")
     engine = sqlalchemy.create_engine(
         dbconfig.url, pool_pre_ping=True, pool_recycle=3600
@@ -171,10 +174,19 @@ def illumina_updates_cli(cli_args: argparse.ArgumentParser):
     skip_absent_runs = cli_args.skip_absent_runs
     json = cli_args.report_json
     zone = cli_args.zone
+    cache_path = cli_args.mlwh_cache
+    prime_cache = cli_args.prime_mlwh_cache
 
     with Session(engine) as sess:
         num_proc, num_errors = illumina_updates(
-            sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone
+            sess,
+            since,
+            until,
+            skip_absent_runs=skip_absent_runs,
+            json=json,
+            zone=zone,
+            cache_path=Path(cache_path),
+            prime_cache=prime_cache,
         )
 
     if num_errors:
@@ -188,6 +200,8 @@ def illumina_updates(
     skip_absent_runs: int = None,
     json: bool = False,
     zone: str = None,
+    cache_path: Path | None = None,
+    prime_cache: bool = False,
 ) -> tuple[int, int]:
     """Find recently updated Illumina data in the ML warehouse, locate corresponding
     data objects in iRODS and print their paths.
@@ -200,19 +214,32 @@ def illumina_updates(
             this number of attempts.
         json: Print output in JSON format.
         zone: iRODS zone to query.
+        cache_path: Path to local content-hash cache for MLWH change detection.
+        prime_cache: Prime the cache with all samples and studies in the MLWH.
 
     Returns:
         The number of ML warehouse records processed, the number of errors encountered.
     """
+
     num_processed = num_errors = 0
     attempts = successes = 0
     to_print = set()
 
     if skip_absent_runs is not None:
         log.info("Skipping absent runs after n attempts", n=skip_absent_runs)
 
+    changed_sample_ids, changed_study_ids = _load_mlwh_change_ids(
+        sess, since, until, cache_path, prime_cache=prime_cache
+    )
+
     for prev, curr in with_previous(
-        illumina.find_updated_components(sess, since=since, until=until)
+        illumina.find_updated_components(
+            sess,
+            since=since,
+            until=until,
+            changed_sample_ids=changed_sample_ids,
+            changed_study_ids=changed_study_ids,
+        )
     ):
         if curr is None:  # Last item when this is reached
             continue
@@ -276,6 +303,7 @@ def illumina_updates(
 def ont_updates_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding ONT data objects and execute the
     command."""
+
     dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro")
     engine = sqlalchemy.create_engine(
         dbconfig.url, pool_pre_ping=True, pool_recycle=3600
@@ -285,10 +313,19 @@ def ont_updates_cli(cli_args: argparse.ArgumentParser):
     report_tags = cli_args.report_tags
     json = cli_args.report_json
     zone = cli_args.zone
+    cache_path = cli_args.mlwh_cache
+    prime_cache = cli_args.prime_mlwh_cache
 
     with Session(engine) as sess:
         num_proc, num_errors = ont_updates(
-            sess, since, until, report_tags=report_tags, json=json, zone=zone
+            sess,
+            since,
+            until,
+            report_tags=report_tags,
+            json=json,
+            zone=zone,
+            cache_path=Path(cache_path),
+            prime_cache=prime_cache,
        )
 
     if num_errors:
@@ -302,12 +339,23 @@ def ont_updates(
     report_tags: bool = False,
     json: bool = False,
     zone: str = None,
+    cache_path: Path | None = None,
+    prime_cache: bool = False,
 ) -> tuple[int, int]:
     num_processed = num_errors = 0
 
+    changed_sample_ids, changed_study_ids = _load_mlwh_change_ids(
+        sess, since, until, cache_path, prime_cache=prime_cache
+    )
+
     for i, c in enumerate(
         ont.find_updated_components(
-            sess, since=since, until=until, include_tags=report_tags
+            sess,
+            since=since,
+            until=until,
+            include_tags=report_tags,
+            changed_sample_ids=changed_sample_ids,
+            changed_study_ids=changed_study_ids,
         )
     ):
         num_processed += 1
@@ -346,6 +394,7 @@ def ont_updates(
 def ont_run_collections_created_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding ONT runfolder collections
     selected on the time they were created in iRODS, and execute the command."""
+
     since = cli_args.begin_date
     until = cli_args.end_date
     json = cli_args.report_json
@@ -387,6 +436,7 @@ def ont_run_collections_created(
 def pacbio_updates_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding PacBio data objects and execute
     the command."""
+
     dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro")
     engine = sqlalchemy.create_engine(
         dbconfig.url, pool_pre_ping=True, pool_recycle=3600
@@ -396,10 +446,19 @@ def pacbio_updates_cli(cli_args: argparse.ArgumentParser):
     skip_absent_runs = cli_args.skip_absent_runs
     json = cli_args.report_json
     zone = cli_args.zone
+    cache_path = cli_args.mlwh_cache
+    prime_cache = cli_args.prime_mlwh_cache
 
     with Session(engine) as sess:
         num_proc, num_errors = pacbio_updates(
-            sess, since, until, skip_absent_runs=skip_absent_runs, json=json, zone=zone
+            sess,
+            since,
+            until,
+            skip_absent_runs=skip_absent_runs,
+            json=json,
+            zone=zone,
+            cache_path=Path(cache_path),
+            prime_cache=prime_cache,
         )
 
     if num_errors:
@@ -413,16 +472,28 @@ def pacbio_updates(
     skip_absent_runs: int = None,
     json: bool = False,
     zone: str = None,
-) -> (int, int):
+    cache_path: Path | None = None,
+    prime_cache: bool = False,
+) -> tuple[int, int]:
     num_processed = num_errors = 0
     attempts = successes = 0
     to_print = set()
 
     if skip_absent_runs is not None:
         log.info("Skipping absent runs after n attempts", n=skip_absent_runs)
 
+    changed_sample_ids, changed_study_ids = _load_mlwh_change_ids(
+        sess, since, until, cache_path, prime_cache=prime_cache
+    )
+
     for prev, curr in with_previous(
-        pacbio.find_updated_components(sess, since=since, until=until)
+        pacbio.find_updated_components(
+            sess,
+            since=since,
+            until=until,
+            changed_sample_ids=changed_sample_ids,
+            changed_study_ids=changed_study_ids,
+        )
     ):
         if curr is None:  # Last item when this is reached
             continue
@@ -481,6 +552,7 @@ def pacbio_updates(
 def infinium_updates_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding Infinium microarray data objects
     and execute the command."""
+
     dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro")
     engine = sqlalchemy.create_engine(
         dbconfig.url, pool_pre_ping=True, pool_recycle=3600
@@ -519,6 +591,7 @@ def infinium_microarray_updates(
 def sequenom_updates_cli(cli_args: argparse.ArgumentParser):
     """Process the command line arguments for finding Sequenom genotype data objects
     and execute the command."""
+
     dbconfig = IniData(db.Config).from_file(cli_args.db_config.name, "mlwh_ro")
     engine = sqlalchemy.create_engine(
         dbconfig.url, pool_pre_ping=True, pool_recycle=3600
@@ -547,6 +620,33 @@ def sequenom_genotype_updates(
     return num_processed, num_errors
 
 
+def _load_mlwh_change_ids(
+    sess: Session,
+    since: datetime,
+    until: datetime,
+    cache_path: Path | None,
+    prime_cache: bool = False,
+) -> tuple[set[str] | None, set[str] | None]:
+    if cache_path is None:
+        if prime_cache:
+            log.warning("MLWH cache priming requested without cache path")
+
+        return None, None
+
+    with MlwhChangeCache(cache_path, prime_cache=prime_cache) as cache:
+        sample_ids = cache.changed_sample_ids(sess, since, until)
+        study_ids = cache.changed_study_ids(sess, since, until)
+
+    log.info(
+        "Filtering MLWH updates using cache",
+        cache=str(cache_path),
+        samples=len(sample_ids),
+        studies=len(study_ids),
+    )
+
+    return sample_ids, study_ids
+
+
 def _print_data_objects_updated_in_mlwh(
     sess: Session,
     query: list[AVU],
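
MlwhChangeCache is defined in src/npg_irods/db/mlwh_cache.py, which is not reproduced in this excerpt. From the call sites above, its interface amounts to a context manager with two query methods; an assumed skeleton (all behaviour described in the comments is inferred, not confirmed) would be:

from datetime import datetime
from pathlib import Path

from sqlalchemy.orm import Session


class MlwhChangeCache:
    """Assumed interface only; the real implementation is in the new
    npg_irods.db.mlwh_cache module, which is not shown here."""

    def __init__(self, cache_path: Path, prime_cache: bool = False):
        self.cache_path = cache_path
        self.prime_cache = prime_cache

    def __enter__(self) -> "MlwhChangeCache":
        # Presumably opens the SQLite file and, when prime_cache is True,
        # hashes every Sample/Study row before any change queries are made.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Presumably commits and closes the SQLite connection.
        return False

    def changed_sample_ids(self, sess: Session, since: datetime, until: datetime) -> set[str]:
        # Presumably selects samples updated in [since, until] from the MLWH and
        # keeps only those whose content hash differs from the cached one.
        raise NotImplementedError("interface sketch only")

    def changed_study_ids(self, sess: Session, since: datetime, until: datetime) -> set[str]:
        raise NotImplementedError("interface sketch only")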
@@ -576,7 +676,7 @@ def _print_data_objects_updated_in_mlwh(
 
 def _find_and_print_data_objects(
     attr: Any,
-    values: Iterator[int],
+    values: Iterator[str],
     query: list[AVU],
     since: datetime,
     until: datetime,
@@ -697,6 +797,18 @@ def main():
         help="Print output in JSON format.",
         action="store_true",
     )
+    ilup_parser.add_argument(
+        "--mlwh-cache",
+        "--mlwh_cache",
+        help="Path to a SQLite cache used to filter Sample/Study updates by content.",
+        type=str,
+    )
+    ilup_parser.add_argument(
+        "--prime-mlwh-cache",
+        "--prime_mlwh_cache",
+        help="Prime the MLWH cache with all Sample/Study rows before filtering.",
+        action="store_true",
+    )
     ilup_parser.set_defaults(func=illumina_updates_cli)
 
     ontcre_parser = subparsers.add_parser(
@@ -731,6 +843,18 @@ def main():
         help="Print output in JSON format.",
         action="store_true",
     )
+    ontup_parser.add_argument(
+        "--mlwh-cache",
+        "--mlwh_cache",
+        help="Path to a SQLite cache used to filter Sample/Study updates by content.",
+        type=str,
+    )
+    ontup_parser.add_argument(
+        "--prime-mlwh-cache",
+        "--prime_mlwh_cache",
+        help="Prime the MLWH cache with all Sample/Study rows before filtering.",
+        action="store_true",
+    )
     ontup_parser.set_defaults(func=ont_updates_cli)
 
     pbup_parser = subparsers.add_parser(
@@ -756,6 +880,20 @@ def main():
         help="Print output in JSON format.",
         action="store_true",
     )
+    pbup_parser.add_argument(
+        "--mlwh-cache",
+        "--mlwh_cache",
+        help="Path to a SQLite cache used to filter Sample/Study updates by content.",
+        type=str,
+    )
+    pbup_parser.add_argument(
+        "--prime-mlwh-cache",
+        "--prime_mlwh_cache",
+        help="Prime the MLWH cache with all Sample/Study rows before filtering. This "
+        "is only useful when the cache is stale or empty. Do not use this option when "
+        "you want to detect updates.",
+        action="store_true",
+    )
     pbup_parser.set_defaults(func=pacbio_updates_cli)
 
     imup_parser = subparsers.add_parser(
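
The corresponding changes to illumina.find_updated_components, ont.find_updated_components and pacbio.find_updated_components are in files not shown in this excerpt. As a rough illustration of how the new keyword arguments would be used (assumed logic, with a hypothetical helper _query_components_updated_between standing in for the existing timestamp query), the ID sets act as an extra filter on top of the time window:

from datetime import datetime

from sqlalchemy.orm import Session


def find_updated_components(
    sess: Session,
    since: datetime,
    until: datetime,
    changed_sample_ids: set[str] | None = None,
    changed_study_ids: set[str] | None = None,
):
    """Illustration only: yield components whose MLWH rows were updated in the
    time window, keeping a component when its sample or study content really
    changed, or when no cache was supplied (both ID sets are None)."""
    # _query_components_updated_between is a hypothetical stand-in for the
    # existing timestamp-based query in each platform module.
    for component in _query_components_updated_between(sess, since, until):
        if changed_sample_ids is None and changed_study_ids is None:
            # No cache configured: keep the previous timestamp-only behaviour.
            yield component
            continue

        sample_changed = (
            changed_sample_ids is not None
            and component.sample_id in changed_sample_ids
        )
        study_changed = (
            changed_study_ids is not None
            and component.study_id in changed_study_ids
        )
        if sample_changed or study_changed:
            yield component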

src/npg_irods/db/mlwh.py

Lines changed: 4 additions & 4 deletions
@@ -22,7 +22,7 @@
 import enum
 from contextlib import contextmanager
 from datetime import datetime, timedelta
-from typing import Iterator, Type
+from typing import Any, Generator, Iterator, Type
 
 import structlog
 from sqlalchemy import (
@@ -364,7 +364,7 @@ class SeqProductIrodsLocations(Base):
 
 
 @contextmanager
-def session_context(engine: Engine) -> Session:
+def session_context(engine: Engine) -> Generator[Session, Any, None]:
     """Yield a session and close, or rollback on error. This context manager does
     not handle exceptions and will raise them to the caller."""
 
@@ -426,7 +426,7 @@ def find_sample_by_sample_id(sess: Session, sample_id: str) -> Sample:
 
 def find_updated_samples(
     sess: Session, since: datetime, until: datetime
-) -> Iterator[int]:
+) -> Iterator[str]:
     """Return IDs of Samples that have been updated in the ML warehouse.
 
     Args:
@@ -454,7 +454,7 @@ def find_updated_samples(
 
 def find_updated_studies(
     sess: Session, since: datetime, until: datetime
-) -> Iterator[int]:
+) -> Iterator[str]:
     """Return IDs of Studies that have been updated in the ML warehouse.
 
     Args:
