diff --git a/.github/workflows/cibuild.yml b/.github/workflows/cibuild.yml index 85c8430..1f40095 100644 --- a/.github/workflows/cibuild.yml +++ b/.github/workflows/cibuild.yml @@ -9,6 +9,7 @@ jobs: max-parallel: 4 matrix: python-version: ["3.11", "3.12", "3.13"] + fail-fast: false steps: - uses: actions/checkout@v1 - name: Set up Python ${{ matrix.python-version }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 1433eaa..a27a69e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](http://semver.org/). - Implemented Python 3.12 support by integrating happer package and increasing minimum version of MicroHapDB dependency (#193). - Updated working directory organization to provide additional structure (#194). - Snakemake is now invoked using its new Python API, requiring a minimum version of 8.0 and Python 3.11+ (#195). +- FASTQ intake is now handled by a dedicated package ezfastq, which was based on MicroHapulator code (#196). ### Fixed - Bug with handling marker vs. locus identifiers when running `mhpl8r seq` (#190). diff --git a/microhapulator/cli/pipe.py b/microhapulator/cli/pipe.py index 43b15e2..6283a0f 100644 --- a/microhapulator/cli/pipe.py +++ b/microhapulator/cli/pipe.py @@ -12,12 +12,12 @@ from argparse import SUPPRESS -from collections import defaultdict +import ezfastq +from ezfastq.pair import PairMode from importlib.resources import files from microhapulator.marker import MicrohapIndex -from os import cpu_count, symlink +from os import cpu_count from pathlib import Path -from shutil import copy from snakemake.api import SnakemakeApi as smk_api from snakemake.exceptions import WorkflowError from snakemake.settings import types as smk_types @@ -26,11 +26,10 @@ def main(args): validate_panel_config(args.markerrefr, args.markerdefn) - samples, filenames = get_input_files(args.samples, args.seqpath, args.reads_are_paired) - workingfiles = link_or_copy_input(filenames, args.workdir, docopy=args.copy_input) + pair_mode = PairMode.PairedEnd if args.reads_are_paired else PairMode.SingleEnd + ezfastq.api.copy(args.samples, args.seqpath, pair_mode=pair_mode, workdir=Path(args.workdir)) config = dict( - samples=samples, - readfiles=workingfiles, + samples=args.samples, mhrefr=str(Path(args.markerrefr).resolve()), mhdefn=str(Path(args.markerdefn).resolve()), hg38path=str(Path(args.hg38).resolve()), @@ -73,106 +72,6 @@ def validate_panel_config(markerseqs, markerdefn): index.validate() -def get_input_files(sample_names, seqpath, reads_are_paired=True, suffixes=None): - """Find input files for each sample - - This function traverses `seqpath` and any of its sub-directories for FASTQ files. Any FASTQ - file matching one of the sample names is stored in a dictionary with that sample name. Then the - function checks each sample to test whether the number of files found matches the number of - expected files. Files for samples that pass this test are stored in a list to be passed to - Snakemake. (I would prefer to pass the data as a dictionary, but Snakemake complains when the - config object is a nested dictionary. So instead we'll reconstruct the dictionary from this - list in the Snakefile.) - """ - if suffixes is None: - suffixes = (".fastq", ".fastq.gz", ".fq", ".fq.gz") - sample_names = check_sample_names(sample_names) - files = defaultdict(list) - for filepath in traverse(seqpath): - if not filepath.name.endswith(suffixes): - continue - for sample in sample_names: - if sample in filepath.name: - files[sample].append(filepath) - final_file_list = list() - for sample in sample_names: - filelist = files[sample] - filelist_ok = validate_sample_input_files(len(filelist), sample, reads_are_paired) - if filelist_ok: - final_file_list.extend(filelist) - unique_file_names = set([filepath.name for filepath in final_file_list]) - if len(unique_file_names) != len(final_file_list): - raise ValueError("duplicate FASTQ file names found; refusing to proceed") - return sample_names, final_file_list - - -def check_sample_names(samples): - """Parse sample names and perform sanity checks - - Input is expected to be a list of strings corresponding to sample names. This function ensures - that there are no duplicate sample names and no sample names that are substrings of other - sample names. - - If the list contains only a single string and that string corresponds to a valid file path, it - is assumed to be a file containing a list of sample names, one per line. - """ - if len(samples) == 1 and Path(samples[0]).is_file(): - with open(samples[0], "r") as fh: - samples = fh.read().strip().split("\n") - samples = list(sorted(samples)) - for s1 in samples: - for s2 in samples: - if s1 == s2: - continue - if s1 in s2: - message = f"cannot correctly process a sample name that is a substring of another sample name: {s1} vs. {s2}" - raise ValueError(message) - return samples - - -def traverse(dirpath): - dirpath = Path(dirpath) - if not dirpath.is_dir(): - return - for subpath in dirpath.iterdir(): - if subpath.is_dir(): - yield from traverse(subpath) - else: - yield subpath - - -def validate_sample_input_files(numfiles, sample, reads_are_paired=True): - if numfiles == 0: - raise FileNotFoundError(f"sample {sample}: found 0 FASTQ files") - if reads_are_paired: - exp_num_fastq_files = 2 - mode = "paired" - else: - exp_num_fastq_files = 1 - mode = "single" - if numfiles != exp_num_fastq_files: - message = ( - f"sample {sample}: found {numfiles} FASTQ files" - f", expected {exp_num_fastq_files} in {mode}-end mode" - ) - raise ValueError(message) - return True - - -def link_or_copy_input(filenames, workdir, docopy=False): - seqdir = Path(workdir) / "seq" - seqdir.mkdir(parents=True, exist_ok=True) - workingfiles = list() - for filepath in filenames: - workingpath = seqdir / filepath.name - workingfiles.append(Path("seq") / workingpath.name) - if workingpath.is_file(): - continue - createfunc = copy if docopy else symlink - createfunc(filepath.resolve(), workingpath) - return [str(wf) for wf in workingfiles] - - def subparser(subparsers): desc = "Perform a complete end-to-end microhap analysis pipeline" cli = subparsers.add_parser("pipe", description=desc) @@ -258,11 +157,6 @@ def subparser(subparsers): action="store_false", help="accept single-end reads only; by default, only paired-end reads are accepted", ) - cli.add_argument( - "--copy-input", - action="store_true", - help="copy input files to working directory; by default, input files are symlinked", - ) cli.add_argument( "--hg38", default=files("microhapulator") / "data" / "hg38.fasta.gz", diff --git a/microhapulator/tests/test_pipe.py b/microhapulator/tests/test_pipe.py index ae969fe..0e53b9e 100644 --- a/microhapulator/tests/test_pipe.py +++ b/microhapulator/tests/test_pipe.py @@ -13,26 +13,12 @@ from glob import glob import microhapulator import microhapulator.api as mhapi -from microhapulator.cli.pipe import validate_sample_input_files from microhapulator.profile import SimulatedProfile, TypingResult from microhapulator.tests import data_file import pandas as pd import pytest -def test_validate_sample_input_files(): - assert validate_sample_input_files(1, "S1", reads_are_paired=False) is True - assert validate_sample_input_files(2, "S2", reads_are_paired=True) is True - with pytest.raises(ValueError, match=r"expected 2 in paired-end mode"): - validate_sample_input_files(1, "S3", reads_are_paired=True) - with pytest.raises(ValueError, match=r"expected 1 in single-end mode"): - validate_sample_input_files(2, "S4", reads_are_paired=False) - with pytest.raises(FileNotFoundError): - validate_sample_input_files(0, "S5") - with pytest.raises(ValueError, match=r"expected 2 in paired-end mode"): - validate_sample_input_files(4, "S6", reads_are_paired=True) - - def test_pipe_missing_files(tmp_path): arglist = [ "pipe", diff --git a/microhapulator/workflows/preproc-paired.smk b/microhapulator/workflows/preproc-paired.smk index a66d127..9726341 100644 --- a/microhapulator/workflows/preproc-paired.smk +++ b/microhapulator/workflows/preproc-paired.smk @@ -28,7 +28,8 @@ summary_aux_files = (expand("analysis/{sample}/flash.log", sample=config["sample rule fastqc: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), + r1="seq/{sample}_R1.fastq.gz", + r2="seq/{sample}_R2.fastq.gz", output: r1="analysis/{sample}/01preprocessing/fastqc/R1-fastqc.html", r2="analysis/{sample}/01preprocessing/fastqc/R2-fastqc.html", @@ -61,7 +62,8 @@ rule multiqc: rule filter_ambiguous: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), + r1="seq/{sample}_R1.fastq.gz", + r2="seq/{sample}_R2.fastq.gz", output: filtered_r1="analysis/{sample}/01preprocessing/{sample}-ambig-filtered-R1.fastq.gz", filtered_r2="analysis/{sample}/01preprocessing/{sample}-ambig-filtered-R2.fastq.gz", @@ -125,22 +127,17 @@ rule filter_length: rule calculate_read_lengths: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), - rules.merge.output.mergedfq, + r1="seq/{sample}_R1.fastq.gz", + r2="seq/{sample}_R2.fastq.gz", + rm=rules.merge.output.mergedfq, output: l1="analysis/{sample}/01preprocessing/{sample}-r1-read-lengths.json", l2="analysis/{sample}/01preprocessing/{sample}-r2-read-lengths.json", merged="analysis/{sample}/01preprocessing/{sample}-merged-read-lengths.json", run: - mhapi.calculate_read_lengths( - input[0], - output.l1, - ) - mhapi.calculate_read_lengths( - input[1], - output.l2, - ) - mhapi.calculate_read_lengths(input[2], output.merged) + mhapi.calculate_read_lengths(input.r1, output.l1) + mhapi.calculate_read_lengths(input.r2, output.l2) + mhapi.calculate_read_lengths(input.rm, output.merged) rule plot_read_length_distributions: diff --git a/microhapulator/workflows/preproc-single.smk b/microhapulator/workflows/preproc-single.smk index f90b28d..bb240ff 100644 --- a/microhapulator/workflows/preproc-single.smk +++ b/microhapulator/workflows/preproc-single.smk @@ -23,7 +23,7 @@ summary_aux_files = list() rule fastqc: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), + fastq="seq/{sample}.fastq.gz", output: html="analysis/{sample}/01preprocessing/fastqc/report.html", params: @@ -49,7 +49,7 @@ rule multiqc: rule filter_ambiguous: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), + fastq="seq/{sample}.fastq.gz", output: filtered_fq="analysis/{sample}/01preprocessing/{sample}-ambig-filtered.fastq.gz", counts="analysis/{sample}/01preprocessing/{sample}-ambig-read-counts.txt", @@ -84,14 +84,11 @@ rule filter_length: rule calculate_read_lengths: input: - lambda wildcards: sorted([fq for fq in config["readfiles"] if wildcards.sample in fq]), + fastq="seq/{sample}.fastq.gz", output: json="analysis/{sample}/01preprocessing/{sample}-read-lengths.json", run: - mhapi.calculate_read_lengths( - input[0], - output.json, - ) + mhapi.calculate_read_lengths(input.fastq, output.json) rule plot_read_length_distributions: diff --git a/setup.py b/setup.py index 210d6f4..d841e68 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ include_package_data=True, install_requires=[ "biopython", + "ezfastq", "insilicoseq>=1.5.4,<2.0", "jsonschema>=4.0", "matplotlib>=3.0",