diff --git a/CHANGELOG.md b/CHANGELOG.md index 02425a2..408cde5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## 0.5.0 - 2026-06-13 + +### Added + +- **Caller-owned serial batched/buffered two-stage primitives** (additive): + - `SignBitmap::top_m_candidates_batched_serial_csr(&self, queries, m) -> CandidateBatch` + — serial (no rayon) CSR candidate generation; pair with the rerank below to + run a fully caller-scheduled two-stage search. + - `RankQuant::search_asymmetric_subset_batched_serial(..) -> SearchResults` and + `..._serial_into(.., &mut SubsetScratch, &mut out_scores, &mut out_indices)` + — serial batched subset rerank; the `_into` form is allocation-free after + scratch warmup (the integration contract for runtimes that own their own + thread pool / GIL release). + - New public types `CandidateBatch` (CSR candidate carrier) and `SubsetScratch` + (reusable rerank scratch). +- These primitives never enter rayon; the caller owns parallelism. No bundled + rayon convenience wrapper ships in this release — partition the query batch and + drive the serial `_into` primitive from your own pool. The existing + internally-parallel `top_m_candidates_batched` and `search_asymmetric*` are + unchanged. + +### Notes + +- The serial CSR candidate-gen is a correctness-first implementation; a future + release optimizes its internals behind the same signature. + ## 0.4.0 - 2026-06-04 ### Added diff --git a/Cargo.lock b/Cargo.lock index c8f16e7..918d07b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -601,7 +601,7 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "ordvec" -version = "0.4.0" +version = "0.5.0" dependencies = [ "rand", "rand_chacha", @@ -610,14 +610,14 @@ dependencies = [ [[package]] name = "ordvec-ffi" -version = "0.4.0" +version = "0.5.0" dependencies = [ "ordvec", ] [[package]] name = "ordvec-manifest" -version = "0.4.0" +version = "0.5.0" dependencies = [ "chrono", "clap", @@ -633,7 +633,7 @@ dependencies = [ [[package]] name = "ordvec-manifest-python" -version = "0.4.0" +version = "0.5.0" dependencies = [ "ordvec-manifest", "pyo3", @@ -643,7 +643,7 @@ dependencies = [ [[package]] name = "ordvec-python" -version = "0.4.0" +version = "0.5.0" dependencies = [ "numpy", "ordvec", diff --git a/Cargo.toml b/Cargo.toml index 11f4f60..3a936a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ordvec" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.89" # AVX-512 intrinsics stabilized in 1.89.0; also clears the 1.87 floor from u64::is_multiple_of description = "Training-free ordinal & sign quantization for vector retrieval" diff --git a/README.md b/README.md index ca8ebbf..17433f5 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,7 @@ Details in [`docs/RANK_MODES.md`](docs/RANK_MODES.md). ```toml [dependencies] -ordvec = "0.4" +ordvec = "0.5" # Or, to track unreleased `main`, use a git dependency instead: # ordvec = { git = "https://github.com/Fieldnote-Echo/ordvec" } diff --git a/THREAT_MODEL.md b/THREAT_MODEL.md index 6dcbaf3..aa8c086 100644 --- a/THREAT_MODEL.md +++ b/THREAT_MODEL.md @@ -1,6 +1,6 @@ # Threat Model — `ordvec` -> **Status:** v0.4.0 (pre-1.0), 2026-06-04. This is the maintained threat model +> **Status:** v0.5.0 (pre-1.0), 2026-06-13. This is the maintained threat model > for the `ordvec` Rust crate, C ABI, Go wrapper, PyO3/maturin Python bindings, > and the `ordvec-manifest` sidecar verifier. It is reviewed when the > attack surface changes (new persistence formats, new `unsafe` kernels, new diff --git a/fuzz/Cargo.lock b/fuzz/Cargo.lock index 5f9a897..46d6639 100644 --- a/fuzz/Cargo.lock +++ b/fuzz/Cargo.lock @@ -231,7 +231,7 @@ checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "ordvec" -version = "0.4.0" +version = "0.5.0" dependencies = [ "rayon", ] diff --git a/ordvec-ffi/Cargo.toml b/ordvec-ffi/Cargo.toml index 6d2af91..5dc9b78 100644 --- a/ordvec-ffi/Cargo.toml +++ b/ordvec-ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ordvec-ffi" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.89" publish = false diff --git a/ordvec-manifest-python/Cargo.toml b/ordvec-manifest-python/Cargo.toml index a65015f..e9f8ea4 100644 --- a/ordvec-manifest-python/Cargo.toml +++ b/ordvec-manifest-python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ordvec-manifest-python" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.89" description = "Python bindings for ordvec-manifest index provenance verification" diff --git a/ordvec-manifest-python/pyproject.toml b/ordvec-manifest-python/pyproject.toml index 283ec39..fbea95d 100644 --- a/ordvec-manifest-python/pyproject.toml +++ b/ordvec-manifest-python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "ordvec-manifest" -version = "0.4.0" +version = "0.5.0" description = "Python bindings for ordvec index manifest verification" readme = "README.md" requires-python = ">=3.10" diff --git a/ordvec-manifest-python/python/ordvec_manifest/__init__.py b/ordvec-manifest-python/python/ordvec_manifest/__init__.py index dca8363..6730089 100644 --- a/ordvec-manifest-python/python/ordvec_manifest/__init__.py +++ b/ordvec-manifest-python/python/ordvec_manifest/__init__.py @@ -48,4 +48,4 @@ "create_manifest", ] -__version__ = "0.4.0" +__version__ = "0.5.0" diff --git a/ordvec-manifest/Cargo.toml b/ordvec-manifest/Cargo.toml index ccca316..26154d5 100644 --- a/ordvec-manifest/Cargo.toml +++ b/ordvec-manifest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ordvec-manifest" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.89" license = "MIT OR Apache-2.0" @@ -28,7 +28,7 @@ required-features = ["cli"] chrono = { version = "0.4.44", default-features = false, features = ["clock", "std"] } clap = { version = "4.6.1", features = ["derive"], optional = true } hex = "0.4.3" -ordvec = { version = "0.4.0", path = ".." } +ordvec = { version = "0.5.0", path = ".." } rusqlite = { version = "0.40.0", optional = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/ordvec-python/Cargo.toml b/ordvec-python/Cargo.toml index 4adbe56..052e2d4 100644 --- a/ordvec-python/Cargo.toml +++ b/ordvec-python/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ordvec-python" -version = "0.4.0" +version = "0.5.0" edition = "2021" rust-version = "1.89" # inherits ordvec's AVX-512 MSRV floor description = "Python bindings for ordvec — training-free ordinal & sign vector quantization" diff --git a/ordvec-python/pyproject.toml b/ordvec-python/pyproject.toml index d9c1e99..b7e2eb9 100644 --- a/ordvec-python/pyproject.toml +++ b/ordvec-python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "ordvec" -version = "0.4.0" +version = "0.5.0" description = "Training-free ordinal & sign quantization for compressed vector retrieval" readme = "README.md" requires-python = ">=3.10" diff --git a/ordvec-python/python/ordvec/__init__.py b/ordvec-python/python/ordvec/__init__.py index f97388e..7bd55b5 100644 --- a/ordvec-python/python/ordvec/__init__.py +++ b/ordvec-python/python/ordvec/__init__.py @@ -105,4 +105,4 @@ "SignBitmapIndex", ] -__version__ = "0.4.0" +__version__ = "0.5.0" diff --git a/src/lib.rs b/src/lib.rs index 8515824..5e1e011 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,9 +63,11 @@ pub mod sign_bitmap; mod util; pub use bitmap::Bitmap; +pub use quant::SubsetScratch; pub use quant::{rankquant_eval_search, RankQuant, TwoStageCandidatePolicy}; pub use rank::Rank; pub use rank_io::{probe_index_metadata, IndexKind, IndexMetadata, IndexParams}; +pub use sign_bitmap::CandidateBatch; pub use sign_bitmap::SignBitmap; // `search_asymmetric_byte_lut` is a bench-only scoring reference: it diff --git a/src/quant.rs b/src/quant.rs index 2c66403..ed16fff 100644 --- a/src/quant.rs +++ b/src/quant.rs @@ -24,9 +24,60 @@ use crate::rank::{ rankquant_bytes_per_vec, rankquant_norm, }; use crate::sign_bitmap::SignBitmap; -use crate::util::{assert_all_finite, l2_normalise, result_buffer_len, TopK}; +use crate::util::{assert_all_finite, l2_normalise, l2_normalise_into, result_buffer_len, TopK}; use crate::{validate_candidate_ids, OrdvecError, SearchResults}; +/// Reusable scratch for the serial subset-rerank primitives. Grows to the +/// maximum shape seen, then reuses capacity — so a caller's bounded-pool worker +/// runs allocation-free after warmup. Opaque: fields are an implementation +/// detail. +pub struct SubsetScratch { + q_unit: Vec, + sub_packed: Vec, + top: TopK, + local_indices: Vec, + final_order: Vec<(f32, i64, i64, usize)>, +} + +impl Default for SubsetScratch { + fn default() -> Self { + Self { + q_unit: Vec::new(), + sub_packed: Vec::new(), + top: TopK::new(0), + local_indices: Vec::new(), + final_order: Vec::new(), + } + } +} + +impl SubsetScratch { + /// Empty scratch; buffers grow on first use. + pub fn new() -> Self { + Self::default() + } + /// Release all buffers (reclaim memory). NOT needed between `*_into` calls — + /// the scratch auto-resets and reuses capacity each call. Escape hatch for a + /// long-lived worker to free memory between bursts. + pub fn clear(&mut self) { + *self = Self::default(); + } + + /// Test-only capacity probe (scratch reuse / allocation-free assertions). + /// `#[doc(hidden)]` rather than `#[cfg(test)]` so the integration tests in + /// `tests/` (which compile the crate without `cfg(test)`) can reach it; it + /// is hidden from the public docs surface. + #[doc(hidden)] + pub fn capacities_for_test(&self) -> (usize, usize, usize, usize) { + ( + self.q_unit.capacity(), + self.sub_packed.capacity(), + self.local_indices.capacity(), + self.final_order.capacity(), + ) + } +} + fn check_eval_bits(bits: u8) { assert!((1..=7).contains(&bits), "bits must be in 1..=7"); } @@ -639,107 +690,335 @@ impl RankQuant { "search_asymmetric_subset: candidate id out of range (n_vectors {})", self.n_vectors, ); + let m = candidates.len(); + let out_k = k.min(m); + if out_k == 0 { + return (Vec::new(), Vec::new()); + } + let mut scratch = SubsetScratch::new(); + l2_normalise_into(&mut scratch.q_unit, query); + let mut scores = vec![f32::NEG_INFINITY; out_k]; + let mut indices = vec![-1i64; out_k]; + self.subset_rerank_row_into(candidates, &mut scores, &mut indices, &mut scratch); + (scores, indices) + } + + /// Validate a CSR candidate batch. Panics on any contract violation + /// (mirrors `search_asymmetric_subset`'s assert contract). The caller is the + /// serial batched rerank entry point added alongside this helper. + fn validate_csr_batch(&self, nq: usize, candidate_offsets: &[usize], candidates: &[u32]) { + assert_eq!( + candidate_offsets.len(), + nq + 1, + "candidate_offsets length {} must be nq+1 ({})", + candidate_offsets.len(), + nq + 1 + ); + assert_eq!(candidate_offsets[0], 0, "candidate_offsets[0] must be 0"); + assert_eq!( + *candidate_offsets.last().unwrap(), + candidates.len(), + "candidate_offsets[nq] must equal candidates.len()" + ); + for w in candidate_offsets.windows(2) { + assert!( + w[1] >= w[0], + "candidate_offsets must be monotonic non-decreasing" + ); + let row_len = w[1] - w[0]; + assert!( + row_len <= self.n_vectors, + "per-row candidate count {row_len} exceeds n_vectors {}", + self.n_vectors + ); + } + assert!( + candidates.iter().all(|&di| (di as usize) < self.n_vectors), + "candidate id out of range (n_vectors {})", + self.n_vectors + ); + } + + /// Rerank one candidate row into `out_scores`/`out_indices` (each length + /// `out_k`) using caller scratch. `scratch.q_unit` MUST already hold the + /// L2-normalised query. No heap allocation after warmup. NO rayon. Mirrors + /// the gather + SIMD dispatch + finalize of `search_asymmetric_subset`, + /// reading from `scratch.sub_packed` and emitting global ids. + fn subset_rerank_row_into( + &self, + candidates_row: &[u32], + out_scores: &mut [f32], + out_indices: &mut [i64], + scratch: &mut SubsetScratch, + ) { let dim = self.dim; let bits = self.bits; let bpv = self.bytes_per_vec(); let n_buckets = 1usize << bits; - let m = candidates.len(); - let k_eff = k.min(m); - if k_eff == 0 { - return (Vec::new(), Vec::new()); + let m = candidates_row.len(); + let out_k = out_scores.len(); + debug_assert_eq!(out_indices.len(), out_k); + if out_k == 0 || m == 0 { + // Defensive guard: both callers (`search_asymmetric_subset` and the + // batched `*_serial_into` loop) handle empty candidate rows / `out_k == 0` + // before reaching here, so this is normally unreachable. If it ever is + // hit, the caller owns sentinel-padding the row; returning before + // touching `scratch.top` is safe (the next non-empty row resets it). + return; } - let norm = rankquant_norm(dim, bits); let inv_norm = 1.0_f32 / norm; #[cfg(target_arch = "x86_64")] - let centre = ((1u32 << bits) as f32 - 1.0) / 2.0; - - // L2-normalise the query. - let q_unit = l2_normalise(query); - #[cfg(target_arch = "x86_64")] let centre_offset = { - let q_sum: f32 = q_unit.iter().sum(); + let centre = ((1u32 << bits) as f32 - 1.0) / 2.0; + let q_sum: f32 = scratch.q_unit.iter().sum(); -centre * q_sum * inv_norm }; - // Pack the candidate docs' bytes into a contiguous buffer so - // the SIMD kernels can scan them as if they were a small dense - // sub-index. Cost: m * bpv copy (small for typical m). - let sub_packed_len = m + // Gather candidate docs into the reused scratch buffer. + let sub_len = m .checked_mul(bpv) - .expect("search_asymmetric_subset: candidate scratch length overflows usize"); - let mut sub_packed = vec![0u8; sub_packed_len]; - for (i, &di) in candidates.iter().enumerate() { + .expect("subset rerank: candidate scratch length overflows usize"); + // `clear` + `reserve` + `extend_from_slice` (not `resize(.., 0)` + indexed + // copy): avoids zero-initialising `sub_len` bytes we immediately overwrite. + // Still allocation-free after warmup — `reserve` is a no-op once capacity + // covers `sub_len`, and `extend_from_slice` fills without reallocating. + scratch.sub_packed.clear(); + scratch.sub_packed.reserve(sub_len); + for &di in candidates_row { let src = (di as usize) * bpv; - sub_packed[i * bpv..(i + 1) * bpv].copy_from_slice(&self.packed[src..src + bpv]); + scratch + .sub_packed + .extend_from_slice(&self.packed[src..src + bpv]); } - // Dispatch: prefer AVX-512 → AVX2 → scalar LUT. Tier selection - // is gated on the kernel lane invariant for (dim, bits) via - // `select_simd_tier` — the same guard `search_asymmetric` uses — - // so a constructor-valid-but-SIMD-invalid dim (48 / 80 / 20) - // never reaches a kernel that would drop its tail chunk. #[cfg_attr(not(target_arch = "x86_64"), allow(unused_variables))] let simd_tier = select_simd_tier(dim, bits); - let mut top = TopK::new_with_tie_keys(k_eff, candidates); + scratch.top.reset_with_tie_keys(out_k, candidates_row); #[cfg(target_arch = "x86_64")] unsafe { match (simd_tier, bits) { (SimdTier::Avx512, 2) => { - top.set_score_offset(centre_offset); - scan_b2_asym_avx512(&sub_packed, m, dim, &q_unit, inv_norm, &mut top); + scratch.top.set_score_offset(centre_offset); + scan_b2_asym_avx512( + &scratch.sub_packed, + m, + dim, + &scratch.q_unit, + inv_norm, + &mut scratch.top, + ); } (SimdTier::Avx512, 4) => { - top.set_score_offset(centre_offset); - scan_b4_asym_avx512(&sub_packed, m, dim, &q_unit, inv_norm, &mut top); + scratch.top.set_score_offset(centre_offset); + scan_b4_asym_avx512( + &scratch.sub_packed, + m, + dim, + &scratch.q_unit, + inv_norm, + &mut scratch.top, + ); } (SimdTier::Avx2, 2) => { - top.set_score_offset(centre_offset); - scan_b2_asym_avx2(&sub_packed, m, dim, &q_unit, inv_norm, &mut top); + scratch.top.set_score_offset(centre_offset); + scan_b2_asym_avx2( + &scratch.sub_packed, + m, + dim, + &scratch.q_unit, + inv_norm, + &mut scratch.top, + ); } (SimdTier::Avx2, 4) => { - top.set_score_offset(centre_offset); - scan_b4_asym_avx2(&sub_packed, m, dim, &q_unit, inv_norm, &mut top); + scratch.top.set_score_offset(centre_offset); + scan_b4_asym_avx2( + &scratch.sub_packed, + m, + dim, + &scratch.q_unit, + inv_norm, + &mut scratch.top, + ); } _ => scan_via_lut_scalar( - &sub_packed, + &scratch.sub_packed, m, dim, bits, n_buckets, - &q_unit, + &scratch.q_unit, inv_norm, - &mut top, + &mut scratch.top, ), } } #[cfg(not(target_arch = "x86_64"))] scan_via_lut_scalar( - &sub_packed, + &scratch.sub_packed, m, dim, bits, n_buckets, - &q_unit, + &scratch.q_unit, inv_norm, - &mut top, + &mut scratch.top, + ); + + // Finalize local positions into reused buffer, then map local → global. + scratch.local_indices.clear(); + scratch.local_indices.resize(out_k, -1); + scratch.top.finalize_into_with_scratch( + &mut scratch.final_order, + out_scores, + &mut scratch.local_indices, + ); + for (out, &loc) in out_indices.iter_mut().zip(scratch.local_indices.iter()) { + *out = if loc < 0 { + -1 + } else { + candidates_row[loc as usize] as i64 + }; + } + } + + /// Serial (NO rayon) batched subset rerank into caller-owned buffers. + /// Allocation-free after `scratch` warmup. The integration contract for + /// runtimes that own their own parallelism (call this from a bounded pool, + /// with the GIL released, one row range per worker is the caller's choice). + /// + /// `queries` is `nq * dim`. Candidates are CSR: `candidate_offsets.len() + /// == nq + 1`, row `qi` is `candidates[candidate_offsets[qi]..candidate_offsets[qi+1]]`. + /// Output is rectangular: `out_k = k.min(self.len())`, and both output + /// buffers MUST have length `nq * out_k`. Underfull rows are sentinel-padded + /// (`NEG_INFINITY` / `-1`). Duplicate candidate ids are scored independently + /// — but each CSR row's length (duplicates included) must be `<= self.len()`; + /// deduplicate a duplicate-heavy row before calling if it would exceed that. + /// Candidate ids within a row need not be sorted: input order has no effect + /// on results — the tie policy `(score desc, global row-id asc)` fully + /// determines output order (matching [`Self::search_asymmetric_subset`]). + /// + /// Panics on any contract violation: malformed CSR (`candidate_offsets` not + /// `nq + 1` long, not starting at `0`, non-monotonic, or not ending at + /// `candidates.len()`), a row longer than `self.len()`, a candidate id + /// `>= self.len()`, a non-finite query value, or a wrong output-buffer length. + #[allow(clippy::too_many_arguments)] // arity is intrinsic to the caller-owned buffered contract (CSR inputs + scratch + two output buffers) + pub fn search_asymmetric_subset_batched_serial_into( + &self, + queries: &[f32], + candidate_offsets: &[usize], + candidates: &[u32], + k: usize, + scratch: &mut SubsetScratch, + out_scores: &mut [f32], + out_indices: &mut [i64], + ) { + let dim = self.dim; + assert!( + queries.len().is_multiple_of(dim), + "queries length {} must be a multiple of dim {dim}", + queries.len() + ); + let nq = queries.len() / dim; + assert_all_finite(queries); + self.validate_csr_batch(nq, candidate_offsets, candidates); + let out_k = k.min(self.n_vectors); + let buf_len = result_buffer_len(nq, out_k); + assert_eq!( + out_scores.len(), + buf_len, + "out_scores length must be nq*out_k ({buf_len})" + ); + assert_eq!( + out_indices.len(), + buf_len, + "out_indices length must be nq*out_k ({buf_len})" ); - let mut scores = vec![f32::NEG_INFINITY; k_eff]; - let mut local_indices = vec![-1i64; k_eff]; - top.finalize_into(&mut scores, &mut local_indices); - // Map local → global doc IDs. - let global_indices: Vec = local_indices - .iter() - .map(|&loc| { - if loc < 0 { - -1 - } else { - candidates[loc as usize] as i64 + if out_k == 0 || nq == 0 { + return; + } + + // No whole-buffer preclear: each row is written exactly once below. A + // non-empty row is fully (re)written by `subset_rerank_row_into` (its + // `finalize_into_with_scratch` preclears the row slice, then fills it); an + // empty row is sentinel-padded explicitly here. Validation above still + // precedes every write, so a validation panic leaves caller buffers intact. + for qi in 0..nq { + let q = &queries[qi * dim..(qi + 1) * dim]; + let row = &candidates[candidate_offsets[qi]..candidate_offsets[qi + 1]]; + let os = &mut out_scores[qi * out_k..(qi + 1) * out_k]; + let oi = &mut out_indices[qi * out_k..(qi + 1) * out_k]; + if row.is_empty() { + for s in os.iter_mut() { + *s = f32::NEG_INFINITY; + } + for i in oi.iter_mut() { + *i = -1; } - }) - .collect(); - (scores, global_indices) + } else { + l2_normalise_into(&mut scratch.q_unit, q); + self.subset_rerank_row_into(row, os, oi, scratch); + } + } + } + + /// Allocating ergonomic wrapper over + /// [`Self::search_asymmetric_subset_batched_serial_into`]. Allocates the + /// output `SearchResults` and a transient `SubsetScratch`. NO rayon. + /// `result.k == k.min(self.len())`. Shares the CSR/validation contract (and + /// panic conditions, incl. the per-row `len <= self.len()` cap) of + /// [`Self::search_asymmetric_subset_batched_serial_into`]. + /// + /// # Example + /// ```no_run + /// use ordvec::{RankQuant, SignBitmap}; + /// # let (dim, queries, k) = (768usize, vec![0.0f32; 768 * 3], 10usize); + /// # let sign = SignBitmap::new(dim); + /// # let rq = RankQuant::new(dim, 2); + /// let cb = sign.top_m_candidates_batched_serial_csr(&queries, 256); + /// let results = rq.search_asymmetric_subset_batched_serial( + /// &queries, &cb.offsets, &cb.candidates, k, + /// ); + /// for qi in 0..results.nq { + /// let _ids = results.indices_for_query(qi); + /// } + /// ``` + pub fn search_asymmetric_subset_batched_serial( + &self, + queries: &[f32], + candidate_offsets: &[usize], + candidates: &[u32], + k: usize, + ) -> SearchResults { + let dim = self.dim; + assert!( + queries.len().is_multiple_of(dim), + "queries length {} must be a multiple of dim {dim}", + queries.len() + ); + let nq = queries.len() / dim; + let out_k = k.min(self.n_vectors); + let buf_len = result_buffer_len(nq, out_k); + let mut scores = vec![f32::NEG_INFINITY; buf_len]; + let mut indices = vec![-1i64; buf_len]; + let mut scratch = SubsetScratch::new(); + self.search_asymmetric_subset_batched_serial_into( + queries, + candidate_offsets, + candidates, + k, + &mut scratch, + &mut scores, + &mut indices, + ); + SearchResults { + scores, + indices, + nq, + k: out_k, + } } pub fn try_search_with_sign_probe( diff --git a/src/sign_bitmap.rs b/src/sign_bitmap.rs index 2e3473c..2cca6cc 100644 --- a/src/sign_bitmap.rs +++ b/src/sign_bitmap.rs @@ -31,6 +31,48 @@ use rayon::prelude::*; use crate::OrdvecError; +/// Candidate sets for a query batch in CSR (compressed-sparse-row) form, as +/// produced by [`SignBitmap::top_m_candidates_batched_serial_csr`]. +/// +/// Invariants (guaranteed and tested): +/// - `offsets.len() == query_count() + 1` +/// - `offsets[0] == 0` +/// - `offsets` is monotonic non-decreasing +/// - `*offsets.last().unwrap() == candidates.len()` +/// - row `i` is `candidates[offsets[i]..offsets[i + 1]]` +/// +/// Fields are `pub` for zero-copy hand-off (same precedent as +/// [`crate::SearchResults`]); the invariants above are part of the stable API. +#[derive(Clone, Debug)] +#[must_use = "candidate generation scans the corpus; dropping the result discards that work"] +pub struct CandidateBatch { + pub candidates: Vec, + pub offsets: Vec, +} + +impl CandidateBatch { + /// Number of queries in the batch (`offsets.len() - 1`). + pub fn query_count(&self) -> usize { + self.offsets.len().saturating_sub(1) + } + /// Candidate row for query `qi`, or `None` if `qi >= query_count()`. + pub fn candidates_for_query(&self, qi: usize) -> Option<&[u32]> { + let start = *self.offsets.get(qi)?; + let end = *self.offsets.get(qi + 1)?; + Some(&self.candidates[start..end]) + } + /// `true` iff there are **no queries** (`query_count() == 0`) — NOT iff + /// there are no candidates. A 3-query batch with zero candidates per query + /// is not empty. + pub fn is_empty(&self) -> bool { + self.query_count() == 0 + } + /// `true` iff there are no candidates across all queries. + pub fn has_no_candidates(&self) -> bool { + self.candidates.is_empty() + } +} + /// Index storing a 1-bit sign-cosine fingerprint per document. /// /// Storage: `dim / 8` bytes per doc. Dim must be a multiple of 64 @@ -240,6 +282,46 @@ impl SignBitmap { .collect() } + /// Serial (NO rayon) CSR candidate generation for a query batch. Returns a + /// [`CandidateBatch`]; row `qi` is the top-`m` candidate doc ids for query + /// `qi`, ordered `(hamming ascending, doc_id ascending)`, of length + /// `m.min(self.len())`. + /// + /// This is the caller-owned integration primitive: it never enters rayon, + /// so a caller (e.g. a database) parallelises across queries with its own + /// pool. (The existing [`Self::top_m_candidates_batched`] remains the + /// internally-parallel standalone convenience.) + /// + /// Track-1 implementation is intentionally naive — it loops the single-query + /// [`Self::top_m_candidates`] (which materialises a per-query `n` Hamming + /// row). A future release may replace the internals with streaming top-m + /// behind this frozen signature; the CSR output contract will not change. + #[must_use = "this scans the corpus per query to generate candidates; dropping the result discards that work"] + pub fn top_m_candidates_batched_serial_csr(&self, queries: &[f32], m: usize) -> CandidateBatch { + let dim = self.dim; + assert!( + queries.len().is_multiple_of(dim), + "queries length {} must be a multiple of dim {dim}", + queries.len() + ); + crate::util::assert_all_finite(queries); + let nq = queries.len() / dim; + let m_eff = m.min(self.n_vectors); + let mut offsets = Vec::with_capacity(nq + 1); + offsets.push(0usize); + let mut candidates = Vec::with_capacity(nq.saturating_mul(m_eff)); + for qi in 0..nq { + let q = &queries[qi * dim..(qi + 1) * dim]; + let row = self.top_m_candidates(q, m); + candidates.extend_from_slice(&row); + offsets.push(candidates.len()); + } + CandidateBatch { + candidates, + offsets, + } + } + /// Score every indexed document against one query and return dense /// sign-agreement counts aligned by document id. /// @@ -605,6 +687,39 @@ mod tests { .sum() } + #[test] + fn candidate_batch_helpers() { + use super::CandidateBatch; + let cb = CandidateBatch { + candidates: vec![5, 6, 7, 2], + offsets: vec![0, 2, 2, 4], // q0=[5,6], q1=[], q2=[7,2] + }; + assert_eq!(cb.query_count(), 3); + assert!(!cb.is_empty()); + assert!(!cb.has_no_candidates()); + assert_eq!(cb.candidates_for_query(0), Some(&[5u32, 6][..])); + assert_eq!(cb.candidates_for_query(1), Some(&[][..])); + assert_eq!(cb.candidates_for_query(2), Some(&[7u32, 2][..])); + assert_eq!(cb.candidates_for_query(3), None); + + let empty = CandidateBatch { + candidates: vec![], + offsets: vec![0], + }; + assert_eq!(empty.query_count(), 0); + assert!(empty.is_empty()); + assert!(empty.has_no_candidates()); + + // 2 queries, zero candidates each → NOT empty, but has_no_candidates. + let no_cands = CandidateBatch { + candidates: vec![], + offsets: vec![0, 0, 0], + }; + assert_eq!(no_cands.query_count(), 2); + assert!(!no_cands.is_empty()); + assert!(no_cands.has_no_candidates()); + } + #[test] #[should_panic(expected = "dim must be > 0")] fn new_rejects_dim_zero() { diff --git a/src/util.rs b/src/util.rs index f6f7a5b..8e229f5 100644 --- a/src/util.rs +++ b/src/util.rs @@ -103,6 +103,20 @@ pub(crate) fn l2_normalise(v: &[f32]) -> Vec { } } +/// Allocation-free counterpart of [`l2_normalise`]: writes the L2-normalised +/// vector into `out`, reusing its capacity. Same semantics as `l2_normalise` +/// (a near-zero-norm input yields all zeros of the same length). +pub(crate) fn l2_normalise_into(out: &mut Vec, v: &[f32]) { + out.clear(); + let norm: f32 = v.iter().map(|x| x * x).sum::().sqrt(); + if norm <= 1e-12 { + out.resize(v.len(), 0.0); + } else { + let inv = 1.0 / norm; + out.extend(v.iter().map(|&x| x * inv)); + } +} + /// Assert that every element of `v` is finite (no `NaN`, no `±Inf`). /// /// ordvec's public `add` / `search` entry points reject non-finite @@ -404,9 +418,11 @@ impl TopK { /// Construct a top-k collector whose emitted indices are local scan /// positions but whose score ties are broken by caller-supplied keys. /// - /// This is used by subset scans: SIMD kernels still emit local candidate - /// positions into the gathered scratch buffer, while ties must follow the - /// public global row-id policy. + /// Subset scans now reuse a long-lived `TopK` via + /// [`Self::reset_with_tie_keys`]; this fresh-allocation constructor is + /// retained as the reference path the reuse tests compare against (hence + /// `#[allow(dead_code)]` for non-test builds). + #[allow(dead_code)] pub(crate) fn new_with_tie_keys(k: usize, tie_key_by_index: &[u32]) -> Self { let mut top = Self::new(k); top.tie_key_by_index = Some(tie_key_by_index.iter().map(|&id| i64::from(id)).collect()); @@ -496,11 +512,44 @@ impl TopK { self.worst_pos = wp; } + /// Reset to an empty top-k collector of capacity `k` whose score ties are + /// broken by caller-supplied global keys (subset scans), reusing buffers — + /// including the inner tie-key Vec's capacity (allocation-free after warmup). + pub(crate) fn reset_with_tie_keys(&mut self, k: usize, tie_key_by_index: &[u32]) { + self.k = k; + self.scores.clear(); + self.scores.resize(k, f32::NEG_INFINITY); + self.indices.clear(); + self.indices.resize(k, -1); + self.tie_keys.clear(); + self.tie_keys.resize(k, i64::MAX); + let buf = self.tie_key_by_index.get_or_insert_with(Vec::new); + buf.clear(); + buf.extend(tie_key_by_index.iter().map(|&id| i64::from(id))); + self.score_offset = 0.0; + self.filled = 0; + self.worst_pos = 0; + self.worst_val = f32::INFINITY; + self.worst_tie_key = i64::MAX; + } + /// Drain into `out_scores` / `out_indices` sorted by the composite /// key `(score desc, tie_key asc)`. `out_scores.len()` is the /// user-requested `k`; positions beyond `self.filled` are left as /// sentinels. pub(crate) fn finalize_into(&self, out_scores: &mut [f32], out_indices: &mut [i64]) { + let mut order_buf = Vec::new(); + self.finalize_into_with_scratch(&mut order_buf, out_scores, out_indices); + } + + /// Allocation-free [`Self::finalize_into`]: reuses the caller-owned + /// `order_buf` for the final sort instead of allocating a fresh `Vec`. + pub(crate) fn finalize_into_with_scratch( + &self, + order_buf: &mut Vec<(f32, i64, i64, usize)>, + out_scores: &mut [f32], + out_indices: &mut [i64], + ) { debug_assert_eq!(out_scores.len(), out_indices.len()); for s in out_scores.iter_mut() { *s = f32::NEG_INFINITY; @@ -508,21 +557,22 @@ impl TopK { for i in out_indices.iter_mut() { *i = -1; } - let mut pairs: Vec<(f32, i64, i64, usize)> = self - .scores - .iter() - .zip(self.indices.iter()) - .zip(self.tie_keys.iter()) - .enumerate() - .take(self.filled) - .map(|(slot, ((&s, &i), &tie_key))| (s, i, tie_key, slot)) - .collect(); + order_buf.clear(); + order_buf.extend( + self.scores + .iter() + .zip(self.indices.iter()) + .zip(self.tie_keys.iter()) + .enumerate() + .take(self.filled) + .map(|(slot, ((&s, &i), &tie_key))| (s, i, tie_key, slot)), + ); // Composite key: score descending, then tie key ascending. The kept // slot is only a final deterministic tie-break when duplicate // candidate entries are otherwise indistinguishable. For full-index // scans the tie key is the doc_id; for subset scans it is the global // row id associated with the emitted local index. - pairs.sort_unstable_by(|a, b| { + order_buf.sort_unstable_by(|a, b| { // `total_cmp` is a true total order (IEEE-754 `totalOrder`), so the // sort stays well-defined even if a non-finite score ever slipped // past the finite-input guards — `partial_cmp(..).unwrap_or(Equal)` @@ -533,7 +583,7 @@ impl TopK { .then_with(|| a.2.cmp(&b.2)) .then_with(|| a.3.cmp(&b.3)) }); - for (slot, (s, i, _, _)) in pairs.into_iter().enumerate() { + for (slot, &(s, i, _, _)) in order_buf.iter().enumerate() { if slot >= out_scores.len() { break; } @@ -541,6 +591,11 @@ impl TopK { out_indices[slot] = i; } } + + #[cfg(test)] + pub(crate) fn scores_capacity_for_test(&self) -> usize { + self.scores.capacity() + } } #[cfg(test)] @@ -663,4 +718,71 @@ mod tests { // usize — the 32-bit (wasm32) hazard the `resize` in `add` would hit. let _ = checked_new_len(0, 2, usize::MAX); } + + #[test] + fn topk_reset_and_finalize_with_scratch_match_fresh() { + use super::TopK; + // Build via fresh new_with_tie_keys + finalize_into (reference). + let tie = [10u32, 20, 30, 40]; + let mut a = TopK::new_with_tie_keys(2, &tie); + a.maybe_insert(1.0, 0); + a.maybe_insert(3.0, 1); + a.maybe_insert(2.0, 2); + let mut s_ref = vec![f32::NEG_INFINITY; 2]; + let mut i_ref = vec![-1i64; 2]; + a.finalize_into(&mut s_ref, &mut i_ref); + + // Build via reset_with_tie_keys + finalize_into_with_scratch (reuse path). + let mut b = TopK::new(0); + b.reset_with_tie_keys(2, &tie); + b.maybe_insert(1.0, 0); + b.maybe_insert(3.0, 1); + b.maybe_insert(2.0, 2); + let mut order_buf = Vec::new(); + let mut s = vec![f32::NEG_INFINITY; 2]; + let mut i = vec![-1i64; 2]; + b.finalize_into_with_scratch(&mut order_buf, &mut s, &mut i); + assert_eq!(s, s_ref); + assert_eq!(i, i_ref); + + // Reuse: a second reset+finalize on the same TopK + order_buf grows nothing. + let cap_top = b.scores_capacity_for_test(); + let cap_buf = order_buf.capacity(); + b.reset_with_tie_keys(2, &tie); + b.maybe_insert(5.0, 3); + b.maybe_insert(1.0, 0); + b.finalize_into_with_scratch(&mut order_buf, &mut s, &mut i); + assert_eq!( + b.scores_capacity_for_test(), + cap_top, + "TopK reset must reuse capacity" + ); + assert_eq!( + order_buf.capacity(), + cap_buf, + "finalize order_buf must reuse capacity" + ); + assert_eq!(i, vec![3, 0]); // score 5.0 (id 40) then 1.0 (id 10) + } + + #[test] + fn l2_normalise_into_matches_l2_normalise_and_reuses_capacity() { + use super::{l2_normalise, l2_normalise_into}; + let v = vec![3.0f32, 0.0, 4.0, 0.0]; // norm 5 + let expected = l2_normalise(&v); + let mut out: Vec = Vec::new(); + l2_normalise_into(&mut out, &v); + assert_eq!(out, expected); + // zero vector → zeros, same length + let z = vec![0.0f32; 4]; + l2_normalise_into(&mut out, &z); + assert_eq!(out, vec![0.0f32; 4]); + // reuse: second identical call does not grow capacity + let cap = { + l2_normalise_into(&mut out, &v); + out.capacity() + }; + l2_normalise_into(&mut out, &v); + assert_eq!(out.capacity(), cap, "l2_normalise_into must reuse capacity"); + } } diff --git a/tests/determinism_contract.rs b/tests/determinism_contract.rs index cddffd1..56c8f48 100644 --- a/tests/determinism_contract.rs +++ b/tests/determinism_contract.rs @@ -82,6 +82,45 @@ fn rankquant_subset_ties_use_global_row_ids() { assert_ids(&duplicate_ids, &[7, 7]); } +#[test] +fn batched_subset_rerank_ties_use_global_row_ids_and_keep_duplicates() { + const DIM: usize = 64; + let docs = repeated_docs(12, DIM, 1.0); + let query = vec![0.0; DIM]; + let mut index = RankQuant::new(DIM, 2); + index.add(&docs); + + // Single tied-score row routed through the batched `_into` path. All scores + // are equal (zero query over constant-composition docs), so the order is + // purely (score desc, global row-id asc). Duplicate candidate ids must NOT + // be collapsed — each is scored independently. + let candidate_offsets = [0usize, 4]; + let candidates = [7u32, 7, 3, 7]; + let k = 4usize; + let out_k = k.min(12); + let mut scratch = ordvec::SubsetScratch::new(); + let mut scores = vec![0.0f32; out_k]; + let mut indices = vec![0i64; out_k]; + index.search_asymmetric_subset_batched_serial_into( + &query, + &candidate_offsets, + &candidates, + k, + &mut scratch, + &mut scores, + &mut indices, + ); + // (score desc, global id asc): id 3 sorts first, then the three duplicate 7s. + assert_eq!(scores, vec![0.0, 0.0, 0.0, 0.0]); + assert_ids(&indices, &[3, 7, 7, 7]); + + // The batched `_into` row must agree byte-for-byte with the single-query + // `search_asymmetric_subset` reference on the same tied/duplicate row. + let (ref_scores, ref_ids) = index.search_asymmetric_subset(&query, &candidates, k); + assert_eq!(scores, ref_scores); + assert_ids(&indices, &ref_ids); +} + #[test] fn candidate_prefilters_preserve_order_across_single_and_batched_paths() { const DIM: usize = 64; diff --git a/tests/index/two_stage.rs b/tests/index/two_stage.rs index 5b0ad19..b638da6 100644 --- a/tests/index/two_stage.rs +++ b/tests/index/two_stage.rs @@ -2,6 +2,8 @@ use ordvec::{ validate_candidate_ids, validate_flat_vectors_len, Bitmap, OrdvecError, RankQuant, SignBitmap, TwoStageCandidatePolicy, }; +use rand::{RngExt, SeedableRng}; +use rand_chacha::ChaCha8Rng; use crate::{make_corpus, D, N}; @@ -281,3 +283,466 @@ fn sign_rankquant_subset_orders_visible_ties_after_centre_offset() { assert!(scores.iter().all(|score| score.is_finite())); assert_score_then_id_order(&scores, &ids); } + +#[test] +fn serial_csr_matches_looped_single_query_and_invariants() { + let corpus = make_corpus(20_001); + let mut sign = SignBitmap::new(D); + sign.add(&corpus); + let nq = 5usize; + let queries = make_corpus(99)[..nq * D].to_vec(); + let m = 12usize; + + let cb = sign.top_m_candidates_batched_serial_csr(&queries, m); + + // CSR invariants. + assert_eq!(cb.offsets.len(), nq + 1); + assert_eq!(cb.offsets[0], 0); + assert_eq!(*cb.offsets.last().unwrap(), cb.candidates.len()); + assert!(cb.offsets.windows(2).all(|w| w[1] >= w[0])); + assert_eq!(cb.query_count(), nq); + + // Row-for-row parity with looped single-query top_m_candidates. + for qi in 0..nq { + let q = &queries[qi * D..(qi + 1) * D]; + let expected = sign.top_m_candidates(q, m); + assert_eq!( + cb.candidates_for_query(qi).unwrap(), + &expected[..], + "row {qi}" + ); + assert_eq!(expected.len(), m.min(N)); + } +} + +#[test] +fn serial_csr_edges() { + let corpus = make_corpus(20_002); + let mut sign = SignBitmap::new(D); + sign.add(&corpus); + // nq == 0 + let cb0 = sign.top_m_candidates_batched_serial_csr(&[], 8); + assert_eq!(cb0.query_count(), 0); + assert!(cb0.is_empty()); + assert_eq!(cb0.offsets, vec![0]); + // m == 0 → every row empty, but 2 queries → not empty + let q2 = make_corpus(7)[..2 * D].to_vec(); + let cb = sign.top_m_candidates_batched_serial_csr(&q2, 0); + assert_eq!(cb.query_count(), 2); + assert!(cb.has_no_candidates()); + assert_eq!(cb.offsets, vec![0, 0, 0]); + // m > n clamps to n + let cb_big = sign.top_m_candidates_batched_serial_csr(&q2, N + 100); + assert_eq!(cb_big.candidates_for_query(0).unwrap().len(), N); +} + +#[test] +#[should_panic] +fn serial_csr_rejects_ragged_queries() { + let mut sign = SignBitmap::new(D); + sign.add(&make_corpus(20_003)); + let ragged = vec![0.0f32; D + 1]; // not a multiple of D + let _ = sign.top_m_candidates_batched_serial_csr(&ragged, 4); +} + +#[test] +fn single_query_subset_unchanged_after_refactor() { + let (sign, rq, _corpus) = build_two_stage(2); + let q = make_corpus(31)[..D].to_vec(); + let cands = sign.top_m_candidates(&q, 20); + let (scores, ids) = rq.search_asymmetric_subset(&q, &cands, 7); + assert_eq!(scores.len(), 7); + assert_eq!(ids.len(), 7); + assert_score_then_id_order(&scores, &ids); + // Fewer candidates than k → returns exactly k.min(m) entries (no padding). + let (s2, i2) = rq.search_asymmetric_subset(&q, &cands[..3], 7); + assert_eq!(s2.len(), 3); + assert_eq!(i2.len(), 3); +} + +fn flatten_to_csr(rows: &[Vec]) -> (Vec, Vec) { + let mut candidates = Vec::new(); + let mut offsets = vec![0usize]; + for r in rows { + candidates.extend_from_slice(r); + offsets.push(candidates.len()); + } + (candidates, offsets) +} + +#[test] +fn batched_into_matches_looped_single_query() { + for bits in [1u8, 2, 4] { + let (sign, rq, _corpus) = build_two_stage(bits); + let nq = 6usize; + let queries = make_corpus(40_000 + bits as u64)[..nq * D].to_vec(); + let k = 5usize; + // Per-query candidate rows of varying length. + let rows: Vec> = (0..nq) + .map(|qi| sign.top_m_candidates(&queries[qi * D..(qi + 1) * D], 10 + qi)) + .collect(); + let (cand, off) = flatten_to_csr(&rows); + + let out_k = k.min(N); + let mut scores = vec![0.0f32; nq * out_k]; + let mut indices = vec![0i64; nq * out_k]; + let mut scratch = ordvec::SubsetScratch::new(); + rq.search_asymmetric_subset_batched_serial_into( + &queries, + &off, + &cand, + k, + &mut scratch, + &mut scores, + &mut indices, + ); + + for qi in 0..nq { + let (es, ei) = + rq.search_asymmetric_subset(&queries[qi * D..(qi + 1) * D], &rows[qi], k); + // Row qi: first es.len() slots equal the single-query result; rest padded. + for slot in 0..out_k { + if slot < ei.len() { + assert_eq!( + indices[qi * out_k + slot], + ei[slot], + "bits={bits} q{qi} slot{slot} id" + ); + assert_eq!( + scores[qi * out_k + slot], + es[slot], + "bits={bits} q{qi} slot{slot} score" + ); + } else { + assert_eq!(indices[qi * out_k + slot], -1); + assert_eq!(scores[qi * out_k + slot], f32::NEG_INFINITY); + } + } + } + } +} + +#[test] +fn batched_into_preserves_duplicate_candidates() { + let (_sign, rq, _corpus) = build_two_stage(2); + let q = make_corpus(55)[..D].to_vec(); + let row = vec![7u32, 7, 3, 7]; + let (cand, off) = flatten_to_csr(std::slice::from_ref(&row)); + let k = 4usize; + let out_k = k.min(N); + let mut scores = vec![0.0f32; out_k]; + let mut indices = vec![0i64; out_k]; + let mut scratch = ordvec::SubsetScratch::new(); + rq.search_asymmetric_subset_batched_serial_into( + &q, + &off, + &cand, + k, + &mut scratch, + &mut scores, + &mut indices, + ); + let (es, ei) = rq.search_asymmetric_subset(&q, &row, k); + assert_eq!( + &indices[..ei.len()], + &ei[..], + "duplicate ids/order must match single-query" + ); + assert_eq!(&scores[..es.len()], &es[..]); + assert_score_then_id_order(&scores[..ei.len()], &indices[..ei.len()]); +} + +#[test] +fn batched_into_is_order_independent_for_unsorted_candidate_rows() { + // Contract (doc'd on `search_asymmetric_subset_batched_serial_into`): + // candidate ids within a row need NOT be sorted, and input order has no + // effect on results — output is fully determined by the tie policy + // (score desc, then global doc-id asc). Pin it: feed the same candidate + // set in two different orders and require byte-identical output. bits=1 + // exercises the tie-break path directly (coarse quantization ties scores, + // so ordering is decided by doc-id, not input position). + for bits in [1u8, 2, 4] { + let (sign, rq, _corpus) = build_two_stage(bits); + let q = make_corpus(73_000 + bits as u64)[..D].to_vec(); + let k = 6usize; + let out_k = k.min(N); + + // A real candidate shortlist (probe order)... + let row = sign.top_m_candidates(&q, 12); + assert!(row.len() >= 2, "need a non-trivial row to permute"); + // ...and a non-identity permutation of the SAME ids. + let mut shuffled = row.clone(); + shuffled.reverse(); + assert_ne!(row, shuffled, "reversed row must differ from original"); + + let run = |cand_row: &[u32]| { + let rows = vec![cand_row.to_vec()]; + let (cand, off) = flatten_to_csr(&rows); + let mut scores = vec![0.0f32; out_k]; + let mut indices = vec![0i64; out_k]; + let mut scratch = ordvec::SubsetScratch::new(); + rq.search_asymmetric_subset_batched_serial_into( + &q, + &off, + &cand, + k, + &mut scratch, + &mut scores, + &mut indices, + ); + (scores, indices) + }; + + let (s0, i0) = run(&row); + let (s1, i1) = run(&shuffled); + + assert_eq!( + i0, i1, + "bits={bits}: output ids must not depend on input order" + ); + assert_eq!( + s0, s1, + "bits={bits}: output scores must not depend on input order" + ); + assert_score_then_id_order(&s0, &i0); + + // ...and matches the single-query reference (sorted output regardless). + let (es, ei) = rq.search_asymmetric_subset(&q, &row, k); + assert_eq!( + &i0[..ei.len()], + &ei[..], + "bits={bits}: must match single-query" + ); + assert_eq!(&s0[..es.len()], &es[..]); + } +} + +#[test] +fn batched_into_edges() { + let (_sign, rq, _corpus) = build_two_stage(2); + let mut scratch = ordvec::SubsetScratch::new(); + // nq == 0 + let mut s0: Vec = vec![]; + let mut i0: Vec = vec![]; + rq.search_asymmetric_subset_batched_serial_into( + &[], + &[0], + &[], + 5, + &mut scratch, + &mut s0, + &mut i0, + ); + // k == 0 → out_k 0 → buffers length 0 + let q = make_corpus(8)[..2 * D].to_vec(); + let mut s: Vec = vec![]; + let mut i: Vec = vec![]; + rq.search_asymmetric_subset_batched_serial_into( + &q, + &[0, 0, 0], + &[], + 0, + &mut scratch, + &mut s, + &mut i, + ); + // empty rows for 2 queries, k=3 → all sentinel + let out_k = 3usize.min(N); + let mut s2 = vec![0.0f32; 2 * out_k]; + let mut i2 = vec![0i64; 2 * out_k]; + rq.search_asymmetric_subset_batched_serial_into( + &q, + &[0, 0, 0], + &[], + 3, + &mut scratch, + &mut s2, + &mut i2, + ); + assert!(s2.iter().all(|&x| x == f32::NEG_INFINITY)); + assert!(i2.iter().all(|&x| x == -1)); +} + +#[test] +#[should_panic(expected = "candidate_offsets length")] +fn batched_into_rejects_bad_offsets_len() { + let (_sign, rq, _corpus) = build_two_stage(2); + let q = make_corpus(9)[..2 * D].to_vec(); + let mut s = vec![0.0f32; 2 * 3]; + let mut i = vec![0i64; 2 * 3]; + let mut scratch = ordvec::SubsetScratch::new(); + // offsets len 2 but nq is 2 → needs len 3 + rq.search_asymmetric_subset_batched_serial_into( + &q, + &[0, 0], + &[], + 3, + &mut scratch, + &mut s, + &mut i, + ); +} + +#[test] +#[should_panic(expected = "out of range")] +fn batched_into_rejects_oob_candidate() { + let (_sign, rq, _corpus) = build_two_stage(2); + let q = make_corpus(10)[..D].to_vec(); + let mut s = vec![0.0f32; 3]; + let mut i = vec![0i64; 3]; + let mut scratch = ordvec::SubsetScratch::new(); + rq.search_asymmetric_subset_batched_serial_into( + &q, + &[0, 1], + &[N as u32 + 5], + 3, + &mut scratch, + &mut s, + &mut i, + ); +} + +#[test] +fn batched_into_is_allocation_free_after_warmup() { + let (sign, rq, _corpus) = build_two_stage(2); + let nq = 4usize; + let queries = make_corpus(61)[..nq * D].to_vec(); + let rows: Vec> = (0..nq) + .map(|qi| sign.top_m_candidates(&queries[qi * D..(qi + 1) * D], 16)) + .collect(); + let (cand, off) = flatten_to_csr(&rows); + let out_k = 5usize.min(N); + let mut scores = vec![0.0f32; nq * out_k]; + let mut indices = vec![0i64; nq * out_k]; + let mut scratch = ordvec::SubsetScratch::new(); + // Warmup. + rq.search_asymmetric_subset_batched_serial_into( + &queries, + &off, + &cand, + 5, + &mut scratch, + &mut scores, + &mut indices, + ); + // Second identical call must not grow scratch (capacity-stability proxy for + // allocation-free; covers scan + finalize buffers). See spec §B for the + // optional allocator-counter strengthening. + let caps = scratch.capacities_for_test(); + rq.search_asymmetric_subset_batched_serial_into( + &queries, + &off, + &cand, + 5, + &mut scratch, + &mut scores, + &mut indices, + ); + assert_eq!( + scratch.capacities_for_test(), + caps, + "scratch must reuse capacity (allocation-free)" + ); +} + +#[test] +fn batched_serial_wrapper_matches_into_and_full_set_matches_search_asymmetric() { + let (sign, rq, _corpus) = build_two_stage(2); + let nq = 4usize; + let queries = make_corpus(70_001)[..nq * D].to_vec(); + let k = 6usize; + let rows: Vec> = (0..nq) + .map(|qi| sign.top_m_candidates(&queries[qi * D..(qi + 1) * D], 25)) + .collect(); + let (cand, off) = flatten_to_csr(&rows); + + let res = rq.search_asymmetric_subset_batched_serial(&queries, &off, &cand, k); + let out_k = k.min(N); + assert_eq!(res.nq, nq); + assert_eq!(res.k, out_k); + + // == *_into + let mut s = vec![0.0f32; nq * out_k]; + let mut i = vec![0i64; nq * out_k]; + let mut scratch = ordvec::SubsetScratch::new(); + rq.search_asymmetric_subset_batched_serial_into( + &queries, + &off, + &cand, + k, + &mut scratch, + &mut s, + &mut i, + ); + assert_eq!(res.scores, s); + assert_eq!(res.indices, i); + + // Full candidate set per row (0..N) == search_asymmetric. + let full: Vec = (0..N as u32).collect(); + let full_rows: Vec> = (0..nq).map(|_| full.clone()).collect(); + let (fc, fo) = flatten_to_csr(&full_rows); + let res_full = rq.search_asymmetric_subset_batched_serial(&queries, &fo, &fc, k); + let asym = rq.search_asymmetric(&queries, k); + assert_eq!( + res_full.indices, asym.indices, + "full-set batched subset must equal search_asymmetric" + ); + assert_eq!(res_full.scores, asym.scores); +} + +/// Cross-tier: batched subset rerank's top-k SET + order must match the scalar +/// byte-LUT reference (`search_asymmetric_byte_lut`) across dims that route to +/// different kernels (dim%64 → AVX-512; %16-not-%64 b=2 → AVX2; non-%16 → scalar). +/// Scores compared within the existing kernel parity tolerance, NOT byte-identical +/// across tiers. (Same convention as redteam_beta + determinism_contract.) +#[test] +fn batched_subset_rerank_matches_scalar_reference_across_tiers() { + use ordvec::search_asymmetric_byte_lut; + for dim in [64usize, 80, 128] { + // 64 → AVX-512-eligible; 80 → %16 (AVX2 b=2) not %64; 128 → AVX-512. + let n = 64usize; + let mut rng = ChaCha8Rng::seed_from_u64(0xA11C + dim as u64); + let corpus: Vec = (0..n * dim).map(|_| rng.random_range(-1.0..1.0)).collect(); + let mut rq = RankQuant::new(dim, 2); + rq.add(&corpus); + let query: Vec = (0..dim).map(|_| rng.random_range(-1.0..1.0)).collect(); + let k = 8usize; + let full: Vec = (0..n as u32).collect(); + let off = vec![0usize, full.len()]; + let res = rq.search_asymmetric_subset_batched_serial(&query, &off, &full, k); + let reference = search_asymmetric_byte_lut(&rq, &query, k); + + use std::collections::HashSet; + let got: HashSet = res.indices.iter().copied().filter(|&x| x >= 0).collect(); + let want: HashSet = reference + .indices + .iter() + .copied() + .filter(|&x| x >= 0) + .collect(); + assert_eq!( + got, want, + "dim={dim}: top-k set diverged from scalar reference" + ); + // Tie/order policy: scores non-increasing, ids ascending within equal scores. + assert_score_then_id_order(&res.scores, &res.indices); + // Scores within tolerance (NOT byte-identical across tiers). + for slot in 0..k { + let a = res.scores[slot]; + // find matching id in reference to compare its score + if res.indices[slot] >= 0 { + let rpos = reference + .indices + .iter() + .position(|&x| x == res.indices[slot]); + if let Some(p) = rpos { + let b = reference.scores[p]; + assert!( + (a - b).abs() <= 1e-4, + "dim={dim} slot{slot}: score {a} vs ref {b}" + ); + } + } + } + } +}