From e9520371eb96a4a90c78d75418fa43f50b6e2941 Mon Sep 17 00:00:00 2001 From: Seth Stadick Date: Sun, 3 May 2026 06:17:42 -0400 Subject: [PATCH 1/5] feat: add experimental seqair pileup backend --- Cargo.lock | 215 +++++++++++-- Cargo.toml | 5 + README.md | 14 + examples/par_granges_example.rs | 12 +- src/commands/base_depth.rs | 390 +++++++++++++++++++++--- src/commands/only_depth.rs | 4 +- src/lib/mod.rs | 12 +- src/lib/position/mate_fix.rs | 451 ++++++++++++++++------------ src/lib/position/pileup_position.rs | 176 +++++++---- src/lib/read_filter.rs | 38 ++- 10 files changed, 999 insertions(+), 318 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7d97cb..86f89e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,7 +123,7 @@ dependencies = [ "proc-macro2 1.0.103", "quote 1.0.42", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.111", ] @@ -162,7 +162,7 @@ dependencies = [ "statrs", "strum 0.23.0", "strum_macros 0.26.4", - "thiserror 2.0.17", + "thiserror 2.0.18", "triple_accel", "vec_map", ] @@ -207,6 +207,16 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "borsh" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd1e3f8955a5d7de9fab72fc8373fade9fb8a703968cb200ae3dc6cf08e185a" +dependencies = [ + "bytes", + "cfg_aliases", +] + [[package]] name = "bstr" version = "1.12.1" @@ -240,6 +250,26 @@ version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" +dependencies = [ + "bytemuck_derive", +] + +[[package]] +name = "bytemuck_derive" +version = "1.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" +dependencies = [ + "proc-macro2 1.0.103", + "quote 1.0.42", + "syn 2.0.111", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -252,6 +282,15 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +[[package]] +name = "bzip2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a53fac24f34a81bc9954b5d6cfce0c21e18ec6959f44f56e8e90e4bb7c346c" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "cc" version = "1.2.49" @@ -279,6 +318,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clang-sys" version = "1.8.1" @@ -564,9 +609,9 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flate2" -version = "1.1.5" +version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", "libz-ng-sys", @@ -752,7 +797,7 @@ dependencies = [ "libz-ng-sys", "log", "num_cpus", - "thiserror 2.0.17", + "thiserror 2.0.18", ] [[package]] @@ -841,7 +886,7 @@ dependencies = [ "icu_normalizer_data", "icu_properties", "icu_provider", - "smallvec", + "smallvec 1.15.1", "zerovec", ] @@ -893,7 +938,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" dependencies = [ "idna_adapter", - "smallvec", + "smallvec 1.15.1", "utf8_iter", ] @@ -1018,6 +1063,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "libbz2-rs-sys" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a6a8c165077efc8f3a971534c50ea6a1a18b329ef4a66e897a7e3a1494565f" + [[package]] name = "libc" version = "0.2.178" @@ -1026,18 +1077,18 @@ checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libdeflate-sys" -version = "1.25.0" +version = "1.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23bd6304ebf75390d8a99b88bdf2a266f62647838140cb64af8e6702f6e3fddc" +checksum = "72753e0008ea87963d2f0770042d0df7abe51fafbb8dcaf618ac440f2f1fec0a" dependencies = [ "cc", ] [[package]] name = "libdeflater" -version = "1.25.0" +version = "1.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d4880e6d634d3d029d65fa016038e788cc728a17b782684726fb34ee140caf" +checksum = "d1ee41cf6fb1bb6030dfb59ffb7bc01ab26aade44142084c87f0fc7a1658fe71" dependencies = [ "libdeflate-sys", ] @@ -1120,6 +1171,17 @@ version = "0.11.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9106e1d747ffd48e6be5bb2d97fa706ed25b144fbee4d5c02eae110cd8d6badd" +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "matrixmultiply" version = "0.3.10" @@ -1130,6 +1192,12 @@ dependencies = [ "rawpointer", ] +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.7.6" @@ -1328,6 +1396,8 @@ dependencies = [ "rstest", "rust-htslib", "rust-lapper", + "seqair", + "seqair-types", "serde", "smartstring", "structopt", @@ -1711,7 +1781,7 @@ dependencies = [ "linear-map", "newtype_derive", "regex", - "thiserror 2.0.17", + "thiserror 2.0.18", "url", ] @@ -1730,6 +1800,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94300abf3f1ae2e2b8ffb7b58043de3d399c73fa6f4b73826402a5c457614dbe" + [[package]] name = "rustc_version" version = "0.1.7" @@ -1803,6 +1879,39 @@ version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +[[package]] +name = "seqair" +version = "0.1.0" +source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +dependencies = [ + "bytemuck", + "bzip2", + "flate2", + "indexmap", + "itoa", + "libdeflater", + "md5", + "rustc-hash 2.1.2", + "ryu", + "seqair-types", + "thiserror 2.0.18", + "tracing", + "xz2", +] + +[[package]] +name = "seqair-types" +version = "0.1.0" +source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +dependencies = [ + "bytemuck", + "serde", + "smallvec 2.0.0-alpha.12", + "smol_str", + "thiserror 2.0.18", + "winnow 1.0.2", +] + [[package]] name = "serde" version = "1.0.228" @@ -1869,6 +1978,15 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smallvec" +version = "2.0.0-alpha.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef784004ca8777809dcdad6ac37629f0a97caee4c685fcea805278d81dd8b857" +dependencies = [ + "serde_core", +] + [[package]] name = "smartstring" version = "1.0.1" @@ -1881,6 +1999,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "smol_str" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4aaa7368fcf4852a4c2dd92df0cace6a71f2091ca0a23391ce7f3a31833f1523" +dependencies = [ + "borsh", + "serde_core", +] + [[package]] name = "spin" version = "0.9.8" @@ -2071,11 +2199,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.17", + "thiserror-impl 2.0.18", ] [[package]] @@ -2091,9 +2219,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.17" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2 1.0.103", "quote 1.0.42", @@ -2128,7 +2256,7 @@ dependencies = [ "indexmap", "toml_datetime", "toml_parser", - "winnow", + "winnow 0.7.14", ] [[package]] @@ -2137,7 +2265,38 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" dependencies = [ - "winnow", + "winnow 0.7.14", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2 1.0.103", + "quote 1.0.42", + "syn 2.0.111", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", ] [[package]] @@ -2351,6 +2510,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "winnow" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen" version = "0.46.0" @@ -2363,6 +2531,15 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index f2d7f96..4861906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,9 @@ path = "src/lib/mod.rs" name = "perbase" path = "src/main.rs" +[features] +seqair-pileup = ["dep:seqair", "dep:seqair-types"] + [dependencies] anyhow = "1.0.41" bio = "3.0.0" @@ -39,6 +42,8 @@ lru_time_cache = "0.11.1" num_cpus = "1.13.0" rayon = "1.4.0" rust-htslib = {version = "0.51", default-features = false, features = ["libdeflate"]} +seqair = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } +seqair-types = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } rust-lapper = "1.0" serde = { version = "1.0.116", features = ["derive"] } smartstring = { version = "1.0.1", features = ["serde"] } diff --git a/README.md b/README.md index c119e8a..74de650 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,17 @@ If the `--mate-fix` flag is passed, each position will first check if there are If the `--reference-fasta` is supplied, the `REF_BASE` field will be filled in. The reference must be indexed and match the BAM/CRAM header of the input. +#### Experimental seqair pileup backend + +`base-depth` can optionally be built with an experimental seqair-backed pileup engine: + +```bash +cargo build --release --features seqair-pileup +perbase base-depth --seqair-pileup ./test/test.bam +``` + +The htslib-backed path remains the default. The seqair path currently requires `--ref-fasta` for CRAM input. + The output can be compressed and indexed as follows: ```bash @@ -173,6 +184,9 @@ FLAGS: **NOTE** If this is set it could result in duplicate output entries for regions that overlap. **NOTE** This may cause issues with downstream tooling. + --seqair-pileup + Use the experimental seqair-backed pileup engine + -V, --version Prints version information diff --git a/examples/par_granges_example.rs b/examples/par_granges_example.rs index f73357f..5852420 100644 --- a/examples/par_granges_example.rs +++ b/examples/par_granges_example.rs @@ -3,9 +3,9 @@ use anyhow::Result; use perbase_lib::{ par_granges::{self, RegionProcessor}, position::pileup_position::PileupPosition, - read_filter::ReadFilter, + read_filter::{ReadFilter, ReadView}, }; -use rust_htslib::bam::{self, Read, pileup::Alignment, record::Record}; +use rust_htslib::bam::{self, Read}; use std::path::PathBuf; // To use ParGranges you will need to implement a [`RegionProcessor`](par_granges::RegionProcessor), @@ -32,11 +32,11 @@ struct BasicReadFilter { impl ReadFilter for BasicReadFilter { // Filter reads based SAM flags and mapping quality, true means pass #[inline] - fn filter_read(&self, read: &Record, _alignment: Option<&Alignment>) -> bool { + fn filter_read(&self, read: &R) -> bool { let flags = read.flags(); - (!flags) & &self.include_flags == 0 - && flags & &self.exclude_flags == 0 - && &read.mapq() >= &self.min_mapq + (!flags) & self.include_flags == 0 + && flags & self.exclude_flags == 0 + && read.mapq() >= self.min_mapq } } diff --git a/src/commands/base_depth.rs b/src/commands/base_depth.rs index 6715caf..37a9ea9 100644 --- a/src/commands/base_depth.rs +++ b/src/commands/base_depth.rs @@ -76,6 +76,11 @@ pub struct BaseDepth { #[structopt(long, short = "m")] mate_fix: bool, + /// Use the experimental seqair-backed pileup engine. + #[cfg(feature = "seqair-pileup")] + #[structopt(long = "seqair-pileup")] + seqair_pileup: bool, + /// If `mate_fix` is true, select the method to use for mate fixing. #[structopt(long, default_value = "original")] mate_resolution_strategy: MateResolutionStrategy, @@ -114,9 +119,67 @@ pub struct BaseDepth { max_depth: u32, } +#[cfg(feature = "seqair-pileup")] +fn path_looks_like_cram(reads: &std::path::Path) -> bool { + reads + .extension() + .and_then(|extension| extension.to_str()) + .is_some_and(|extension| extension.eq_ignore_ascii_case("cram")) +} + +#[cfg(feature = "seqair-pileup")] +enum SeqairReaderTemplate { + Indexed(seqair::reader::IndexedReader), + Readers(seqair::Readers), +} + +#[cfg(feature = "seqair-pileup")] +impl SeqairReaderTemplate { + fn open(reads: &std::path::Path, ref_fasta: Option<&std::path::Path>) -> Result { + if let Some(ref_fasta) = ref_fasta { + Ok(Self::Readers(seqair::Readers::open(reads, ref_fasta)?)) + } else { + Ok(Self::Indexed(seqair::reader::IndexedReader::open(reads)?)) + } + } + + fn fork(&self) -> Result { + match self { + Self::Indexed(reader) => Ok(Self::Indexed(reader.fork()?)), + Self::Readers(readers) => Ok(Self::Readers(readers.fork()?)), + } + } + + fn target_name(&self, tid: u32) -> Option<&str> { + match self { + Self::Indexed(reader) => reader.header().target_name(tid), + Self::Readers(readers) => readers.header().target_name(tid), + } + } + + fn fetch_into( + &mut self, + tid: u32, + start: seqair::bam::Pos0, + end: seqair::bam::Pos0, + store: &mut seqair::bam::RecordStore, + ) -> Result { + match self { + Self::Indexed(reader) => Ok(reader.fetch_into(tid, start, end, store)?), + Self::Readers(readers) => Ok(readers.fetch_into(tid, start, end, store)?), + } + } +} + impl BaseDepth { pub fn run(self) -> Result<()> { info!("Running base-depth on: {:?}", self.reads); + #[cfg(feature = "seqair-pileup")] + if self.seqair_pileup && self.ref_fasta.is_none() && path_looks_like_cram(&self.reads) { + anyhow::bail!( + "The experimental seqair pileup path requires --ref-fasta for CRAM input" + ); + } let cpus = utils::determine_allowed_cpus(self.threads)?; let mut writer = utils::get_writer( @@ -141,6 +204,8 @@ impl BaseDepth { self.ref_cache_size, self.min_base_quality_score, ); + #[cfg(feature = "seqair-pileup")] + let base_processor = base_processor.with_seqair_pileup(self.seqair_pileup)?; let par_granges_runner = par_granges::ParGranges::new( self.reads.clone(), @@ -183,12 +248,18 @@ struct BaseProcessor { coord_base: u32, /// implementation of [position::ReadFilter] that will be used read_filter: F, - /// max depth to pass to htslib pileup engine, max value is MAX(i32) + /// Max depth requested for pileup; the htslib engine is capped at `i32::MAX`. max_depth: u32, /// the cutoff at which we start logging warnings about depth being close to max depth max_depth_warnings_cutoff: u32, /// an optional base quality score. If Some(number) if the base quality is not >= that number the base is treated as an `N` min_base_quality_score: Option, + /// Use seqair's pileup engine instead of htslib's pileup engine. + #[cfg(feature = "seqair-pileup")] + seqair_pileup: bool, + /// Template seqair reader to fork per region, sharing parsed header/index. + #[cfg(feature = "seqair-pileup")] + seqair_reader_template: Option, } impl BaseProcessor { @@ -225,7 +296,138 @@ impl BaseProcessor { // Set cutoff to 1% of whatever max_depth is. max_depth_warnings_cutoff: max_depth - (max_depth as f64 * 0.01) as u32, min_base_quality_score, + #[cfg(feature = "seqair-pileup")] + seqair_pileup: false, + #[cfg(feature = "seqair-pileup")] + seqair_reader_template: None, + } + } + + #[cfg(feature = "seqair-pileup")] + fn with_seqair_pileup(mut self, seqair_pileup: bool) -> Result { + self.seqair_pileup = seqair_pileup; + if seqair_pileup { + self.seqair_reader_template = Some(SeqairReaderTemplate::open( + &self.reads, + self.ref_fasta.as_deref(), + )?); } + Ok(self) + } + + fn finalize_pileup_position(&self, pos: &mut PileupPosition, pileup_depth: u32) { + // Add the ref base if reference is available. `pos.pos` is still 0-based here. + if let Some(buffer) = &self.ref_buffer { + let seq = buffer + .seq(&pos.ref_seq) + .expect("Fetched reference sequence"); + pos.ref_base = Some(char::from( + *seq.get(pos.pos as usize) + .expect("Input SAM does not match reference"), + )); + } + if pileup_depth > self.max_depth_warnings_cutoff { + pos.near_max_depth = true; + } + pos.pos += self.coord_base; + } + + fn fill_zero_positions( + &self, + result: Vec, + ref_name: smartstring::alias::String, + start: u32, + stop: u32, + ) -> Vec { + if !self.keep_zeros { + return result; + } + + let mut new_result = vec![]; + let mut pos = start; + + for position in result.into_iter() { + while pos < (position.pos - self.coord_base) { + new_result.push(PileupPosition::new(ref_name.clone(), pos + self.coord_base)); + pos += 1; + } + new_result.push(position); + pos += 1; + } + while pos < stop { + new_result.push(PileupPosition::new(ref_name.clone(), pos + self.coord_base)); + pos += 1; + } + new_result + } + + #[cfg(feature = "seqair-pileup")] + fn process_region_seqair(&self, tid: u32, start: u32, stop: u32) -> Vec { + if start >= stop { + return Vec::new(); + } + + let start_pos = seqair::bam::Pos0::new(start).expect("seqair start position"); + // seqair uses inclusive end positions; perbase regions are half-open [start, stop). + let end_pos = seqair::bam::Pos0::new(stop - 1).expect("seqair end position"); + let mut store = seqair::bam::RecordStore::new(); + + let mut reader = self + .seqair_reader_template + .as_ref() + .expect("seqair reader template") + .fork() + .expect("forked seqair reader"); + let ref_name = reader + .target_name(tid) + .expect("seqair target name") + .to_owned(); + reader + .fetch_into(tid, start_pos, end_pos, &mut store) + .expect("seqair fetched a region"); + let ref_name = smartstring::alias::String::from(ref_name.as_str()); + + self.positions_from_seqair_store(ref_name, store, start_pos, end_pos, start, stop) + } + + #[cfg(feature = "seqair-pileup")] + fn positions_from_seqair_store( + &self, + ref_name: smartstring::alias::String, + store: seqair::bam::RecordStore, + start_pos: seqair::bam::Pos0, + end_pos: seqair::bam::Pos0, + start: u32, + stop: u32, + ) -> Vec { + let mut engine = seqair::bam::pileup::PileupEngine::new(store, start_pos, end_pos); + engine.set_max_depth(self.max_depth); + + let mut result = Vec::new(); + while let Some(column) = engine.pileups() { + let pileup_depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); + let mut pos = if self.mate_fix { + PileupPosition::from_seqair_column_mate_aware( + ref_name.clone(), + &column, + &self.read_filter, + self.min_base_quality_score, + self.mate_fix_strategy, + ) + } else { + PileupPosition::from_seqair_column( + ref_name.clone(), + &column, + &self.read_filter, + self.min_base_quality_score, + ) + }; + + self.finalize_pileup_position(&mut pos, pileup_depth); + result.push(pos); + } + + self.fill_zero_positions(result, ref_name, start, stop) } } @@ -239,6 +441,11 @@ impl RegionProcessor for BaseProcessor { /// the defined filters fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec { trace!("Processing region {}(tid):{}-{}", tid, start, stop); + #[cfg(feature = "seqair-pileup")] + if self.seqair_pileup { + return self.process_region_seqair(tid, start, stop); + } + // Create a reader let mut reader = bam::IndexedReader::from_path(&self.reads).expect("Indexed Reader for region"); @@ -276,20 +483,7 @@ impl RegionProcessor for BaseProcessor { self.min_base_quality_score, ) }; - // Add the ref base if reference is available - if let Some(buffer) = &self.ref_buffer { - let seq = buffer - .seq(&pos.ref_seq) - .expect("Fetched reference sequence"); - pos.ref_base = Some(char::from( - *seq.get(pos.pos as usize) - .expect("Input SAM does not match reference"), - )); - } - if pileup_depth > (self.max_depth_warnings_cutoff) { - pos.near_max_depth = true; - } - pos.pos += self.coord_base; + self.finalize_pileup_position(&mut pos, pileup_depth); Some(pos) } else { None @@ -297,27 +491,8 @@ impl RegionProcessor for BaseProcessor { }) .collect(); - if self.keep_zeros { - let mut new_result = vec![]; - let name = PileupPosition::compact_refseq(&header, tid); - let mut pos = start; - - for position in result.into_iter() { - while pos < (position.pos - self.coord_base) { - new_result.push(PileupPosition::new(name.clone(), pos + self.coord_base)); - pos += 1; - } - new_result.push(position); - pos += 1; - } - while pos < stop { - new_result.push(PileupPosition::new(name.clone(), pos + self.coord_base)); - pos += 1; - } - new_result - } else { - result - } + let name = PileupPosition::compact_refseq(&header, tid); + self.fill_zero_positions(result, name, start, stop) } } @@ -1002,4 +1177,147 @@ mod tests { assert_eq!(positions.get("chr2").unwrap()[43].n, if !base_qual_filtered_all { 0 } else { 2 - awareness_modifier }); // mate overlap assert_eq!(positions.get("chr2").unwrap()[44].n, if !base_qual_filtered_all { 0 } else { 1 }); } + + #[cfg(feature = "seqair-pileup")] + #[allow(clippy::too_many_arguments)] + fn collect_region_positions( + bam_path: PathBuf, + seqair_pileup: bool, + mate_fix: bool, + keep_zeros: bool, + coord_base: u32, + base_quality: Option, + mate_fix_strategy: MateResolutionStrategy, + ) -> Vec { + let read_filter = DefaultReadFilter::new(0, 512, 0); + let base_processor = BaseProcessor::new( + bam_path, + None, + mate_fix, + mate_fix_strategy, + keep_zeros, + coord_base, + read_filter, + 500_000, + 8, + base_quality, + ) + .with_seqair_pileup(seqair_pileup) + .unwrap(); + + let mut positions = Vec::new(); + positions.extend(base_processor.process_region(0, 0, 100)); + positions.extend(base_processor.process_region(1, 0, 100)); + positions + } + + #[cfg(feature = "seqair-pileup")] + #[rstest( + mate_fix, + keep_zeros, + coord_base, + base_quality, + mate_fix_strategy, + case::default(false, false, 1, None, MateResolutionStrategy::Original), + case::mate_aware(true, false, 1, None, MateResolutionStrategy::Original), + case::keep_zeros(false, true, 1, None, MateResolutionStrategy::Original), + case::mate_aware_keep_zeros(true, true, 1, None, MateResolutionStrategy::Original), + case::zero_based(false, false, 0, None, MateResolutionStrategy::Original), + case::base_qual_1(false, false, 1, Some(1), MateResolutionStrategy::Original), + case::base_qual_3(false, false, 1, Some(3), MateResolutionStrategy::Original), + case::mate_aware_base_qual_3(true, false, 1, Some(3), MateResolutionStrategy::Original), + case::mate_aware_iupac(true, false, 1, None, MateResolutionStrategy::IUPAC), + case::mate_aware_n(true, false, 1, None, MateResolutionStrategy::N), + case::mate_aware_baseq_mapq_iupac( + true, + false, + 1, + None, + MateResolutionStrategy::BaseQualMapQualIUPAC + ), + case::mate_aware_mapq_baseq_iupac( + true, + false, + 1, + None, + MateResolutionStrategy::MapQualBaseQualIUPAC + ) + )] + fn seqair_matches_htslib_process_region( + bamfile: (PathBuf, TempDir), + mate_fix: bool, + keep_zeros: bool, + coord_base: u32, + base_quality: Option, + mate_fix_strategy: MateResolutionStrategy, + ) { + let htslib_positions = collect_region_positions( + bamfile.0.clone(), + false, + mate_fix, + keep_zeros, + coord_base, + base_quality, + mate_fix_strategy, + ); + let seqair_positions = collect_region_positions( + bamfile.0.clone(), + true, + mate_fix, + keep_zeros, + coord_base, + base_quality, + mate_fix_strategy, + ); + + assert_eq!(htslib_positions, seqair_positions); + } + + #[cfg(feature = "seqair-pileup")] + fn base_depth_for_test(reads: PathBuf, output: PathBuf, seqair_pileup: bool) -> BaseDepth { + BaseDepth { + reads, + ref_fasta: None, + bed_file: None, + bcf_file: None, + output: Some(output), + bgzip: false, + threads: 4, + compression_threads: 1, + compression_level: 2, + chunksize: 100, + channel_size_modifier: 0.001, + include_flags: 0, + exclude_flags: 512, + mate_fix: true, + seqair_pileup, + mate_resolution_strategy: MateResolutionStrategy::Original, + keep_zeros: false, + skip_merging_intervals: false, + min_mapq: 0, + min_base_quality_score: None, + zero_base: false, + ref_cache_size: 8, + max_depth: 500_000, + } + } + + #[cfg(feature = "seqair-pileup")] + #[test] + fn seqair_base_depth_run_matches_htslib_run() { + let (bam_path, tempdir) = bamfile(); + let htslib_output = tempdir.path().join("htslib.tsv"); + let seqair_output = tempdir.path().join("seqair.tsv"); + + base_depth_for_test(bam_path.clone(), htslib_output.clone(), false) + .run() + .unwrap(); + base_depth_for_test(bam_path, seqair_output.clone(), true) + .run() + .unwrap(); + + let htslib_output = std::fs::read_to_string(htslib_output).unwrap(); + let seqair_output = std::fs::read_to_string(seqair_output).unwrap(); + assert_eq!(htslib_output, seqair_output); + } } diff --git a/src/commands/only_depth.rs b/src/commands/only_depth.rs index 1033a42..ef4c4a6 100644 --- a/src/commands/only_depth.rs +++ b/src/commands/only_depth.rs @@ -308,7 +308,7 @@ impl OnlyDepthProcessor { for record in reader .rc_records() .map(|r| r.expect("Read record")) - .filter(|read| self.read_filter.filter_read(read, None)) + .filter(|read| self.read_filter.filter_read(read)) .flat_map(|record| IterAlignedBlocks::new(record, self.mate_fix)) { let rec_start = u32::try_from(record.0).expect("check overflow"); @@ -397,7 +397,7 @@ impl OnlyDepthProcessor { for record in reader .rc_records() .map(|r| r.expect("Read record")) - .filter(|read| self.read_filter.filter_read(read, None)) + .filter(|read| self.read_filter.filter_read(read)) { let rec_start = u32::try_from(record.reference_start()).expect("check overflow"); let rec_stop = u32::try_from(record.reference_end()).expect("check overflow"); diff --git a/src/lib/mod.rs b/src/lib/mod.rs index ee10c97..1c3c343 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -15,9 +15,9 @@ //! use perbase_lib::{ //! par_granges::{self, RegionProcessor}, //! position::pileup_position::PileupPosition, -//! read_filter::ReadFilter, +//! read_filter::{ReadFilter, ReadView}, //! }; -//! use rust_htslib::bam::{self, record::Record, Read, pileup::Alignment}; +//! use rust_htslib::bam::{self, Read}; //! use std::path::PathBuf; //! //! // To use ParGranges you will need to implement a [`RegionProcessor`](par_granges::RegionProcessor), @@ -44,11 +44,11 @@ //! impl ReadFilter for BasicReadFilter { //! // Filter reads based SAM flags and mapping quality, true means pass //! #[inline] -//! fn filter_read(&self, read: &Record, _: Option<&Alignment>) -> bool { +//! fn filter_read(&self, read: &R) -> bool { //! let flags = read.flags(); -//! (!flags) & &self.include_flags == 0 -//! && flags & &self.exclude_flags == 0 -//! && &read.mapq() >= &self.min_mapq +//! (!flags) & self.include_flags == 0 +//! && flags & self.exclude_flags == 0 +//! && read.mapq() >= self.min_mapq //! } //! } //! diff --git a/src/lib/position/mate_fix.rs b/src/lib/position/mate_fix.rs index a7902cf..6bd8dcb 100644 --- a/src/lib/position/mate_fix.rs +++ b/src/lib/position/mate_fix.rs @@ -38,7 +38,7 @@ //! //! ## MateResolutionStrategy::Original //! MAPQ -> first in pair -use crate::read_filter::ReadFilter; +use crate::read_filter::{ReadFilter, ReadView}; use rust_htslib::bam::{pileup::Alignment, record::Record}; use strum::EnumString; @@ -218,16 +218,158 @@ pub enum MateResolutionStrategy { Original, } +/// Pileup-level read data needed by base counting and mate resolution. +pub(crate) trait PileupReadView: ReadView { + /// Query name for grouping mates. + fn qname(&self) -> &[u8]; + + /// Query position for this pileup observation, if it has one. + fn qpos(&self) -> Option; + + /// Base at this pileup observation, if available. + fn base(&self) -> Option; + + /// Base quality at this pileup observation, if available. + fn base_qual(&self) -> Option; + + /// Whether this observation is a reference skip. + fn is_refskip(&self) -> bool; + + /// Whether this observation is a deletion or reference skip. + fn is_del(&self) -> bool; + + /// Whether this observation has an insertion immediately after it. + fn has_insertion(&self) -> bool; +} + +impl<'a> ReadView for (Alignment<'a>, Record) { + #[inline(always)] + fn flags(&self) -> u16 { + self.1.flags() + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.1.mapq() + } +} + +impl<'a> PileupReadView for (Alignment<'a>, Record) { + #[inline(always)] + fn qname(&self) -> &[u8] { + self.1.qname() + } + + #[inline(always)] + fn qpos(&self) -> Option { + self.0.qpos() + } + + #[inline(always)] + fn base(&self) -> Option { + let qpos = self.qpos()?; + let seq = self.1.seq(); + if qpos >= seq.len() { + return None; + } + Some(Base::from(seq[qpos] as char)) + } + + #[inline(always)] + fn base_qual(&self) -> Option { + let qpos = self.qpos()?; + self.1.qual().get(qpos).copied() + } + + #[inline(always)] + fn is_refskip(&self) -> bool { + self.0.is_refskip() + } + + #[inline(always)] + fn is_del(&self) -> bool { + self.0.is_del() + } + + #[inline(always)] + fn has_insertion(&self) -> bool { + matches!(self.0.indel(), rust_htslib::bam::pileup::Indel::Ins(_)) + } +} + +#[cfg(feature = "seqair-pileup")] +impl<'a, 'store, U> ReadView for seqair::bam::pileup::AlignmentView<'a, 'store, U> { + #[inline(always)] + fn flags(&self) -> u16 { + self.flags.raw() + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.mapq + } +} + +#[cfg(feature = "seqair-pileup")] +impl<'a, 'store, U> PileupReadView for seqair::bam::pileup::AlignmentView<'a, 'store, U> { + #[inline(always)] + fn qname(&self) -> &[u8] { + seqair::bam::pileup::AlignmentView::qname(self) + } + + #[inline(always)] + fn qpos(&self) -> Option { + seqair::bam::pileup::PileupAlignment::qpos(self) + } + + #[inline(always)] + fn base(&self) -> Option { + seqair::bam::pileup::PileupAlignment::base(self).map(seqair_base_to_base) + } + + #[inline(always)] + fn base_qual(&self) -> Option { + seqair::bam::pileup::PileupAlignment::qual(self).and_then(|qual| qual.get()) + } + + #[inline(always)] + fn is_refskip(&self) -> bool { + seqair::bam::pileup::PileupAlignment::is_refskip(self) + } + + #[inline(always)] + fn is_del(&self) -> bool { + seqair::bam::pileup::PileupAlignment::is_del(self) + } + + #[inline(always)] + fn has_insertion(&self) -> bool { + seqair::bam::pileup::PileupAlignment::insert_len(self) > 0 + } +} + +#[cfg(feature = "seqair-pileup")] +#[inline(always)] +fn seqair_base_to_base(base: seqair_types::Base) -> Base { + match base { + seqair_types::Base::A => Base::A, + seqair_types::Base::C => Base::C, + seqair_types::Base::G => Base::G, + seqair_types::Base::T => Base::T, + seqair_types::Base::Unknown => Base::N, + } +} + impl MateResolutionStrategy { - pub(crate) fn cmp( - &self, - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - read_filter: &F, - ) -> MateResolution { + pub(crate) fn cmp(&self, a: &A, b: &B, read_filter: &F) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + F: ReadFilter, + { // Handle user-set filters. - let a_pass = read_filter.filter_read(&a.1, Some(&a.0)); - let b_pass = read_filter.filter_read(&b.1, Some(&b.0)); + let a_pass = read_filter.filter_read(a); + let b_pass = read_filter.filter_read(b); if a_pass && !b_pass { return MateResolution::new(Ordering::Greater, None); } else if b_pass && !a_pass { @@ -247,109 +389,80 @@ impl MateResolutionStrategy { } MateResolutionStrategy::MapQualBaseQualIUPAC => Self::map_qual_base_qual_iupac(a, b), MateResolutionStrategy::MapQualBaseQualN => Self::map_qual_base_qual_n(a, b), - MateResolutionStrategy::IUPAC => Self::resolve_base::(a, b), - MateResolutionStrategy::N => Self::resolve_base::(a, b), + MateResolutionStrategy::IUPAC => Self::resolve_base::(a, b), + MateResolutionStrategy::N => Self::resolve_base::(a, b), MateResolutionStrategy::Original => Self::original(a, b), } } - pub(crate) fn resolve_base( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { + pub(crate) fn resolve_base(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + let Some(a_base) = a.base() else { return Self::original(a, b); - } + }; + let Some(b_base) = b.base() else { + return Self::original(a, b); + }; - let a_base = Base::from(a.1.seq()[a.0.qpos().unwrap()] as char); - let b_base = Base::from(b.1.seq()[b.0.qpos().unwrap()] as char); MateResolution::new( Ordering::Greater, Some(Base::either_or::(a_base, b_base)), ) } - pub(crate) fn base_qual_map_qual_n( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { - return Self::original(a, b); - } - - // compare baseq -> mapq -> default to first in pair - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; + pub(crate) fn base_qual_map_qual_n(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + Self::base_qual_map_qual::(a, b) + } - match a_qual.cmp(&b_qual) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => match a.1.mapq().cmp(&b.1.mapq()) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_base = Base::from(a.1.seq()[a.0.qpos().unwrap()] as char); - let b_base = Base::from(b.1.seq()[b.0.qpos().unwrap()] as char); - let base = Base::either_or::(a_base, b_base); - MateResolution::new(Ordering::Greater, Some(base)) // how to pass this back - } - }, - } + pub(crate) fn base_qual_map_qual_iupac(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + Self::base_qual_map_qual::(a, b) } - pub(crate) fn base_qual_map_qual_iupac( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { + fn base_qual_map_qual(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + let Some(a_qual) = a.base_qual() else { return Self::original(a, b); - } - - // compare baseq -> mapq -> default to first in pair - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; + }; + let Some(b_qual) = b.base_qual() else { + return Self::original(a, b); + }; match a_qual.cmp(&b_qual) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => match a.1.mapq().cmp(&b.1.mapq()) { + Ordering::Equal => match a.mapq().cmp(&b.mapq()) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_base = Base::from(a.1.seq()[a.0.qpos().unwrap()] as char); - let b_base = Base::from(b.1.seq()[b.0.qpos().unwrap()] as char); - MateResolution::new( - Ordering::Greater, - Some(Base::either_or::(a_base, b_base)), - ) - } + Ordering::Equal => Self::resolve_base::(a, b), }, } } - pub(crate) fn base_qual_map_qual_first_in_pair( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { + pub(crate) fn base_qual_map_qual_first_in_pair(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + let Some(a_qual) = a.base_qual() else { return Self::original(a, b); - } - - // compare baseq -> mapq -> default to first in pair - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; + }; + let Some(b_qual) = b.base_qual() else { + return Self::original(a, b); + }; match a_qual.cmp(&b_qual) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), @@ -358,127 +471,93 @@ impl MateResolutionStrategy { } } - pub(crate) fn map_qual_base_qual_n( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { - return Self::original(a, b); - } + pub(crate) fn map_qual_base_qual_n(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + Self::map_qual_base_qual::(a, b) + } - match a.1.mapq().cmp(&b.1.mapq()) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; - - match a_qual.cmp(&b_qual) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_base = Base::from(a.1.seq()[a.0.qpos().unwrap()] as char); - let b_base = Base::from(b.1.seq()[b.0.qpos().unwrap()] as char); - let base = Base::either_or::(a_base, b_base); - MateResolution::new(Ordering::Greater, Some(base)) // how to pass this back - } - } - } - } + pub(crate) fn map_qual_base_qual_iupac(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + Self::map_qual_base_qual::(a, b) } - pub(crate) fn map_qual_base_qual_iupac( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { + fn map_qual_base_qual(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + let Some(a_qual) = a.base_qual() else { return Self::original(a, b); - } + }; + let Some(b_qual) = b.base_qual() else { + return Self::original(a, b); + }; - match a.1.mapq().cmp(&b.1.mapq()) { + match a.mapq().cmp(&b.mapq()) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; - - match a_qual.cmp(&b_qual) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_base = Base::from(a.1.seq()[a.0.qpos().unwrap()] as char); - let b_base = Base::from(b.1.seq()[b.0.qpos().unwrap()] as char); - MateResolution::new( - Ordering::Greater, - Some(Base::either_or::(a_base, b_base)), - ) - } - } - } + Ordering::Equal => match a_qual.cmp(&b_qual) { + Ordering::Greater => MateResolution::new(Ordering::Greater, None), + Ordering::Less => MateResolution::new(Ordering::Less, None), + Ordering::Equal => Self::resolve_base::(a, b), + }, } } - pub(crate) fn map_qual_base_qual_first_in_pair( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - // First check that we have a base - let a_is_not_base = a.0.qpos().is_none(); - let b_is_not_base = b.0.qpos().is_none(); - if a_is_not_base || b_is_not_base { + pub(crate) fn map_qual_base_qual_first_in_pair(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + let Some(a_qual) = a.base_qual() else { return Self::original(a, b); - } + }; + let Some(b_qual) = b.base_qual() else { + return Self::original(a, b); + }; - match a.1.mapq().cmp(&b.1.mapq()) { + match a.mapq().cmp(&b.mapq()) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - let a_qual = a.1.qual()[a.0.qpos().unwrap()]; - let b_qual = b.1.qual()[b.0.qpos().unwrap()]; - - match a_qual.cmp(&b_qual) { - Ordering::Greater => MateResolution::new(Ordering::Greater, None), - Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - if a.1.flags() & 64 != 0 { - MateResolution::new(Ordering::Greater, None) - } else if b.1.flags() & 64 != 0 { - MateResolution::new(Ordering::Less, None) - } else { - // Default to `a` in the event that there is no first in pair for some reason - MateResolution::new(Ordering::Greater, None) - } - } - } - } + Ordering::Equal => match a_qual.cmp(&b_qual) { + Ordering::Greater => MateResolution::new(Ordering::Greater, None), + Ordering::Less => MateResolution::new(Ordering::Less, None), + Ordering::Equal => Self::first_in_pair(a, b), + }, } } /// Whichever has higher MAPQ, or if equal, whichever is first in pair - pub(crate) fn original( - a: &(Alignment<'_>, Record), - b: &(Alignment<'_>, Record), - ) -> MateResolution { - match a.1.mapq().cmp(&b.1.mapq()) { + pub(crate) fn original(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + match a.mapq().cmp(&b.mapq()) { Ordering::Greater => MateResolution::new(Ordering::Greater, None), Ordering::Less => MateResolution::new(Ordering::Less, None), - Ordering::Equal => { - // Check if a is first in pair - if a.1.flags() & 64 != 0 { - MateResolution::new(Ordering::Greater, None) - } else if b.1.flags() & 64 != 0 { - MateResolution::new(Ordering::Less, None) - } else { - // Default to `a` in the event that there is no first in pair for some reason - MateResolution::new(Ordering::Greater, None) - } - } + Ordering::Equal => Self::first_in_pair(a, b), + } + } + + fn first_in_pair(a: &A, b: &B) -> MateResolution + where + A: PileupReadView + ?Sized, + B: PileupReadView + ?Sized, + { + if a.flags() & 64 != 0 { + MateResolution::new(Ordering::Greater, None) + } else if b.flags() & 64 != 0 { + MateResolution::new(Ordering::Less, None) + } else { + // Default to `a` in the event that there is no first in pair for some reason + MateResolution::new(Ordering::Greater, None) } } } diff --git a/src/lib/position/pileup_position.rs b/src/lib/position/pileup_position.rs index a7476c6..40c036f 100644 --- a/src/lib/position/pileup_position.rs +++ b/src/lib/position/pileup_position.rs @@ -2,21 +2,17 @@ use crate::position::Position; use crate::read_filter::ReadFilter; use itertools::Itertools; -use rust_htslib::bam::{ - self, HeaderView, - pileup::{Alignment, Pileup}, - record::Record, -}; +use rust_htslib::bam::{self, HeaderView, pileup::Pileup}; use serde::Serialize; use smartstring::{LazyCompact, SmartString, alias::String}; use std::default; -use super::mate_fix::{Base, MateResolutionStrategy}; +use super::mate_fix::{Base, MateResolutionStrategy, PileupReadView}; /// Hold all information about a position. // NB: The max depth that htslib will return is i32::MAX, and the type of pos for htlib is u32 // There is no reason to go bigger, for now at least -#[derive(Debug, Serialize, Default)] +#[derive(Debug, Serialize, Default, PartialEq)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub struct PileupPosition { /// Reference sequence name. @@ -78,28 +74,30 @@ impl Position for PileupPosition { } impl PileupPosition { - /// Given a record, update the counts at this position + /// Given a pileup read observation, update the counts at this position #[inline(always)] - fn update( + fn update( &mut self, - alignment: &Alignment, - record: &Record, + read: &R, read_filter: &F, base_filter: Option, recommended_base: Option, mates_resolved: bool, - ) { - if !read_filter.filter_read(record, Some(alignment)) { + ) where + R: PileupReadView + ?Sized, + F: ReadFilter, + { + if !read_filter.filter_read(read) { self.depth -= 1; self.fail += 1; return; } // NB: Order matters here, a refskip is true for both is_del and is_refskip // while a true del is only true for is_del - if alignment.is_refskip() { + if read.is_refskip() { self.ref_skip += 1; self.depth -= 1; - } else if alignment.is_del() { + } else if read.is_del() { self.del += 1; } else { // We have an actual base! @@ -107,37 +105,18 @@ impl PileupPosition { // Check if we are checking the base quality score // && Check if the base quality score is greater or equal to than the cutoff if let Some(base_qual_filter) = base_filter - && (record.seq().is_empty() - || record.qual()[alignment.qpos().unwrap()] < base_qual_filter) + && read.base_qual().is_none_or(|qual| qual < base_qual_filter) { self.n += 1 } else if let Some(b) = recommended_base { - match b { - Base::A => self.a += 1, - Base::C => self.c += 1, - Base::T => self.t += 1, - Base::G => self.g += 1, - Base::R => self.r += 1, - Base::Y => self.y += 1, - Base::S => self.s += 1, - Base::W => self.w += 1, - Base::K => self.k += 1, - Base::M => self.m += 1, - _ => self.n += 1, - } - } else if record.seq().is_empty() { - self.n += 1 + self.add_base(b); + } else if let Some(base) = read.base() { + self.add_base(base); } else { - match (record.seq()[alignment.qpos().unwrap()] as char).to_ascii_uppercase() { - 'A' => self.a += 1, - 'C' => self.c += 1, - 'T' | 'U' => self.t += 1, - 'G' => self.g += 1, - _ => self.n += 1, - } + self.n += 1; } // Check for insertions - if let bam::pileup::Indel::Ins(_len) = alignment.indel() { + if read.has_insertion() { self.ins += 1; } } @@ -147,6 +126,23 @@ impl PileupPosition { } } + #[inline(always)] + fn add_base(&mut self, base: Base) { + match base { + Base::A => self.a += 1, + Base::C => self.c += 1, + Base::T => self.t += 1, + Base::G => self.g += 1, + Base::R => self.r += 1, + Base::Y => self.y += 1, + Base::S => self.s += 1, + Base::W => self.w += 1, + Base::K => self.k += 1, + Base::M => self.m += 1, + _ => self.n += 1, + } + } + /// Convert a pileup into a `Position`. /// /// This will walk over each of the alignments and count the number each nucleotide it finds. @@ -172,15 +168,8 @@ impl PileupPosition { for alignment in pileup.alignments() { let record = alignment.record(); - Self::update( - &mut pos, - &alignment, - &record, - read_filter, - base_filter, - None, - false, - ); + let read = (alignment, record); + Self::update(&mut pos, &read, read_filter, base_filter, None, false); } pos } @@ -221,9 +210,9 @@ impl PileupPosition { let record = aln.record(); (aln, record) }) - .sorted_by(|a, b| Ord::cmp(a.1.qname(), b.1.qname())) + .sorted_by(|a, b| Ord::cmp(a.qname(), b.qname())) // TODO: I'm not sure there is a good way to remove this allocation - .chunk_by(|a| a.1.qname().to_owned()); + .chunk_by(|a| a.qname().to_owned()); for (_qname, reads) in grouped_by_qname.into_iter() { let mut total_reads = 0; // count how many reads there were @@ -231,32 +220,89 @@ impl PileupPosition { let mut reads = reads .inspect(|_| total_reads += 1) .sorted_by(|a, b| mate_fix_strat.cmp(a, b, read_filter).ordering.reverse()); - let best = &reads.next().unwrap(); + let best = reads.next().unwrap(); - if let Some(second_best) = &reads.next().as_ref() { + if let Some(second_best) = reads.next() { // Deal explicitly with mate overlap, they are already ordered correctly - let result = mate_fix_strat.cmp(best, second_best, read_filter); + let result = mate_fix_strat.cmp(&best, &second_best, read_filter); pos.depth -= total_reads - 1; Self::update( &mut pos, - &best.0, - &best.1, + &best, read_filter, base_filter, result.recommended_base, true, ); } else { + pos.depth -= total_reads - 1; + Self::update(&mut pos, &best, read_filter, base_filter, None, false); + } + } + pos + } + + /// Convert a seqair pileup column into a `Position`. + #[cfg(feature = "seqair-pileup")] + #[inline] + pub fn from_seqair_column( + ref_seq: String, + column: &seqair::bam::pileup::PileupColumn<'_, U>, + read_filter: &F, + base_filter: Option, + ) -> Self { + let mut pos = Self::new(ref_seq, *column.pos()); + pos.depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); + + for alignment in column.alignments() { + Self::update(&mut pos, &alignment, read_filter, base_filter, None, false); + } + pos + } + + /// Convert a seqair pileup column into a mate-aware `Position`. + #[cfg(feature = "seqair-pileup")] + #[inline] + pub fn from_seqair_column_mate_aware( + ref_seq: String, + column: &seqair::bam::pileup::PileupColumn<'_, U>, + read_filter: &F, + base_filter: Option, + mate_fix_strat: MateResolutionStrategy, + ) -> Self { + let mut pos = Self::new(ref_seq, *column.pos()); + pos.depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); + + // Group records by qname + let grouped_by_qname = column + .alignments() + .sorted_by(|a, b| Ord::cmp(a.qname(), b.qname())) + // TODO: I'm not sure there is a good way to remove this allocation + .chunk_by(|a| a.qname().to_owned()); + + for (_qname, reads) in grouped_by_qname.into_iter() { + let mut total_reads = 0; // count how many reads there were + + let mut reads = reads + .inspect(|_| total_reads += 1) + .sorted_by(|a, b| mate_fix_strat.cmp(a, b, read_filter).ordering.reverse()); + let best = reads.next().unwrap(); + + if let Some(second_best) = reads.next() { + // Deal explicitly with mate overlap, they are already ordered correctly + let result = mate_fix_strat.cmp(&best, &second_best, read_filter); pos.depth -= total_reads - 1; Self::update( &mut pos, - &best.0, - &best.1, + &best, read_filter, base_filter, - None, - false, + result.recommended_base, + true, ); + } else { + pos.depth -= total_reads - 1; + Self::update(&mut pos, &best, read_filter, base_filter, None, false); } } pos @@ -822,4 +868,14 @@ mod tests { } } } + + /// TODO: enable once seqair yields pileup observations for empty SEQ (`*`) reads. + /// Current htslib behavior, covered above, counts those reads toward depth as `N`. + /// seqair currently appears to drop them because no base/quality exists at qpos. + #[cfg(feature = "seqair-pileup")] + #[test] + #[ignore = "requires upstream seqair empty-SEQ pileup support"] + fn seqair_empty_seq_matches_htslib_after_upstream_fix() { + todo!("construct the empty-SEQ fixture above with seqair and compare against htslib output") + } } diff --git a/src/lib/read_filter.rs b/src/lib/read_filter.rs index c0a6a82..3c7c26b 100644 --- a/src/lib/read_filter.rs +++ b/src/lib/read_filter.rs @@ -1,11 +1,43 @@ //! A trait and default implementation of a read filter. -use rust_htslib::bam::pileup::Alignment; use rust_htslib::bam::record::Record; +/// Minimal read-level data needed by [`ReadFilter`] implementations. +pub trait ReadView { + /// SAM flags for the read. + fn flags(&self) -> u16; + + /// Mapping quality for the read. + fn mapq(&self) -> u8; +} + +impl ReadView for Record { + #[inline(always)] + fn flags(&self) -> u16 { + self.flags() + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.mapq() + } +} + +impl ReadView for std::rc::Rc { + #[inline(always)] + fn flags(&self) -> u16 { + self.as_ref().flags() + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.as_ref().mapq() + } +} + /// Anything that implements ReadFilter can apply a filter set to read. pub trait ReadFilter { /// filters a read, true is pass, false if fail - fn filter_read(&self, read: &Record, alignment: Option<&Alignment>) -> bool; + fn filter_read(&self, read: &R) -> bool; } /// A straightforward read filter. @@ -29,7 +61,7 @@ impl DefaultReadFilter { impl ReadFilter for DefaultReadFilter { /// Filter reads based SAM flags and mapping quality #[inline(always)] - fn filter_read(&self, read: &Record, _alignment: Option<&Alignment>) -> bool { + fn filter_read(&self, read: &R) -> bool { let flags = read.flags(); (!flags) & self.include_flags == 0 && flags & self.exclude_flags == 0 From a26b50b4726debf789536aa4863860365d137420 Mon Sep 17 00:00:00 2001 From: Seth Stadick Date: Sun, 3 May 2026 06:59:44 -0400 Subject: [PATCH 2/5] perf: keep htslib pileup counting specialized --- src/lib/position/mate_fix.rs | 6 +++ src/lib/position/pileup_position.rs | 77 ++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/lib/position/mate_fix.rs b/src/lib/position/mate_fix.rs index 6bd8dcb..3b824c0 100644 --- a/src/lib/position/mate_fix.rs +++ b/src/lib/position/mate_fix.rs @@ -233,12 +233,15 @@ pub(crate) trait PileupReadView: ReadView { fn base_qual(&self) -> Option; /// Whether this observation is a reference skip. + #[cfg(feature = "seqair-pileup")] fn is_refskip(&self) -> bool; /// Whether this observation is a deletion or reference skip. + #[cfg(feature = "seqair-pileup")] fn is_del(&self) -> bool; /// Whether this observation has an insertion immediately after it. + #[cfg(feature = "seqair-pileup")] fn has_insertion(&self) -> bool; } @@ -281,16 +284,19 @@ impl<'a> PileupReadView for (Alignment<'a>, Record) { self.1.qual().get(qpos).copied() } + #[cfg(feature = "seqair-pileup")] #[inline(always)] fn is_refskip(&self) -> bool { self.0.is_refskip() } + #[cfg(feature = "seqair-pileup")] #[inline(always)] fn is_del(&self) -> bool { self.0.is_del() } + #[cfg(feature = "seqair-pileup")] #[inline(always)] fn has_insertion(&self) -> bool { matches!(self.0.indel(), rust_htslib::bam::pileup::Indel::Ins(_)) diff --git a/src/lib/position/pileup_position.rs b/src/lib/position/pileup_position.rs index 40c036f..7da2aed 100644 --- a/src/lib/position/pileup_position.rs +++ b/src/lib/position/pileup_position.rs @@ -2,7 +2,11 @@ use crate::position::Position; use crate::read_filter::ReadFilter; use itertools::Itertools; -use rust_htslib::bam::{self, HeaderView, pileup::Pileup}; +use rust_htslib::bam::{ + self, HeaderView, + pileup::{Alignment, Pileup}, + record::Record, +}; use serde::Serialize; use smartstring::{LazyCompact, SmartString, alias::String}; use std::default; @@ -74,7 +78,8 @@ impl Position for PileupPosition { } impl PileupPosition { - /// Given a pileup read observation, update the counts at this position + /// Given a pileup read observation, update the counts at this position. + #[cfg(feature = "seqair-pileup")] #[inline(always)] fn update( &mut self, @@ -126,6 +131,63 @@ impl PileupPosition { } } + /// Given an htslib pileup read observation, update the counts at this position. + #[inline(always)] + fn update_htslib( + &mut self, + alignment: &Alignment, + record: &Record, + read_filter: &F, + base_filter: Option, + recommended_base: Option, + mates_resolved: bool, + ) { + if !read_filter.filter_read(record) { + self.depth -= 1; + self.fail += 1; + return; + } + // NB: Order matters here, a refskip is true for both is_del and is_refskip + // while a true del is only true for is_del + if alignment.is_refskip() { + self.ref_skip += 1; + self.depth -= 1; + } else if alignment.is_del() { + self.del += 1; + } else { + // We have an actual base! + + // Check if we are checking the base quality score + // && Check if the base quality score is greater or equal to than the cutoff + if let Some(base_qual_filter) = base_filter + && (record.seq().is_empty() + || record.qual()[alignment.qpos().unwrap()] < base_qual_filter) + { + self.n += 1 + } else if let Some(b) = recommended_base { + self.add_base(b); + } else if record.seq().is_empty() { + self.n += 1 + } else { + match (record.seq()[alignment.qpos().unwrap()] as char).to_ascii_uppercase() { + 'A' => self.a += 1, + 'C' => self.c += 1, + 'T' | 'U' => self.t += 1, + 'G' => self.g += 1, + _ => self.n += 1, + } + } + // Check for insertions + if let bam::pileup::Indel::Ins(_len) = alignment.indel() { + self.ins += 1; + } + } + + if mates_resolved { + self.count_of_mate_resolutions += 1; + } + } + #[inline(always)] fn add_base(&mut self, base: Base) { match base { @@ -168,8 +230,7 @@ impl PileupPosition { for alignment in pileup.alignments() { let record = alignment.record(); - let read = (alignment, record); - Self::update(&mut pos, &read, read_filter, base_filter, None, false); + pos.update_htslib(&alignment, &record, read_filter, base_filter, None, false); } pos } @@ -226,9 +287,9 @@ impl PileupPosition { // Deal explicitly with mate overlap, they are already ordered correctly let result = mate_fix_strat.cmp(&best, &second_best, read_filter); pos.depth -= total_reads - 1; - Self::update( - &mut pos, - &best, + pos.update_htslib( + &best.0, + &best.1, read_filter, base_filter, result.recommended_base, @@ -236,7 +297,7 @@ impl PileupPosition { ); } else { pos.depth -= total_reads - 1; - Self::update(&mut pos, &best, read_filter, base_filter, None, false); + pos.update_htslib(&best.0, &best.1, read_filter, base_filter, None, false); } } pos From 03afa040a24fb5a5fe7dd09396f3ace49a3c02d3 Mon Sep 17 00:00:00 2001 From: Seth Stadick Date: Sun, 3 May 2026 07:41:49 -0400 Subject: [PATCH 3/5] perf: specialize seqair non-mate counting --- src/lib/position/mate_fix.rs | 13 ++++++ src/lib/position/pileup_position.rs | 71 +++++++++++++++++++++++++++-- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/src/lib/position/mate_fix.rs b/src/lib/position/mate_fix.rs index 3b824c0..12bfcfc 100644 --- a/src/lib/position/mate_fix.rs +++ b/src/lib/position/mate_fix.rs @@ -303,6 +303,19 @@ impl<'a> PileupReadView for (Alignment<'a>, Record) { } } +#[cfg(feature = "seqair-pileup")] +impl ReadView for seqair::bam::pileup::PileupAlignment { + #[inline(always)] + fn flags(&self) -> u16 { + self.flags.raw() + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.mapq + } +} + #[cfg(feature = "seqair-pileup")] impl<'a, 'store, U> ReadView for seqair::bam::pileup::AlignmentView<'a, 'store, U> { #[inline(always)] diff --git a/src/lib/position/pileup_position.rs b/src/lib/position/pileup_position.rs index 7da2aed..e7fcae6 100644 --- a/src/lib/position/pileup_position.rs +++ b/src/lib/position/pileup_position.rs @@ -131,6 +131,47 @@ impl PileupPosition { } } + /// Given a raw seqair pileup read observation, update the counts at this position. + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn update_seqair_raw( + &mut self, + alignment: &seqair::bam::pileup::PileupAlignment, + read_filter: &F, + base_filter: Option, + ) { + if !read_filter.filter_read(alignment) { + self.depth -= 1; + self.fail += 1; + return; + } + + match alignment.op() { + seqair::bam::pileup::PileupOp::RefSkip => { + self.ref_skip += 1; + self.depth -= 1; + } + seqair::bam::pileup::PileupOp::Deletion { .. } => { + self.del += 1; + } + seqair::bam::pileup::PileupOp::ComplexIndel { is_refskip, .. } => { + if *is_refskip { + self.ref_skip += 1; + self.depth -= 1; + } else { + self.del += 1; + } + } + seqair::bam::pileup::PileupOp::Match { base, qual, .. } => { + self.update_seqair_base(*base, *qual, base_filter); + } + seqair::bam::pileup::PileupOp::Insertion { base, qual, .. } => { + self.update_seqair_base(*base, *qual, base_filter); + self.ins += 1; + } + } + } + /// Given an htslib pileup read observation, update the counts at this position. #[inline(always)] fn update_htslib( @@ -188,6 +229,30 @@ impl PileupPosition { } } + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn update_seqair_base( + &mut self, + base: seqair_types::Base, + qual: seqair_types::BaseQuality, + base_filter: Option, + ) { + if let Some(base_qual_filter) = base_filter + && qual.get().is_none_or(|qual| qual < base_qual_filter) + { + self.n += 1; + return; + } + + match base { + seqair_types::Base::A => self.a += 1, + seqair_types::Base::C => self.c += 1, + seqair_types::Base::G => self.g += 1, + seqair_types::Base::T => self.t += 1, + seqair_types::Base::Unknown => self.n += 1, + } + } + #[inline(always)] fn add_base(&mut self, base: Base) { match base { @@ -305,7 +370,7 @@ impl PileupPosition { /// Convert a seqair pileup column into a `Position`. #[cfg(feature = "seqair-pileup")] - #[inline] + #[inline(always)] pub fn from_seqair_column( ref_seq: String, column: &seqair::bam::pileup::PileupColumn<'_, U>, @@ -315,8 +380,8 @@ impl PileupPosition { let mut pos = Self::new(ref_seq, *column.pos()); pos.depth = u32::try_from(column.depth()).unwrap_or(u32::MAX); - for alignment in column.alignments() { - Self::update(&mut pos, &alignment, read_filter, base_filter, None, false); + for alignment in column.raw_alignments() { + pos.update_seqair_raw(alignment, read_filter, base_filter); } pos } From db0d3b36311c8bae9cba8cf16c5e7850fd825a79 Mon Sep 17 00:00:00 2001 From: Seth Stadick Date: Sun, 3 May 2026 08:33:54 -0400 Subject: [PATCH 4/5] feat: add experimental seqair only-depth path --- src/commands/only_depth.rs | 527 +++++++++++++++++++++++++++++++++++++ 1 file changed, 527 insertions(+) diff --git a/src/commands/only_depth.rs b/src/commands/only_depth.rs index ef4c4a6..11ad8b0 100644 --- a/src/commands/only_depth.rs +++ b/src/commands/only_depth.rs @@ -21,6 +21,9 @@ use std::{collections::HashMap, path::PathBuf}; use std::{convert::TryFrom, rc::Rc}; use structopt::{StructOpt, clap::AppSettings}; +#[cfg(feature = "seqair-pileup")] +use perbase_lib::read_filter::ReadView; + /// Calculate only the depth at each base. #[derive(StructOpt)] #[structopt(author, setting = AppSettings::ArgRequiredElseHelp)] @@ -90,6 +93,11 @@ pub struct OnlyDepth { #[structopt(long, short = "x")] fast_mode: bool, + /// Use the experimental seqair-backed reader with raw pre-filtering. Requires unmapped reads to be excluded (e.g. -F 4, or -F 3852 instead of -F 3848). + #[cfg(feature = "seqair-pileup")] + #[structopt(long = "seqair")] + seqair: bool, + /// Skip merging adjacent bases that have the same depth. #[structopt(long, short = "n")] no_merge: bool, @@ -114,9 +122,87 @@ pub struct OnlyDepth { zero_base: bool, } +#[cfg(feature = "seqair-pileup")] +fn path_looks_like_cram(reads: &std::path::Path) -> bool { + reads + .extension() + .and_then(|extension| extension.to_str()) + .is_some_and(|extension| extension.eq_ignore_ascii_case("cram")) +} + +#[cfg(feature = "seqair-pileup")] +struct SeqairRawReadView { + flags: u16, + mapq: u8, +} + +#[cfg(feature = "seqair-pileup")] +impl ReadView for SeqairRawReadView { + #[inline(always)] + fn flags(&self) -> u16 { + self.flags + } + + #[inline(always)] + fn mapq(&self) -> u8 { + self.mapq + } +} + +#[cfg(feature = "seqair-pileup")] +struct OnlyDepthSeqairFilter<'a, F: ReadFilter> { + read_filter: &'a F, +} + +#[cfg(feature = "seqair-pileup")] +impl Copy for OnlyDepthSeqairFilter<'_, F> {} + +#[cfg(feature = "seqair-pileup")] +impl Clone for OnlyDepthSeqairFilter<'_, F> { + fn clone(&self) -> Self { + *self + } +} + +#[cfg(feature = "seqair-pileup")] +impl seqair::bam::record_store::CustomizeRecordStore + for OnlyDepthSeqairFilter<'_, F> +{ + type Extra = (); + + #[inline(always)] + fn filter_raw(&mut self, fields: &seqair::bam::record_store::FilterRawFields<'_>) -> bool { + let read = SeqairRawReadView { + flags: fields.flags.raw(), + mapq: fields.mapq, + }; + self.read_filter.filter_read(&read) + } + + #[inline(always)] + fn compute( + &mut self, + _rec: &seqair::bam::record_store::SlimRecord, + _store: &seqair::bam::RecordStore<()>, + ) { + } +} + impl OnlyDepth { pub fn run(self) -> Result<()> { info!("Running only-depth on: {:?}", self.reads); + #[cfg(feature = "seqair-pileup")] + if self.seqair && path_looks_like_cram(&self.reads) { + anyhow::bail!( + "The experimental seqair only-depth path currently supports BAM input only" + ); + } + #[cfg(feature = "seqair-pileup")] + if self.seqair && self.exclude_flags & 0x4 == 0 { + anyhow::bail!( + "The experimental seqair only-depth path requires unmapped reads to be excluded; add -F 4, or use -F 3852 if you would otherwise use -F 3848" + ); + } let cpus = utils::determine_allowed_cpus(self.threads)?; let mut writer = utils::get_writer( @@ -139,6 +225,8 @@ impl OnlyDepth { if self.zero_base { 0 } else { 1 }, read_filter, ); + #[cfg(feature = "seqair-pileup")] + let processor = processor.with_seqair(self.seqair)?; let par_granges_runner = par_granges::ParGranges::new( self.reads.clone(), @@ -181,6 +269,23 @@ impl OnlyDepth { // This is a heuristic, so it just have to never have a false negative and not have false positives > N% of the time (tlen / 2) < read_len || (record.mpos() - record.pos()).abs() < read_len } + + #[cfg(feature = "seqair-pileup")] + #[inline] + fn maybe_overlaps_mate_seqair( + rec: &seqair::bam::record_store::SlimRecord, + reference_stop: u32, + ) -> bool { + if !rec.flags.is_paired() || rec.tid != rec.next_ref_id { + return false; + } + + let tlen = i64::from(rec.template_len).abs(); + let read_len = i64::from(reference_stop) - rec.pos.as_i64(); + let mate_delta = i64::from(rec.next_pos) - rec.pos.as_i64(); + // Keep this heuristic byte-for-byte equivalent in intent to the htslib path above. + (tlen / 2) < read_len || mate_delta.abs() < read_len + } } /// Holds the info needed for [par_io::RegionProcessor] implementation @@ -201,6 +306,12 @@ struct OnlyDepthProcessor { read_filter: F, /// 0-based or 1-based coordinate output coord_base: u32, + /// Use seqair's indexed reader instead of htslib. + #[cfg(feature = "seqair-pileup")] + seqair: bool, + /// Template seqair reader to fork per region, sharing parsed header/index. + #[cfg(feature = "seqair-pileup")] + seqair_reader_template: Option, } impl OnlyDepthProcessor { @@ -225,7 +336,20 @@ impl OnlyDepthProcessor { keep_zeros, coord_base, read_filter, + #[cfg(feature = "seqair-pileup")] + seqair: false, + #[cfg(feature = "seqair-pileup")] + seqair_reader_template: None, + } + } + + #[cfg(feature = "seqair-pileup")] + fn with_seqair(mut self, seqair: bool) -> Result { + self.seqair = seqair; + if seqair { + self.seqair_reader_template = Some(seqair::reader::IndexedReader::open(&self.reads)?); } + Ok(self) } /// Sum the counts within the region to get the depths at each RangePosition @@ -456,6 +580,333 @@ impl OnlyDepthProcessor { let contig = std::str::from_utf8(header.tid2name(tid)).unwrap(); self.sum_counter(counter, contig, start, stop) } + + #[cfg(feature = "seqair-pileup")] + fn process_region_seqair(&self, tid: u32, start: u32, stop: u32) -> Vec { + if start >= stop { + return Vec::new(); + } + + let (store, contig) = self.fetch_seqair_store(tid, start, stop); + match (self.fast_mode, self.mate_fix) { + (false, false) => self.process_seqair_normal_no_mate(store, &contig, start, stop), + (false, true) => self.process_seqair_normal_mate_aware(store, &contig, start, stop), + (true, false) => self.process_seqair_fast_no_mate(store, &contig, start, stop), + (true, true) => self.process_seqair_fast_mate_aware(store, &contig, start, stop), + } + } + + #[cfg(feature = "seqair-pileup")] + fn fetch_seqair_store( + &self, + tid: u32, + start: u32, + stop: u32, + ) -> (seqair::bam::RecordStore<()>, String) { + let start_pos = seqair::bam::Pos0::new(start).expect("seqair start position"); + // seqair uses inclusive end positions; perbase regions are half-open [start, stop). + let end_pos = seqair::bam::Pos0::new(stop - 1).expect("seqair end position"); + let mut store = seqair::bam::RecordStore::new(); + let mut read_filter = OnlyDepthSeqairFilter { + read_filter: &self.read_filter, + }; + + let mut reader = self + .seqair_reader_template + .as_ref() + .expect("seqair reader template") + .fork() + .expect("forked seqair reader"); + let contig = String::from( + reader + .header() + .target_name(tid) + .expect("seqair target name"), + ); + reader + .fetch_into_customized(tid, start_pos, end_pos, &mut store, &mut read_filter) + .expect("seqair fetched a region"); + + (store, contig) + } + + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn adjusted_seqair_interval( + counter_len: usize, + interval_start: u32, + interval_stop: u32, + region_start: u32, + region_stop: u32, + ) -> Option<(usize, usize, bool)> { + if interval_start >= region_stop || interval_stop <= region_start { + return None; + } + + let adjusted_start = if interval_start < region_start { + 0 + } else { + (interval_start - region_start) as usize + }; + + let mut dont_count_stop = false; + let adjusted_stop = if interval_stop >= region_stop { + dont_count_stop = true; + counter_len - 1 + } else { + (interval_stop - region_start) as usize + }; + + Some((adjusted_start, adjusted_stop, dont_count_stop)) + } + + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn add_adjusted_seqair_interval( + counter: &mut [i32], + adjusted_start: usize, + adjusted_stop: usize, + dont_count_stop: bool, + ) { + counter[adjusted_start] += 1; + if !dont_count_stop { + counter[adjusted_stop] -= 1; + } + } + + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn seqair_reference_stop( + rec: &seqair::bam::record_store::SlimRecord, + cigar: &[seqair::bam::CigarOp], + ) -> u32 { + if cigar.iter().any(|op| op.consumes_ref()) { + (*rec.end_pos) + .checked_add(1) + .expect("seqair end position overflow") + } else { + *rec.end_pos + } + } + + #[cfg(feature = "seqair-pileup")] + fn merge_seqair_mate_intervals( + counter: &mut [i32], + maties: HashMap>>, + ) { + for (_qname, ivs) in maties { + let mut lapper = Lapper::new(ivs); + lapper.merge_overlaps(); + for iv in lapper.intervals { + counter[iv.start] += 1; + if !iv.val { + counter[iv.stop] -= 1; + } + } + } + } + + #[cfg(feature = "seqair-pileup")] + fn process_seqair_normal_no_mate( + &self, + store: seqair::bam::RecordStore<()>, + contig: &str, + start: u32, + stop: u32, + ) -> Vec { + let mut counter: Vec = vec![0; (stop - start) as usize]; + + for idx in 0..store.len() { + let idx = u32::try_from(idx).expect("seqair record index"); + let rec = store.record(idx); + let mut ref_pos = *rec.pos; + for op in store.cigar(idx) { + let len = op.len(); + match op.op_type() { + seqair::bam::cigar::CigarOpType::Match + | seqair::bam::cigar::CigarOpType::SeqMatch + | seqair::bam::cigar::CigarOpType::SeqMismatch + | seqair::bam::cigar::CigarOpType::Deletion => { + let block_start = ref_pos; + let block_stop = ref_pos.checked_add(len).expect("seqair CIGAR overflow"); + if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = + Self::adjusted_seqair_interval( + counter.len(), + block_start, + block_stop, + start, + stop, + ) + { + Self::add_adjusted_seqair_interval( + &mut counter, + adjusted_start, + adjusted_stop, + dont_count_stop, + ); + } + ref_pos = block_stop; + } + seqair::bam::cigar::CigarOpType::RefSkip => { + ref_pos = ref_pos.checked_add(len).expect("seqair CIGAR overflow"); + } + seqair::bam::cigar::CigarOpType::Insertion + | seqair::bam::cigar::CigarOpType::SoftClip + | seqair::bam::cigar::CigarOpType::HardClip + | seqair::bam::cigar::CigarOpType::Padding + | seqair::bam::cigar::CigarOpType::Unknown(_) => {} + } + } + } + + self.sum_counter(counter, contig, start, stop) + } + + #[cfg(feature = "seqair-pileup")] + fn process_seqair_normal_mate_aware( + &self, + store: seqair::bam::RecordStore<()>, + contig: &str, + start: u32, + stop: u32, + ) -> Vec { + let mut counter: Vec = vec![0; (stop - start) as usize]; + let mut maties: HashMap>> = HashMap::new(); + + for idx in 0..store.len() { + let idx = u32::try_from(idx).expect("seqair record index"); + let rec = store.record(idx); + let cigar = store.cigar(idx); + let reference_stop = Self::seqair_reference_stop(rec, cigar); + let maybe_mate_key = if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { + Some(String::from( + std::str::from_utf8(store.qname(idx)).expect("Convert qname"), + )) + } else { + None + }; + + let mut ref_pos = *rec.pos; + for op in cigar { + let len = op.len(); + match op.op_type() { + seqair::bam::cigar::CigarOpType::Match + | seqair::bam::cigar::CigarOpType::SeqMatch + | seqair::bam::cigar::CigarOpType::SeqMismatch + | seqair::bam::cigar::CigarOpType::Deletion => { + let block_start = ref_pos; + let block_stop = ref_pos.checked_add(len).expect("seqair CIGAR overflow"); + if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = + Self::adjusted_seqair_interval( + counter.len(), + block_start, + block_stop, + start, + stop, + ) + { + if let Some(mate_key) = &maybe_mate_key { + maties.entry(mate_key.clone()).or_default().push(Interval { + start: adjusted_start, + stop: adjusted_stop, + val: dont_count_stop, + }); + } else { + Self::add_adjusted_seqair_interval( + &mut counter, + adjusted_start, + adjusted_stop, + dont_count_stop, + ); + } + } + ref_pos = block_stop; + } + seqair::bam::cigar::CigarOpType::RefSkip => { + ref_pos = ref_pos.checked_add(len).expect("seqair CIGAR overflow"); + } + seqair::bam::cigar::CigarOpType::Insertion + | seqair::bam::cigar::CigarOpType::SoftClip + | seqair::bam::cigar::CigarOpType::HardClip + | seqair::bam::cigar::CigarOpType::Padding + | seqair::bam::cigar::CigarOpType::Unknown(_) => {} + } + } + } + + Self::merge_seqair_mate_intervals(&mut counter, maties); + self.sum_counter(counter, contig, start, stop) + } + + #[cfg(feature = "seqair-pileup")] + fn process_seqair_fast_no_mate( + &self, + store: seqair::bam::RecordStore<()>, + contig: &str, + start: u32, + stop: u32, + ) -> Vec { + let mut counter: Vec = vec![0; (stop - start) as usize]; + + for idx in 0..store.len() { + let idx = u32::try_from(idx).expect("seqair record index"); + let rec = store.record(idx); + let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = + Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) + { + Self::add_adjusted_seqair_interval( + &mut counter, + adjusted_start, + adjusted_stop, + dont_count_stop, + ); + } + } + + self.sum_counter(counter, contig, start, stop) + } + + #[cfg(feature = "seqair-pileup")] + fn process_seqair_fast_mate_aware( + &self, + store: seqair::bam::RecordStore<()>, + contig: &str, + start: u32, + stop: u32, + ) -> Vec { + let mut counter: Vec = vec![0; (stop - start) as usize]; + let mut maties: HashMap>> = HashMap::new(); + + for idx in 0..store.len() { + let idx = u32::try_from(idx).expect("seqair record index"); + let rec = store.record(idx); + let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = + Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) + { + if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { + let qname = + String::from(std::str::from_utf8(store.qname(idx)).expect("Convert qname")); + maties.entry(qname).or_default().push(Interval { + start: adjusted_start, + stop: adjusted_stop, + val: dont_count_stop, + }); + } else { + Self::add_adjusted_seqair_interval( + &mut counter, + adjusted_start, + adjusted_stop, + dont_count_stop, + ); + } + } + } + + Self::merge_seqair_mate_intervals(&mut counter, maties); + self.sum_counter(counter, contig, start, stop) + } } /// Implement [par_io::RegionProcessor] for [SimpleProcessor] @@ -468,6 +919,11 @@ impl RegionProcessor for OnlyDepthProcessor { /// the defined filters fn process_region(&self, tid: u32, start: u32, stop: u32) -> Vec { trace!("Processing region {}(tid):{}-{}", tid, start, stop); + #[cfg(feature = "seqair-pileup")] + if self.seqair { + return self.process_region_seqair(tid, start, stop); + } + if self.fast_mode { self.process_region_fast(tid, start, stop) } else { @@ -858,6 +1314,77 @@ mod tests { positions } + #[cfg(feature = "seqair-pileup")] + fn collect_only_depth_tuples( + bam_path: PathBuf, + fast_mode: bool, + mate_fix: bool, + seqair: bool, + read_filter: DefaultReadFilter, + ) -> Vec<(std::string::String, u32, u32, u32)> { + let cpus = utils::determine_allowed_cpus(8).unwrap(); + let onlydepth_processor = OnlyDepthProcessor::new( + bam_path.clone(), + None, + mate_fix, + fast_mode, + false, + false, + 0, + read_filter, + ) + .with_seqair(seqair) + .unwrap(); + + let par_granges_runner = par_granges::ParGranges::new( + bam_path, + None, + None, + None, + true, + Some(cpus), + Some(1_000_000), + Some(0.001), + onlydepth_processor, + ); + let mut positions: Vec<_> = par_granges_runner + .process() + .unwrap() + .into_iter() + .map(|p| (p.ref_seq.to_string(), p.pos, p.end, p.depth)) + .collect(); + positions.sort(); + positions + } + + #[cfg(feature = "seqair-pileup")] + #[rstest( + fast_mode => [false, true], + mate_fix => [false, true] + )] + fn seqair_only_depth_matches_htslib( + fast_mode: bool, + mate_fix: bool, + bamfile: (PathBuf, TempDir), + ) { + let htslib_positions = collect_only_depth_tuples( + bamfile.0.clone(), + fast_mode, + mate_fix, + false, + DefaultReadFilter::new(0, 512, 0), + ); + let seqair_positions = collect_only_depth_tuples( + bamfile.0, + fast_mode, + mate_fix, + true, + DefaultReadFilter::new(0, 512, 0), + ); + + assert_eq!(htslib_positions, seqair_positions); + } + #[rstest( positions, awareness_modifier, From 92fa59341aecd95bc78d57f18e6b7176043786e6 Mon Sep 17 00:00:00 2001 From: Seth Stadick Date: Sat, 9 May 2026 06:21:39 -0400 Subject: [PATCH 5/5] chore: use seqair 0.1.0 APIs --- Cargo.lock | 6 +- Cargo.toml | 4 +- src/commands/only_depth.rs | 126 ++++++++++++++++++++++------ src/lib/position/pileup_position.rs | 91 ++++++++++++++++++-- 4 files changed, 191 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86f89e7..bf330c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1882,7 +1882,8 @@ checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" [[package]] name = "seqair" version = "0.1.0" -source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876e242a5707fad07fc7a0cc413365908e15e04df6a4aec0d3ea054b05bde72f" dependencies = [ "bytemuck", "bzip2", @@ -1902,7 +1903,8 @@ dependencies = [ [[package]] name = "seqair-types" version = "0.1.0" -source = "git+https://tangled.org/softleif.se/seqair.git?rev=28766a811596437f685f76aac753f963cde96f7f#28766a811596437f685f76aac753f963cde96f7f" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a696275aeb72a9d6c67c8885194311d41ca27c1e4ad2dfdc528a90e68bb08555" dependencies = [ "bytemuck", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4861906..929e7a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,8 @@ lru_time_cache = "0.11.1" num_cpus = "1.13.0" rayon = "1.4.0" rust-htslib = {version = "0.51", default-features = false, features = ["libdeflate"]} -seqair = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } -seqair-types = { git = "https://tangled.org/softleif.se/seqair.git", rev = "28766a811596437f685f76aac753f963cde96f7f", optional = true } +seqair = { version = "0.1.0", optional = true } +seqair-types = { version = "0.1.0", optional = true } rust-lapper = "1.0" serde = { version = "1.0.116", features = ["derive"] } smartstring = { version = "1.0.1", features = ["serde"] } diff --git a/src/commands/only_depth.rs b/src/commands/only_depth.rs index 11ad8b0..46e3498 100644 --- a/src/commands/only_depth.rs +++ b/src/commands/only_depth.rs @@ -93,7 +93,7 @@ pub struct OnlyDepth { #[structopt(long, short = "x")] fast_mode: bool, - /// Use the experimental seqair-backed reader with raw pre-filtering. Requires unmapped reads to be excluded (e.g. -F 4, or -F 3852 instead of -F 3848). + /// Use the experimental seqair-backed reader with raw pre-filtering. #[cfg(feature = "seqair-pileup")] #[structopt(long = "seqair")] seqair: bool, @@ -197,12 +197,6 @@ impl OnlyDepth { "The experimental seqair only-depth path currently supports BAM input only" ); } - #[cfg(feature = "seqair-pileup")] - if self.seqair && self.exclude_flags & 0x4 == 0 { - anyhow::bail!( - "The experimental seqair only-depth path requires unmapped reads to be excluded; add -F 4, or use -F 3852 if you would otherwise use -F 3848" - ); - } let cpus = utils::determine_allowed_cpus(self.threads)?; let mut writer = utils::get_writer( @@ -347,7 +341,16 @@ impl OnlyDepthProcessor { fn with_seqair(mut self, seqair: bool) -> Result { self.seqair = seqair; if seqair { - self.seqair_reader_template = Some(seqair::reader::IndexedReader::open(&self.reads)?); + let reader = match seqair::reader::IndexedReader::open(&self.reads)? { + // `only-depth` consumes raw fetched records (like htslib `view`), not seqair's + // pileup stream. Keep placed-unmapped BAM records in the fetch stream and let + // perbase's normal read-filter semantics decide whether `-F 4` excludes them. + seqair::reader::IndexedReader::Bam(reader) => { + seqair::reader::IndexedReader::Bam(reader.keep_unmapped(true)) + } + reader => reader, + }; + self.seqair_reader_template = Some(reader); } Ok(self) } @@ -689,6 +692,24 @@ impl OnlyDepthProcessor { } } + #[cfg(feature = "seqair-pileup")] + #[inline(always)] + fn seqair_fast_reference_stop( + rec: &seqair::bam::record_store::SlimRecord, + cigar: &[seqair::bam::CigarOp], + ) -> u32 { + // Match rust-htslib's `Record::reference_end()` behavior for placed-unmapped + // records: even if a CIGAR is present, fast-mode treats the record as a + // one-base placed interval. Normal `only-depth` still walks CIGAR blocks. + if rec.flags.is_unmapped() { + (*rec.pos) + .checked_add(1) + .expect("seqair unmapped end position overflow") + } else { + Self::seqair_reference_stop(rec, cigar) + } + } + #[cfg(feature = "seqair-pileup")] fn merge_seqair_mate_intervals( counter: &mut [i32], @@ -716,11 +737,9 @@ impl OnlyDepthProcessor { ) -> Vec { let mut counter: Vec = vec![0; (stop - start) as usize]; - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); + for rec in store.records() { let mut ref_pos = *rec.pos; - for op in store.cigar(idx) { + for op in rec.cigar(&store).expect("seqair CIGAR") { let len = op.len(); match op.op_type() { seqair::bam::cigar::CigarOpType::Match @@ -773,14 +792,13 @@ impl OnlyDepthProcessor { let mut counter: Vec = vec![0; (stop - start) as usize]; let mut maties: HashMap>> = HashMap::new(); - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let cigar = store.cigar(idx); + for rec in store.records() { + let cigar = rec.cigar(&store).expect("seqair CIGAR"); let reference_stop = Self::seqair_reference_stop(rec, cigar); let maybe_mate_key = if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { Some(String::from( - std::str::from_utf8(store.qname(idx)).expect("Convert qname"), + std::str::from_utf8(rec.qname(&store).expect("seqair qname")) + .expect("Convert qname"), )) } else { None @@ -848,10 +866,9 @@ impl OnlyDepthProcessor { ) -> Vec { let mut counter: Vec = vec![0; (stop - start) as usize]; - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + for rec in store.records() { + let reference_stop = + Self::seqair_fast_reference_stop(rec, rec.cigar(&store).expect("seqair CIGAR")); if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) { @@ -878,16 +895,17 @@ impl OnlyDepthProcessor { let mut counter: Vec = vec![0; (stop - start) as usize]; let mut maties: HashMap>> = HashMap::new(); - for idx in 0..store.len() { - let idx = u32::try_from(idx).expect("seqair record index"); - let rec = store.record(idx); - let reference_stop = Self::seqair_reference_stop(rec, store.cigar(idx)); + for rec in store.records() { + let reference_stop = + Self::seqair_fast_reference_stop(rec, rec.cigar(&store).expect("seqair CIGAR")); if let Some((adjusted_start, adjusted_stop, dont_count_stop)) = Self::adjusted_seqair_interval(counter.len(), *rec.pos, reference_stop, start, stop) { if OnlyDepth::maybe_overlaps_mate_seqair(rec, reference_stop) { - let qname = - String::from(std::str::from_utf8(store.qname(idx)).expect("Convert qname")); + let qname = String::from( + std::str::from_utf8(rec.qname(&store).expect("seqair qname")) + .expect("Convert qname"), + ); maties.entry(qname).or_default().push(Interval { start: adjusted_start, stop: adjusted_stop, @@ -1385,6 +1403,60 @@ mod tests { assert_eq!(htslib_positions, seqair_positions); } + #[cfg(feature = "seqair-pileup")] + #[rstest(fast_mode => [false, true])] + fn seqair_only_depth_keeps_placed_unmapped_like_htslib_records(fast_mode: bool) { + let tempdir = tempdir().unwrap(); + let bam_path = tempdir.path().join("placed_unmapped.bam"); + + let mut header = bam::header::Header::new(); + let mut chr1 = bam::header::HeaderRecord::new(b"SQ"); + chr1.push_tag(b"SN", &"chr1".to_owned()); + chr1.push_tag(b"LN", &"100".to_owned()); + header.push_record(&chr1); + let view = bam::HeaderView::from_header(&header); + + let records = [ + Record::from_sam( + &view, + b"MAPPED\t0\tchr1\t1\t40\t10M\t*\t0\t0\tAAAAAAAAAA\tIIIIIIIIII", + ) + .unwrap(), + // Placed-unmapped records can be returned by htslib's raw record iterator after + // region fetch. seqair 0.1.0's `keep_unmapped(true)` lets perbase apply the same + // user read filters instead of dropping them before `filter_raw`. + Record::from_sam( + &view, + b"PLACED_UNMAPPED\t4\tchr1\t5\t0\t10M\t*\t0\t0\tCCCCCCCCCC\tIIIIIIIIII", + ) + .unwrap(), + ]; + + let mut writer = bam::Writer::from_path(&bam_path, &header, bam::Format::Bam).unwrap(); + for record in &records { + writer.write(record).unwrap(); + } + drop(writer); + bam::index::build(&bam_path, None, bam::index::Type::Bai, 1).unwrap(); + + let htslib_positions = collect_only_depth_tuples( + bam_path.clone(), + fast_mode, + false, + false, + DefaultReadFilter::new(0, 0, 0), + ); + let seqair_positions = collect_only_depth_tuples( + bam_path, + fast_mode, + false, + true, + DefaultReadFilter::new(0, 0, 0), + ); + + assert_eq!(htslib_positions, seqair_positions); + } + #[rstest( positions, awareness_modifier, diff --git a/src/lib/position/pileup_position.rs b/src/lib/position/pileup_position.rs index e7fcae6..7f726d9 100644 --- a/src/lib/position/pileup_position.rs +++ b/src/lib/position/pileup_position.rs @@ -995,13 +995,94 @@ mod tests { } } - /// TODO: enable once seqair yields pileup observations for empty SEQ (`*`) reads. - /// Current htslib behavior, covered above, counts those reads toward depth as `N`. - /// seqair currently appears to drop them because no base/quality exists at qpos. + /// seqair 0.1.0 yields `Base::Unknown` / unavailable quality for empty-SEQ (`*`) + /// pileup observations, matching htslib/perbase's depth-as-N behavior. #[cfg(feature = "seqair-pileup")] #[test] - #[ignore = "requires upstream seqair empty-SEQ pileup support"] fn seqair_empty_seq_matches_htslib_after_upstream_fix() { - todo!("construct the empty-SEQ fixture above with seqair and compare against htslib output") + use rust_htslib::bam::{IndexedReader, Writer, index}; + use std::path::Path; + use tempfile::tempdir; + + fn htslib_position( + bam_path: &Path, + read_filter: &DefaultReadFilter, + base_filter: Option, + ) -> PileupPosition { + let mut reader = IndexedReader::from_path(bam_path).unwrap(); + let header_view = reader.header().clone(); + reader.fetch(("chr1", 0, 1)).unwrap(); + for pileup_result in reader.pileup() { + let pileup = pileup_result.unwrap(); + if pileup.pos() == 0 { + return PileupPosition::from_pileup( + pileup, + &header_view, + read_filter, + base_filter, + ); + } + } + panic!("htslib pileup did not yield chr1:1"); + } + + fn seqair_position( + bam_path: &Path, + read_filter: &DefaultReadFilter, + base_filter: Option, + ) -> PileupPosition { + let mut reader = seqair::reader::IndexedReader::open(bam_path).unwrap(); + let tid = reader.header().tid("chr1").unwrap(); + let start = seqair::bam::Pos0::new(0).unwrap(); + let end = seqair::bam::Pos0::new(0).unwrap(); + let mut store = seqair::bam::RecordStore::new(); + reader.fetch_into(tid, start, end, &mut store).unwrap(); + let mut engine = seqair::bam::pileup::PileupEngine::new(store, start, end); + while let Some(column) = engine.pileups() { + if *column.pos() == 0 { + return PileupPosition::from_seqair_column( + String::from("chr1"), + &column, + read_filter, + base_filter, + ); + } + } + panic!("seqair pileup did not yield chr1:1"); + } + + let tempdir = tempdir().unwrap(); + let bam_path = tempdir.path().join("empty_seq_seqair.bam"); + + let mut header = bam::header::Header::new(); + let mut chr1 = bam::header::HeaderRecord::new(b"SQ"); + chr1.push_tag(b"SN", &"chr1".to_owned()); + chr1.push_tag(b"LN", &"100".to_owned()); + header.push_record(&chr1); + let view = bam::HeaderView::from_header(&header); + + let normal_record = Record::from_sam( + &view, + b"NORMAL\t0\tchr1\t1\t40\t25M\t*\t0\t0\tAAAAAAAAAAAAAAAAAAAAAAAAA\tIIIIIIIIIIIIIIIIIIIIIIIII", + ) + .unwrap(); + let empty_seq_record = + Record::from_sam(&view, b"EMPTY_SEQ\t0\tchr1\t1\t40\t25M\t*\t0\t0\t*\t*").unwrap(); + + let mut writer = Writer::from_path(&bam_path, &header, bam::Format::Bam).unwrap(); + writer.write(&normal_record).unwrap(); + writer.write(&empty_seq_record).unwrap(); + drop(writer); + index::build(&bam_path, None, index::Type::Bai, 1).unwrap(); + + let read_filter = DefaultReadFilter::new(0, 0, 0); + for base_filter in [None, Some(20)] { + let htslib = htslib_position(&bam_path, &read_filter, base_filter); + let seqair = seqair_position(&bam_path, &read_filter, base_filter); + assert_eq!(htslib, seqair); + assert_eq!(seqair.depth, 2); + assert_eq!(seqair.a, 1); + assert_eq!(seqair.n, 1); + } } }