From d31f345de4d45e5c2d7eb4e4945eda873dcc3032 Mon Sep 17 00:00:00 2001 From: Todd Baur Date: Fri, 19 Jun 2026 13:03:11 -0700 Subject: [PATCH 1/2] fix(beir-bench): streaming row-bounded npy loader (--max-docs honored, ~2x less peak RAM) load_npy_f32 had two issues that made large scaling sweeps painful: - ~2x memory peak: std::fs::read pulled the whole file into a Vec, then a second full Vec was allocated while the bytes were still alive (~72GB peak on an 8.8M x 1024 f32 corpus). - --max-docs was ignored at load time: the full corpus was read off disk and only sliced afterward, so every sub-sampled point in a scaling sweep paid the full read + single-threaded parse. New load_npy_f32_rows(path, max_rows) seeks past the header and reads ONLY the kept rows, parses the payload in parallel (rayon) directly into the output Vec with no intermediate full-size copy. Added npy_row_count (header-only) for the corpus_ids length assertion. load_npy_f32 keeps its signature (max_rows=None). Verified: builds clean; --max-docs 5000 loads exactly 5000 rows; full-corpus path unchanged. Co-Authored-By: Claude Opus 4.8 Signed-off-by: Todd Baur --- benchmarks/beir-bench/src/main.rs | 117 ++++++++++++++++++++---------- 1 file changed, 79 insertions(+), 38 deletions(-) diff --git a/benchmarks/beir-bench/src/main.rs b/benchmarks/beir-bench/src/main.rs index 0e1962e..82f3d24 100644 --- a/benchmarks/beir-bench/src/main.rs +++ b/benchmarks/beir-bench/src/main.rs @@ -156,24 +156,61 @@ fn parse_args() -> Config { // --------------------------------------------------------------------------- fn load_npy_f32(path: &str) -> (Vec, usize, usize) { - let bytes = std::fs::read(path).unwrap_or_else(|e| panic!("read npy {path}: {e}")); - assert!(bytes.len() >= 10, "npy file too short: {path}"); - assert_eq!(&bytes[..6], b"\x93NUMPY", "not a numpy file: {path}"); - let major = bytes[6]; - let minor = bytes[7]; - assert!( - major == 1 || major == 2, - "unsupported npy version {major}.{minor}: {path}", - ); + load_npy_f32_rows(path, None) +} + +/// Read just the npy header and return the row count (dim 0). Cheap: no payload read. +fn npy_row_count(path: &str) -> usize { + use std::io::Read; + let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}")); + let mut pre = [0u8; 12]; + f.read_exact(&mut pre[..10]).expect("read npy magic"); + assert_eq!(&pre[..6], b"\x93NUMPY", "not a numpy file: {path}"); + let header_len = if pre[6] == 1 { + u16::from_le_bytes([pre[8], pre[9]]) as usize + } else { + f.read_exact(&mut pre[10..12]).unwrap(); + u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize + }; + let mut hb = vec![0u8; header_len]; + f.read_exact(&mut hb).expect("read npy header"); + let header = std::str::from_utf8(&hb).expect("npy header not utf-8"); + let after = &header[header.find("'shape':").expect("no shape")..]; + let open = after.find('(').unwrap(); + let close = after.find(')').unwrap(); + after[open + 1..close] + .split(',') + .find_map(|s| s.trim().parse::().ok()) + .expect("no row count in npy shape") +} + +/// Read a 2-D LE-f32 C-order npy. When `max_rows` is `Some(m)`, only the first +/// `m` rows of the payload are read off disk (so `--max-docs` subsampling does +/// NOT pull the whole 36 GB corpus into RAM). The payload is parsed in parallel +/// directly into the output `Vec` — no intermediate full `Vec` copy, so +/// peak memory is ~1× the kept data, not 2× the whole file. +fn load_npy_f32_rows(path: &str, max_rows: Option) -> (Vec, usize, usize) { + use std::io::{Read, Seek, SeekFrom}; + let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}")); + // header: magic(6) + ver(2) + len(2 or 4) + header + let mut pre = [0u8; 12]; + f.read_exact(&mut pre[..10]) + .unwrap_or_else(|e| panic!("read npy magic {path}: {e}")); + assert_eq!(&pre[..6], b"\x93NUMPY", "not a numpy file: {path}"); + let major = pre[6]; + assert!(major == 1 || major == 2, "unsupported npy version: {path}"); let (header_len, header_start) = if major == 1 { - let hl = u16::from_le_bytes([bytes[8], bytes[9]]) as usize; - (hl, 10) + (u16::from_le_bytes([pre[8], pre[9]]) as usize, 10usize) } else { - let hl = u32::from_le_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]) as usize; - (hl, 12) + f.read_exact(&mut pre[10..12]).unwrap(); + ( + u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize, + 12usize, + ) }; - let header = std::str::from_utf8(&bytes[header_start..header_start + header_len]) - .expect("npy header not utf-8"); + let mut hb = vec![0u8; header_len]; + f.read_exact(&mut hb).expect("read npy header"); + let header = std::str::from_utf8(&hb).expect("npy header not utf-8"); assert!( header.contains("'descr': ' (Vec, usize, usize) { header.contains("'fortran_order': False"), "expected C order in {path}", ); - let shape_start = header.find("'shape':").expect("no shape in npy header"); - let after = &header[shape_start..]; + let after = &header[header.find("'shape':").expect("no shape in npy header")..]; let open = after.find('(').unwrap(); let close = after.find(')').unwrap(); let dims: Vec = after[open + 1..close] @@ -191,19 +227,20 @@ fn load_npy_f32(path: &str) -> (Vec, usize, usize) { .filter_map(|s| s.trim().parse::().ok()) .collect(); assert_eq!(dims.len(), 2, "expected 2-D array in {path}"); - let n = dims[0]; - let dim = dims[1]; + let (n_full, dim) = (dims[0], dims[1]); + let n = max_rows.map_or(n_full, |m| m.min(n_full)); + // seek to payload start and read ONLY the rows we keep let data_start = header_start + header_len; - let n_floats = n * dim; - assert_eq!( - bytes.len() - data_start, - n_floats * 4, - "data length mismatch in {path}", - ); - let mut out = vec![0.0f32; n_floats]; - for (i, chunk) in bytes[data_start..].chunks_exact(4).enumerate() { - out[i] = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]); - } + f.seek(SeekFrom::Start(data_start as u64)).unwrap(); + let mut raw = vec![0u8; n * dim * 4]; + f.read_exact(&mut raw) + .unwrap_or_else(|e| panic!("read npy payload {path}: {e}")); + // parallel parse, no second full-size buffer beyond the f32 output + let mut out = vec![0.0f32; n * dim]; + out.par_iter_mut().enumerate().for_each(|(i, v)| { + let c = &raw[i * 4..i * 4 + 4]; + *v = f32::from_le_bytes([c[0], c[1], c[2], c[3]]); + }); (out, n, dim) } @@ -712,10 +749,18 @@ fn main() { let manifest_path = format!("{enc_dir}/embeddings.manifest.json"); let encoder_sha = sha256_file(&manifest_path); - let (corpus_full, n_corpus_full, dim) = load_npy_f32(&format!("{enc_dir}/corpus.f32.npy")); + // Load ONLY the first n_docs rows when sub-sampling (--max-docs), so the scan + // sweep never pulls the whole corpus off disk just to slice it. corpus_ids is + // truncated to match; the full row count comes from the npy header. + let n_corpus_full = npy_row_count(&format!("{enc_dir}/corpus.f32.npy")); + let n_docs = cfg.max_docs.unwrap_or(n_corpus_full).min(n_corpus_full); + let full_corpus = cfg.max_docs.is_none() || n_docs == n_corpus_full; + let (corpus_vec, n_loaded, dim) = + load_npy_f32_rows(&format!("{enc_dir}/corpus.f32.npy"), Some(n_docs)); + assert_eq!(n_loaded, n_docs, "corpus load row mismatch"); let (queries, n_queries, q_dim) = load_npy_f32(&format!("{enc_dir}/queries.f32.npy")); assert_eq!(q_dim, dim, "query dim {q_dim} != corpus dim {dim}"); - validate_embeddings(&corpus_full, n_corpus_full, dim, "corpus"); + validate_embeddings(&corpus_vec, n_docs, dim, "corpus"); validate_embeddings(&queries, n_queries, q_dim, "queries"); let corpus_ids_full = load_json_string_array(&format!("{enc_dir}/corpus_ids.json")); @@ -727,10 +772,7 @@ fn main() { ); assert_eq!(query_ids.len(), n_queries, "query_ids/embeddings mismatch"); - // Sub-sample the corpus for the scaling sweep (latency-only; no nDCG). - let n_docs = cfg.max_docs.unwrap_or(n_corpus_full).min(n_corpus_full); - let full_corpus = cfg.max_docs.is_none() || n_docs == n_corpus_full; - let corpus = &corpus_full[..n_docs * dim]; + let corpus = &corpus_vec[..n_docs * dim]; let corpus_ids = &corpus_ids_full[..n_docs]; let write_topk = full_corpus; // qrels-based nDCG only valid on the full corpus @@ -986,9 +1028,8 @@ fn run_hnsw( let build_seconds = t0.elapsed().as_secs_f64(); eprintln!(" build done in {build_seconds:.2}s"); - // HNSW graph size is implementation-internal to hnsw_rs; the numeric field - // reports stored float-vector bytes only. Public docs label this as - // "4096 B + graph" so the side structure is not silently counted as zero. + // HNSW graph size is implementation-internal; report the stored-vector bytes + // (full float) as the index footprint, matching the dense baseline accounting. let bytes_per_vector = dim * 4; let index_total_mib = (n_docs * bytes_per_vector) as f64 / 1024.0 / 1024.0; let warmup = 5.min(n_queries); From 0f8554eebd61c23e2b1f59b2db7258847573170d Mon Sep 17 00:00:00 2001 From: Nelson Spence Date: Fri, 19 Jun 2026 16:14:07 -0500 Subject: [PATCH 2/2] Fix BEIR NPY row loader review findings --- benchmarks/beir-bench/src/main.rs | 203 ++++++++++++++++++++++-------- 1 file changed, 152 insertions(+), 51 deletions(-) diff --git a/benchmarks/beir-bench/src/main.rs b/benchmarks/beir-bench/src/main.rs index 82f3d24..b931c6a 100644 --- a/benchmarks/beir-bench/src/main.rs +++ b/benchmarks/beir-bench/src/main.rs @@ -159,28 +159,62 @@ fn load_npy_f32(path: &str) -> (Vec, usize, usize) { load_npy_f32_rows(path, None) } -/// Read just the npy header and return the row count (dim 0). Cheap: no payload read. -fn npy_row_count(path: &str) -> usize { +fn read_npy_header(f: &mut std::fs::File, path: &str) -> (String, usize) { use std::io::Read; - let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}")); + let mut pre = [0u8; 12]; - f.read_exact(&mut pre[..10]).expect("read npy magic"); + f.read_exact(&mut pre[..10]) + .unwrap_or_else(|e| panic!("read npy magic {path}: {e}")); assert_eq!(&pre[..6], b"\x93NUMPY", "not a numpy file: {path}"); - let header_len = if pre[6] == 1 { - u16::from_le_bytes([pre[8], pre[9]]) as usize + let major = pre[6]; + let minor = pre[7]; + assert!( + major == 1 || major == 2, + "unsupported npy version {major}.{minor}: {path}", + ); + let (header_len, data_start) = if major == 1 { + (u16::from_le_bytes([pre[8], pre[9]]) as usize, 10usize) } else { - f.read_exact(&mut pre[10..12]).unwrap(); - u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize + f.read_exact(&mut pre[10..12]) + .unwrap_or_else(|e| panic!("read npy header length {path}: {e}")); + ( + u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize, + 12usize, + ) }; let mut hb = vec![0u8; header_len]; - f.read_exact(&mut hb).expect("read npy header"); - let header = std::str::from_utf8(&hb).expect("npy header not utf-8"); - let after = &header[header.find("'shape':").expect("no shape")..]; + f.read_exact(&mut hb) + .unwrap_or_else(|e| panic!("read npy header {path}: {e}")); + let header = + String::from_utf8(hb).unwrap_or_else(|e| panic!("npy header not utf-8 {path}: {e}")); + (header, data_start + header_len) +} + +fn npy_shape(header: &str, path: &str) -> Vec { + let after = &header[header.find("'shape':").expect("no shape in npy header")..]; let open = after.find('(').unwrap(); let close = after.find(')').unwrap(); - after[open + 1..close] + let dims: Vec = after[open + 1..close] .split(',') - .find_map(|s| s.trim().parse::().ok()) + .filter_map(|s| s.trim().parse::().ok()) + .collect(); + assert!(!dims.is_empty(), "empty npy shape in {path}"); + dims +} + +fn npy_payload_bytes(n: usize, dim: usize, path: &str) -> usize { + n.checked_mul(dim) + .and_then(|floats| floats.checked_mul(std::mem::size_of::())) + .unwrap_or_else(|| panic!("npy payload too large: {path}")) +} + +/// Read just the npy header and return the row count (dim 0). Cheap: no payload read. +fn npy_row_count(path: &str) -> usize { + let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}")); + let (header, _) = read_npy_header(&mut f, path); + npy_shape(&header, path) + .into_iter() + .next() .expect("no row count in npy shape") } @@ -190,27 +224,10 @@ fn npy_row_count(path: &str) -> usize { /// directly into the output `Vec` — no intermediate full `Vec` copy, so /// peak memory is ~1× the kept data, not 2× the whole file. fn load_npy_f32_rows(path: &str, max_rows: Option) -> (Vec, usize, usize) { - use std::io::{Read, Seek, SeekFrom}; + use std::io::Read; + let mut f = std::fs::File::open(path).unwrap_or_else(|e| panic!("open npy {path}: {e}")); - // header: magic(6) + ver(2) + len(2 or 4) + header - let mut pre = [0u8; 12]; - f.read_exact(&mut pre[..10]) - .unwrap_or_else(|e| panic!("read npy magic {path}: {e}")); - assert_eq!(&pre[..6], b"\x93NUMPY", "not a numpy file: {path}"); - let major = pre[6]; - assert!(major == 1 || major == 2, "unsupported npy version: {path}"); - let (header_len, header_start) = if major == 1 { - (u16::from_le_bytes([pre[8], pre[9]]) as usize, 10usize) - } else { - f.read_exact(&mut pre[10..12]).unwrap(); - ( - u32::from_le_bytes([pre[8], pre[9], pre[10], pre[11]]) as usize, - 12usize, - ) - }; - let mut hb = vec![0u8; header_len]; - f.read_exact(&mut hb).expect("read npy header"); - let header = std::str::from_utf8(&hb).expect("npy header not utf-8"); + let (header, data_start) = read_npy_header(&mut f, path); assert!( header.contains("'descr': ') -> (Vec, usize, u header.contains("'fortran_order': False"), "expected C order in {path}", ); - let after = &header[header.find("'shape':").expect("no shape in npy header")..]; - let open = after.find('(').unwrap(); - let close = after.find(')').unwrap(); - let dims: Vec = after[open + 1..close] - .split(',') - .filter_map(|s| s.trim().parse::().ok()) - .collect(); + let dims = npy_shape(&header, path); assert_eq!(dims.len(), 2, "expected 2-D array in {path}"); let (n_full, dim) = (dims[0], dims[1]); let n = max_rows.map_or(n_full, |m| m.min(n_full)); - // seek to payload start and read ONLY the rows we keep - let data_start = header_start + header_len; - f.seek(SeekFrom::Start(data_start as u64)).unwrap(); - let mut raw = vec![0u8; n * dim * 4]; - f.read_exact(&mut raw) + let full_payload_bytes = npy_payload_bytes(n_full, dim, path); + let expected_len = (data_start as u64) + .checked_add(full_payload_bytes as u64) + .unwrap_or_else(|| panic!("npy file too large: {path}")); + let actual_len = f + .metadata() + .unwrap_or_else(|e| panic!("stat npy {path}: {e}")) + .len(); + assert_eq!(actual_len, expected_len, "data length mismatch in {path}"); + + let n_floats = n + .checked_mul(dim) + .unwrap_or_else(|| panic!("npy payload too large: {path}")); + let mut out = vec![0.0f32; n_floats]; + let read_bytes = npy_payload_bytes(n, dim, path); + // SAFETY: `out` is fully initialized, `f32` has no invalid bit patterns, and + // the byte slice covers exactly the initialized backing storage. + let out_bytes = + unsafe { std::slice::from_raw_parts_mut(out.as_mut_ptr().cast::(), read_bytes) }; + f.read_exact(out_bytes) .unwrap_or_else(|e| panic!("read npy payload {path}: {e}")); - // parallel parse, no second full-size buffer beyond the f32 output - let mut out = vec![0.0f32; n * dim]; - out.par_iter_mut().enumerate().for_each(|(i, v)| { - let c = &raw[i * 4..i * 4 + 4]; - *v = f32::from_le_bytes([c[0], c[1], c[2], c[3]]); - }); + + #[cfg(target_endian = "big")] + out.par_iter_mut() + .for_each(|v| *v = f32::from_bits(v.to_bits().swap_bytes())); + (out, n, dim) } @@ -1637,3 +1662,79 @@ unsafe fn scan_agree_avx512(codes: &[u64], wpd: usize, d0: usize, qcode: &[u64], *a = dim - ham; } } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use std::path::PathBuf; + + fn temp_npy_path(name: &str) -> PathBuf { + let mut path = std::env::temp_dir(); + path.push(format!( + "ordvec-beir-bench-{name}-{}-{}.npy", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + )); + path + } + + fn write_v1_npy(path: &std::path::Path, rows: usize, dim: usize, values: &[f32]) { + assert_eq!(values.len(), rows * dim); + let mut header = + format!("{{'descr': '