diff --git a/.github/workflows/pm-e2e-bench.yml b/.github/workflows/pm-e2e-bench.yml index 74c90ece5..eb560969b 100644 --- a/.github/workflows/pm-e2e-bench.yml +++ b/.github/workflows/pm-e2e-bench.yml @@ -143,6 +143,43 @@ jobs: name: utoo-linux-x64 path: target/x86_64-unknown-linux-gnu/release/utoo retention-days: 1 + # manifest-bench is a standalone HTTP-only fetch sweeper used as + # the network-only baseline for p1_resolve perf work. Built only + # when phases bench is going to run (label or dispatch), so plain + # PR builds aren't slowed by the extra crate. + - name: Build manifest-bench (p1 baseline) + if: > + (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'benchmark')) || + (github.event_name == 'workflow_dispatch' && (inputs.target == 'pm-bench-phases' || inputs.target == 'pm-bench-pcap')) + run: cargo build --release --target x86_64-unknown-linux-gnu -p manifest-bench + - name: Upload manifest-bench binary + if: > + (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'benchmark')) || + (github.event_name == 'workflow_dispatch' && (inputs.target == 'pm-bench-phases' || inputs.target == 'pm-bench-pcap')) + uses: actions/upload-artifact@v4 + with: + name: manifest-bench-linux-x64 + path: target/x86_64-unknown-linux-gnu/release/manifest-bench + retention-days: 1 + # preload-bench: same HTTP setup as manifest-bench, but discovers + # names by walking transitive deps from a package.json root — + # tests whether a fully self-contained streaming preload can match + # standalone manifest-bench's wall on the same workload that + # ruborist's path runs at ~2.18s. + - name: Build preload-bench + if: > + (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'benchmark')) || + (github.event_name == 'workflow_dispatch' && (inputs.target == 'pm-bench-phases' || inputs.target == 'pm-bench-pcap')) + run: cargo build --release --target x86_64-unknown-linux-gnu -p preload-bench + - name: Upload preload-bench binary + if: > + (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'benchmark')) || + (github.event_name == 'workflow_dispatch' && (inputs.target == 'pm-bench-phases' || inputs.target == 'pm-bench-pcap')) + uses: actions/upload-artifact@v4 + with: + name: preload-bench-linux-x64 + path: target/x86_64-unknown-linux-gnu/release/preload-bench + retention-days: 1 # Piggyback on the already-built target/ from the step above: when the # PR is labeled `benchmark`, overlay origin/next's tree onto the current # workdir and re-run cargo build. cargo's incremental compile only @@ -516,6 +553,33 @@ jobs: mv /tmp/utoo-next-dist/utoo /tmp/utoo-next echo "Baseline utoo (next) version: $(/tmp/utoo-next --version)" echo "UTOO_NEXT_BIN=/tmp/utoo-next" >> $GITHUB_ENV + # Download the manifest-bench binary built by build-linux. Used as + # the network-only baseline for p1_resolve work — strips out parse, + # BFS, dedup, lockfile write so the wall is pure HTTP fetch. + - name: Download manifest-bench binary + uses: actions/download-artifact@v4 + with: + name: manifest-bench-linux-x64 + path: /tmp/manifest-bench-dist + - name: Install manifest-bench + run: | + chmod +x /tmp/manifest-bench-dist/manifest-bench + mv /tmp/manifest-bench-dist/manifest-bench /tmp/manifest-bench + echo "MANIFEST_BENCH_BIN=/tmp/manifest-bench" >> $GITHUB_ENV + # Self-contained streaming preload bench — same HTTP setup as + # manifest-bench but discovers names via transitive walk from a + # package.json. Used to test whether a fully-isolated path can + # match standalone manifest-bench's wall on the same workload. + - name: Download preload-bench binary + uses: actions/download-artifact@v4 + with: + name: preload-bench-linux-x64 + path: /tmp/preload-bench-dist + - name: Install preload-bench + run: | + chmod +x /tmp/preload-bench-dist/preload-bench + mv /tmp/preload-bench-dist/preload-bench /tmp/preload-bench + echo "PRELOAD_BENCH_BIN=/tmp/preload-bench" >> $GITHUB_ENV - name: Verify tools run: | hyperfine --version @@ -565,6 +629,91 @@ jobs: run: | mkdir -p /tmp/pm-bench-output bash bench/pm-bench-phases.sh 2>&1 | tee /tmp/pm-bench-output/bench-phases-npmmirror.log + # Standalone HTTP-only sweep — sweeps the network-only ceiling + # against the same lockfile-derived workload phase-bench just used. + # Output goes into the bench logs artifact; no PR comment surface. + - name: Standalone manifest-bench (HTTP-only sweep) + env: + PROJECT: ${{ github.event.inputs.project || 'ant-design' }} + REGISTRY: 'https://registry.npmjs.org' + run: | + set -eu + mkdir -p /tmp/pm-bench-output + PROJECT_DIR="/tmp/pm-bench/$PROJECT" + if [ ! -d "$PROJECT_DIR" ]; then + mkdir -p /tmp/pm-bench + git clone --depth 1 "https://github.com/ant-design/$PROJECT" "$PROJECT_DIR" + fi + cd "$PROJECT_DIR" + if [ ! -f package-lock.json ]; then + echo "==> generating lockfile via utoo (one-shot, untimed)" + utoo deps --registry "$REGISTRY" || true + fi + ls -la package-lock.json || { echo "no lockfile; skipping manifest-bench"; exit 0; } + + MB_LOG=/tmp/pm-bench-output/manifest-bench-npmjs.log + { + echo "============================================================" + echo "manifest-bench: HTTP-only fetch (no parse, no resolver)" + echo " Goal: isolate reqwest/rustls/tokio behaviour from" + echo " ruborist's resolver pipeline. Same metric shape as" + echo " ruborist's p1-breakdown line." + echo "============================================================" + for CAP in 32 64 96 128 192 256; do + echo + echo "--- concurrency=$CAP, h1, full manifest, default UA ---" + "$MANIFEST_BENCH_BIN" --lockfile package-lock.json --registry "$REGISTRY" \ + --concurrency "$CAP" --reps 2 --http1-only || true + done + echo + echo "--- concurrency=128, h2 negotiate, full manifest, default UA ---" + "$MANIFEST_BENCH_BIN" --lockfile package-lock.json --registry "$REGISTRY" \ + --concurrency 128 --reps 2 || true + echo + echo "--- concurrency=128, h1, single-version endpoint ---" + "$MANIFEST_BENCH_BIN" --lockfile package-lock.json --registry "$REGISTRY" \ + --concurrency 128 --reps 2 --http1-only --single-version || true + echo + echo "--- concurrency=128, h1, UA=Bun/1.2.21 ---" + "$MANIFEST_BENCH_BIN" --lockfile package-lock.json --registry "$REGISTRY" \ + --concurrency 128 --reps 2 --http1-only --user-agent "Bun/1.2.21" || true + } 2>&1 | tee "$MB_LOG" + # Self-contained streaming preload (transitive walk from + # package.json) — same HTTP setup as manifest-bench but with a + # streaming FuturesUnordered + per-future parse. This tests + # whether a fully ruborist-independent path can hit standalone + # manifest-bench's wall under the same project workload. + - name: Standalone preload-bench (transitive walk sweep) + env: + PROJECT: ${{ github.event.inputs.project || 'ant-design' }} + REGISTRY: 'https://registry.npmjs.org' + run: | + set -eu + mkdir -p /tmp/pm-bench-output + PROJECT_DIR="/tmp/pm-bench/$PROJECT" + if [ ! -d "$PROJECT_DIR" ]; then + echo "no project dir; skipping preload-bench"; exit 0 + fi + PJ="$PROJECT_DIR/package.json" + if [ ! -f "$PJ" ]; then + echo "no package.json; skipping preload-bench"; exit 0 + fi + + PB_LOG=/tmp/pm-bench-output/preload-bench-npmjs.log + { + echo "============================================================" + echo "preload-bench: streaming transitive-walk preload" + echo " Self-contained (no ruborist deps). Same HTTP setup as" + echo " manifest-bench, but discovers names by walking transitive" + echo " deps from package.json instead of consuming a flat list." + echo "============================================================" + for CAP in 64 96 128; do + echo + echo "--- concurrency=$CAP, h1, transitive walk ---" + "$PRELOAD_BENCH_BIN" --package-json "$PJ" --registry "$REGISTRY" \ + --concurrency "$CAP" --reps 4 || true + done + } 2>&1 | tee "$PB_LOG" - name: Upload bench logs if: always() uses: actions/upload-artifact@v4 diff --git a/Cargo.toml b/Cargo.toml index ef4a4f926..4b2836c06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,8 @@ [workspace] resolver = "2" members = [ + "crates/manifest-bench", + "crates/preload-bench", "crates/pack-api", "crates/pack-cli", "crates/pack-core", diff --git a/bench/pm-bench-phases.sh b/bench/pm-bench-phases.sh index 226ffb751..b025ebc6f 100755 --- a/bench/pm-bench-phases.sh +++ b/bench/pm-bench-phases.sh @@ -22,6 +22,13 @@ UTOO_NEXT_CACHE="${UTOO_NEXT_CACHE:-/tmp/utoo-next-bench-cache}" BUN_CACHE="${BUN_CACHE:-/tmp/bun-bench-cache}" export BUN_INSTALL_CACHE_DIR="$BUN_CACHE" +# utoo path defaults to fast_preload (combined-parse) so we have a +# stable baseline to compare against. preload-bench is run as a +# separate standalone tool by the CI workflow — its wall is the +# self-contained-streaming reference, ruborist's utoo p1_resolve +# wall is the integrated path. The gap between them is what +# remains to close. + # Drop optional baselines from the PM list when their binary is not wired # up — UTOO_NPM_BIN is set by CI's "Install utoo@npm" step, UTOO_NEXT_BIN # by the optional "Build next branch utoo" step. Local runs without them diff --git a/crates/manifest-bench/Cargo.toml b/crates/manifest-bench/Cargo.toml new file mode 100644 index 000000000..5b01e57c0 --- /dev/null +++ b/crates/manifest-bench/Cargo.toml @@ -0,0 +1,37 @@ +[package] +name = "manifest-bench" +version = "0.0.0" +edition = "2024" +license = "MIT" +publish = false +description = "Standalone HTTP-only manifest fetch benchmark, isolating network behaviour from ruborist's resolver pipeline." + +[[bin]] +name = "manifest-bench" +path = "src/main.rs" + +# tombi: format.rules.table-keys-order.disabled = true +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true } +futures = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs", "time"] } + +# Identical TLS / DNS choices to ruborist so we measure the *protocol* +# characteristics of the same stack, not a different implementation. +reqwest = { version = "0.12", default-features = false, features = [ + "brotli", + "gzip", + "http2", + "rustls-tls-native-roots-no-provider", + "socks" +] } +rustls = { version = "0.23", default-features = false, features = [ + "aws-lc-rs", + "logging", + "std", + "tls12" +] } +rustls-native-certs = "0.8" diff --git a/crates/manifest-bench/src/main.rs b/crates/manifest-bench/src/main.rs new file mode 100644 index 000000000..fa70f3fe4 --- /dev/null +++ b/crates/manifest-bench/src/main.rs @@ -0,0 +1,371 @@ +//! Standalone HTTP-only manifest fetch benchmark. +//! +//! Isolates the network behaviour of `reqwest + rustls + tokio` from +//! ruborist's resolver pipeline (BFS, dedup, parse, lockfile, project +//! cache). Reads a list of package names, builds manifest URLs, fires +//! parallel `GET` requests, records `(start, end)` per request, and +//! reports the same diag shape as ruborist's `Preload HTTP diag` line. +//! +//! Two input modes: +//! - `--names-file ` — newline-separated package names +//! - `--lockfile ` — a npm-style package-lock.json; we extract +//! the `packages.*` (v3) or `dependencies.*` (v2) keys +//! +//! Two registry modes: +//! - `/` — full manifest endpoint (default, npmjs) +//! - `//latest` — single-version endpoint +//! (gated behind `--single-version`) +//! +//! Each request reads the body to completion (we only measure I/O, no +//! parse). Output: same fields as preload's HTTP diag for direct +//! comparison. + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::stream::{FuturesUnordered, StreamExt}; + +#[derive(Parser, Debug)] +#[command( + name = "manifest-bench", + about = "HTTP-only manifest fetch bench (no parse, no resolver)" +)] +struct Args { + /// Registry base URL. + #[arg(long, default_value = "https://registry.npmjs.org")] + registry: String, + + /// File of newline-separated package names. Mutually exclusive with `--lockfile`. + #[arg(long, conflicts_with = "lockfile")] + names_file: Option, + + /// `package-lock.json` file. Reads top-level `packages.*.name` keys. + #[arg(long)] + lockfile: Option, + + /// Maximum concurrent in-flight requests. + #[arg(long, default_value_t = 128)] + concurrency: usize, + + /// Number of times to repeat the whole sweep (each iteration is a + /// fresh `reqwest::Client`, so connection pool / TLS handshake + /// costs are paid each time, matching `hyperfine` cold-start). + #[arg(long, default_value_t = 1)] + reps: usize, + + /// Use the single-version endpoint `//latest` instead of the + /// full-manifest endpoint `/`. Smaller bodies, more requests + /// served per byte. + #[arg(long)] + single_version: bool, + + /// Override `Accept` header. Default mimics ruborist's preload + /// (`application/vnd.npm.install-v1+json` — abbreviated metadata). + #[arg(long)] + accept: Option, + + /// Override `User-Agent`. Default uses reqwest's default. Try + /// `Bun/1.x.x` to test whether Cloudflare differentiates by UA. + #[arg(long)] + user_agent: Option, + + /// Force HTTP/1.1 (no H2 negotiation). Default lets ALPN decide. + #[arg(long)] + http1_only: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + let names = load_names(&args)?; + if names.is_empty() { + return Err(anyhow!("no package names found in input")); + } + + println!( + "manifest-bench: registry={} concurrency={} reps={} names={} h1_only={} single_version={} accept={} ua={}", + args.registry, + args.concurrency, + args.reps, + names.len(), + args.http1_only, + args.single_version, + args.accept.as_deref().unwrap_or(""), + args.user_agent.as_deref().unwrap_or(""), + ); + + for rep in 1..=args.reps { + run_once(&args, &names, rep).await?; + } + + Ok(()) +} + +fn load_names(args: &Args) -> Result> { + if let Some(path) = &args.names_file { + let raw = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?; + return Ok(raw + .lines() + .map(str::trim) + .filter(|s| !s.is_empty() && !s.starts_with('#')) + .map(str::to_string) + .collect()); + } + + if let Some(path) = &args.lockfile { + let raw = std::fs::read_to_string(path).with_context(|| format!("read {path:?}"))?; + return extract_lockfile_names(&raw); + } + + Err(anyhow!("provide --names-file or --lockfile")) +} + +/// Pull unique package names from an npm v3 lockfile (`packages.*`) +/// or an older v2 lockfile (`dependencies.*`). +fn extract_lockfile_names(raw: &str) -> Result> { + use std::collections::BTreeSet; + + let v: serde_json::Value = serde_json::from_str(raw).context("parse lockfile JSON")?; + let mut names: BTreeSet = BTreeSet::new(); + + if let Some(packages) = v.get("packages").and_then(|p| p.as_object()) { + for key in packages.keys() { + if key.is_empty() { + continue; + } + // npm v3 packages key like "node_modules/foo" or + // "node_modules/@scope/bar/node_modules/baz" — take the + // last path segment (or @scope/name pair). + let last = last_module_name(key); + if !last.is_empty() { + names.insert(last); + } + } + } else if let Some(deps) = v.get("dependencies").and_then(|d| d.as_object()) { + for key in deps.keys() { + names.insert(key.clone()); + } + } + + Ok(names.into_iter().collect()) +} + +fn last_module_name(key: &str) -> String { + let parts: Vec<&str> = key.split("node_modules/").collect(); + let tail = parts.last().copied().unwrap_or(""); + tail.to_string() +} + +#[derive(Debug)] +struct ReqResult { + start: Instant, + end: Instant, + bytes: usize, + status: u16, +} + +async fn run_once(args: &Args, names: &[String], rep: usize) -> Result<()> { + // Build a fresh client per rep — matches hyperfine's cold-start + // assumption that each iteration pays the TLS handshake cost. + let client = build_client(args)?; + let registry = Arc::new(args.registry.trim_end_matches('/').to_string()); + let accept = Arc::new( + args.accept + .clone() + .unwrap_or_else(|| "application/vnd.npm.install-v1+json".to_string()), + ); + + let single_version = args.single_version; + let concurrency = args.concurrency; + + let phase_start = Instant::now(); + let mut futs = FuturesUnordered::new(); + let mut idx = 0usize; + let mut results: Vec = Vec::with_capacity(names.len()); + + while idx < names.len() && futs.len() < concurrency { + spawn_one( + &client, + ®istry, + &names[idx], + &accept, + single_version, + &mut futs, + ); + idx += 1; + } + + while let Some(res) = futs.next().await { + results.push(res); + if idx < names.len() { + spawn_one( + &client, + ®istry, + &names[idx], + &accept, + single_version, + &mut futs, + ); + idx += 1; + } + } + let phase_wall_ms = phase_start.elapsed().as_millis(); + + report(rep, &results, phase_wall_ms); + Ok(()) +} + +type Fut = std::pin::Pin + Send>>; + +fn spawn_one( + client: &reqwest::Client, + registry: &Arc, + name: &str, + accept: &Arc, + single_version: bool, + futs: &mut FuturesUnordered, +) { + let url = if single_version { + format!("{registry}/{name}/latest") + } else { + format!("{registry}/{name}") + }; + let client = client.clone(); + let accept = Arc::clone(accept); + futs.push(Box::pin(async move { + let start = Instant::now(); + let req = client.get(&url).header("accept", accept.as_str()).send(); + let (bytes, status) = match req.await { + Ok(resp) => { + let status = resp.status().as_u16(); + let body = resp.bytes().await.map(|b| b.len()).unwrap_or(0); + (body, status) + } + Err(_) => (0, 0), + }; + let end = Instant::now(); + ReqResult { + start, + end, + bytes, + status, + } + })); +} + +fn build_client(args: &Args) -> Result { + // Install aws-lc-rs as the default crypto provider (idempotent — + // first call wins). Matches ruborist's `service::http` setup. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let mut roots = rustls::RootCertStore::empty(); + let native = rustls_native_certs::load_native_certs(); + for cert in native.certs { + let _ = roots.add(cert); + } + + let tls_config = rustls::ClientConfig::builder_with_provider(std::sync::Arc::new( + rustls::crypto::aws_lc_rs::default_provider(), + )) + .with_safe_default_protocol_versions() + .map_err(|e| anyhow!("rustls protocol versions: {e}"))? + .with_root_certificates(roots) + .with_no_client_auth(); + + let mut builder = reqwest::Client::builder() + .use_preconfigured_tls(tls_config) + .no_proxy() + .pool_max_idle_per_host(256); + if args.http1_only { + builder = builder.http1_only(); + } + if let Some(ua) = &args.user_agent { + builder = builder.user_agent(ua); + } + builder.build().context("build reqwest client") +} + +fn report(rep: usize, results: &[ReqResult], wall_ms: u128) { + if results.is_empty() { + eprintln!("[rep {rep}] no results"); + return; + } + + let mut spans: Vec<(Instant, Instant)> = results.iter().map(|r| (r.start, r.end)).collect(); + spans.sort_by_key(|(s, _)| *s); + + let first_start = spans.first().unwrap().0; + let last_end = spans.iter().map(|(_, e)| *e).max().unwrap(); + let win_wall = last_end.duration_since(first_start).as_millis(); + + let mut per_us: Vec = spans + .iter() + .map(|(s, e)| e.duration_since(*s).as_micros()) + .collect(); + per_us.sort_unstable(); + let n = per_us.len(); + let pct = |p: usize| per_us[(n * p).div_ceil(100).saturating_sub(1)]; + let sum: u128 = per_us.iter().sum(); + let p50 = per_us[n / 2]; + + let mut busy_us: u128 = 0; + let (mut cur_s, mut cur_e) = spans[0]; + for &(s, e) in &spans[1..] { + if s <= cur_e { + if e > cur_e { + cur_e = e; + } + } else { + busy_us += cur_e.duration_since(cur_s).as_micros(); + cur_s = s; + cur_e = e; + } + } + busy_us += cur_e.duration_since(cur_s).as_micros(); + + let bytes_total: usize = results.iter().map(|r| r.bytes).sum(); + let ok = results.iter().filter(|r| r.status == 200).count(); + let err = results.iter().filter(|r| r.status == 0).count(); + let four_xx = results + .iter() + .filter(|r| (400..500).contains(&r.status)) + .count(); + let five_xx = results + .iter() + .filter(|r| (500..600).contains(&r.status)) + .count(); + + let avg_conc = if busy_us > 0 { + sum as f64 / busy_us as f64 + } else { + 0.0 + }; + + println!( + "[rep {rep}] n={} phase_wall={}ms win_wall={}ms busy={}ms ({:.0}%) sum={}ms avg_conc={:.1} p50={}ms p95={}ms p99={}ms max={}ms bytes={} 200={} 4xx={} 5xx={} err={}", + n, + wall_ms, + win_wall, + busy_us / 1000, + if win_wall > 0 { + 100.0 * (busy_us as f64 / 1000.0) / win_wall as f64 + } else { + 0.0 + }, + sum / 1000, + avg_conc, + p50 / 1000, + pct(95) / 1000, + pct(99) / 1000, + per_us.last().unwrap() / 1000, + bytes_total, + ok, + four_xx, + five_xx, + err, + ); +} diff --git a/crates/pm/src/helper/ruborist_context.rs b/crates/pm/src/helper/ruborist_context.rs index b47def019..e9226243b 100644 --- a/crates/pm/src/helper/ruborist_context.rs +++ b/crates/pm/src/helper/ruborist_context.rs @@ -9,6 +9,9 @@ use utoo_ruborist::service::{ use crate::service::pipeline::{PipelineChannels, PipelineReceiver}; use crate::util::cache::get_cache_dir; use crate::util::logger::ProgressReceiver; +// EXPERIMENT: DiskManifestStore swapped for NoopStore (see manifest_store +// fn below), so the disk-backed store is unused on this branch. +#[allow(unused_imports)] use crate::util::manifest_store::DiskManifestStore; use crate::util::project_cache; use crate::util::user_config::{ @@ -40,8 +43,25 @@ pub(crate) type Registry = UnifiedRegistry; pub(crate) struct Context; impl Context { + /// EXPERIMENT (experiment/no-disk-cache branch): swap + /// `DiskManifestStore` for `NoopStore` so every + /// `store.load_versions` / `store.load_version_manifest` call in + /// `service::registry::UnifiedRegistry` returns `None` without + /// touching the filesystem, and every `store.store_*` call is a + /// no-op. Used to A/B test how much of utoo's p1/p3 wall comes + /// from the per-fetch disk-cache existence-check IO that the + /// registry layer issues alongside each manifest fetch. + /// + /// Affects ALL paths that build `BuildDepsOptions` via this + /// helper (`deps_options` → `pipeline_deps_options`, + /// `build_deps`). The new `mb_resolve` lockfile-only path + /// already bypasses `UnifiedRegistry` entirely, so it sees no + /// effect from this swap; the install path (which still goes + /// through `UnifiedRegistry` for the pipeline preload) does see + /// the difference, and so does any BFS edge that misses + /// `MemoryCache` and falls into `resolve_via_full_manifest`. fn manifest_store() -> Arc { - Arc::new(DiskManifestStore::new(get_cache_dir())) + Arc::new(utoo_ruborist::service::NoopStore) } /// Create BuildDepsOptions with a custom event receiver. @@ -63,6 +83,7 @@ impl Context { receiver, supports_semver: get_supports_semver(), catalogs, + skip_preload: false, } } @@ -82,8 +103,17 @@ impl Context { /// Resolve dependency tree with plain ProgressReceiver. Returns /// [`BuildDepsOutput`] (lock + project cache); the project cache is /// persisted in the background. + /// + /// Used by the lockfile-only path (`utoo deps`). With + /// `skip_preload=true`, ruborist's `service::api::build_deps` + /// internally routes through `mb_resolve::mb_fetch` — a + /// standalone manifest-bench-style preload that bypasses + /// `service::http` / `service::manifest` / `service::registry` + /// for the cold-cache lockfile-only workload. PM doesn't see + /// the dispatch. pub async fn build_deps(cwd: PathBuf) -> anyhow::Result { - let options = Self::deps_options(cwd.clone(), ProgressReceiver).await; + let mut options = Self::deps_options(cwd.clone(), ProgressReceiver).await; + options.skip_preload = true; let output = utoo_ruborist::service::build_deps(options).await?; spawn_save_project_cache(cwd, output.project_cache.clone()); Ok(output) diff --git a/crates/pm/src/util/manifest_store.rs b/crates/pm/src/util/manifest_store.rs index 7f9c61bb1..b1fee9818 100644 --- a/crates/pm/src/util/manifest_store.rs +++ b/crates/pm/src/util/manifest_store.rs @@ -19,10 +19,15 @@ use utoo_ruborist::service::{ManifestStore, VersionsInfo}; use crate::util::json::read_json_file; +// EXPERIMENT: ruborist_context swaps DiskManifestStore for NoopStore on +// this branch — type stays defined to keep the import path valid, but +// fields go unread. +#[allow(dead_code)] pub struct DiskManifestStore { cache_dir: PathBuf, } +#[allow(dead_code)] impl DiskManifestStore { pub fn new(cache_dir: PathBuf) -> Self { Self { cache_dir } @@ -75,6 +80,7 @@ impl ManifestStore for DiskManifestStore { /// Serialize `value` and write to `path`. On `NotFound`, create the parent /// directory and retry once — saves the mkdir syscall on every warm-cache /// rewrite. Errors are logged at debug; disk cache is opportunistic. +#[allow(dead_code)] async fn write_json(path: &Path, value: &T) { let bytes = match serde_json::to_vec(value) { Ok(b) => b, diff --git a/crates/pm/src/util/sysconf.rs b/crates/pm/src/util/sysconf.rs index af77a7745..645b7b451 100644 --- a/crates/pm/src/util/sysconf.rs +++ b/crates/pm/src/util/sysconf.rs @@ -6,13 +6,46 @@ pub fn init() { reset_sigpipe(); } - // Windows default thread stack is 1MB, insufficient for libdeflater + tar - // + rayon work-stealing. + init_rayon_pool(); +} + +/// Configure the global rayon pool size. +/// +/// Rayon defaults to `num_cpus` workers, which is 2 on GHA ubuntu-latest. +/// Two workers are enough for the install-path's `par_chunks(64)` extract +/// (mostly disk-bound), but the resolve-path's manifest parse + extract +/// pipeline runs *many* short CPU bursts (parse: ~5ms, get_core_version: +/// ~1-3ms) dispatched from up to 64 concurrent fetches. +/// +/// With pool=2, each fetch waits up to ~25ms in queue per dispatch — +/// fetch-breakdown instrumentation showed avg_parse jumping 5ms (CPU) +/// → 30ms (CPU + queue) just from the first dispatch. The second hop +/// (`extract_core_version_off_runtime`) has the same problem. `tokio +/// spawn_blocking` avoids the queue but its per-dispatch overhead +/// (round 3 measurement) was higher than rayon's queue wait at 64×. +/// +/// Sizing the pool above the host CPU count for these short, blocking +/// JSON-shape operations gives the queue a chance to drain even when +/// 64 fetches dispatch concurrently. The work itself is bounded — at +/// most 2 are doing real CPU at once on a 2-core box; the extra pool +/// slots just hold pending tasks until a CPU is free, replacing FIFO +/// queueing with parallel dispatch. +/// +/// Cap of 8 keeps the pool reasonable on bigger machines (where +/// `num_cpus` is already enough); the floor of 8 oversubscribes +/// only on the constrained 2-core CI image. +fn init_rayon_pool() { + let parallelism = std::thread::available_parallelism() + .map(std::num::NonZero::get) + .unwrap_or(2); + let threads = parallelism.max(8); + + let builder = rayon::ThreadPoolBuilder::new().num_threads(threads); + #[cfg(target_os = "windows")] - rayon::ThreadPoolBuilder::new() - .stack_size(8 * 1024 * 1024) - .build_global() - .ok(); + let builder = builder.stack_size(8 * 1024 * 1024); + + builder.build_global().ok(); } /// Restore default SIGPIPE handling so broken pipes cause a clean exit diff --git a/crates/pm/src/util/user_config.rs b/crates/pm/src/util/user_config.rs index 34ee45a34..f6924f5aa 100644 --- a/crates/pm/src/util/user_config.rs +++ b/crates/pm/src/util/user_config.rs @@ -132,9 +132,23 @@ pub fn get_install_scope() -> InstallScope { INSTALL_SCOPE.get().copied().unwrap_or_default() } -// Manifest fetch concurrency configuration +// Manifest fetch concurrency configuration. Default kept at 64. +// +// We tried 256 to match bun's observed parallel streams; on GHA the +// fetch-breakdown instrumentation showed sum_parse exploded from +// ~10ms (local Mac, network-bound) to 728s on first cold run with +// manifest-bench's HTTP-only sweep on GHA (npmjs, h1) bottoms out +// somewhere in the 96-128 band — which one wins varies with npmjs's +// per-IP latency on each run (good runs picked 128, slow-network +// runs flattened the curve and even regressed at 128 due to wider +// p99 from queued requests). 96 is the conservative pick: it's at +// or near best on every run we've measured, never the worst, and +// leaves headroom for npmjs to throttle without compounding queue +// time. Combined-parse fetch (671ac98e) made the spawn_blocking +// pool no longer a contention bottleneck, but didn't change the +// network-side variance — that's what caps the useful concurrency. static MANIFESTS_CONCURRENCY_LIMIT: LazyLock> = - LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 64)); + LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 96)); pub fn set_manifests_concurrency_limit(value: Option) { MANIFESTS_CONCURRENCY_LIMIT.set(value); diff --git a/crates/preload-bench/Cargo.toml b/crates/preload-bench/Cargo.toml new file mode 100644 index 000000000..9d37d7769 --- /dev/null +++ b/crates/preload-bench/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "preload-bench" +version = "0.0.0" +edition = "2024" +license = "MIT" +publish = false +description = "Self-contained streaming-with-transitive-walk manifest preload bench. Reproduces manifest-bench's standalone fetch loop but discovers transitive deps from package.json instead of consuming a flat name list. No dependency on ruborist or any utoo internals." + +[[bin]] +name = "preload-bench" +path = "src/main.rs" + +# tombi: format.rules.table-keys-order.disabled = true +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true } +futures = "0.3" +serde = { version = "1", features = ["derive"] } +serde_json = { workspace = true } +simd-json = "0.17" +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs", "time"] } + +# Same TLS/DNS choices as manifest-bench so the only delta vs that bench +# is the transitive-walk loop. +reqwest = { version = "0.12", default-features = false, features = [ + "brotli", + "gzip", + "http2", + "rustls-tls-native-roots-no-provider", + "socks" +] } +rustls = { version = "0.23", default-features = false, features = [ + "aws-lc-rs", + "logging", + "std", + "tls12" +] } +rustls-native-certs = "0.8" diff --git a/crates/preload-bench/src/main.rs b/crates/preload-bench/src/main.rs new file mode 100644 index 000000000..46f917d19 --- /dev/null +++ b/crates/preload-bench/src/main.rs @@ -0,0 +1,505 @@ +//! Self-contained streaming preload bench with transitive walking. +//! +//! Same HTTP setup as `manifest-bench` (own `reqwest::Client` built +//! per rep with `aws-lc-rs` TLS, `pool_max_idle_per_host(256)`, no +//! proxy, default DNS, no retry). The only delta vs `manifest-bench` +//! is that this bench discovers names by walking transitive deps +//! from a `package.json` root, instead of consuming a flat name +//! list. +//! +//! Why a separate crate: ruborist's manifest-fetch path goes through +//! several service layers (custom DNS resolver, retry, cache, +//! single-flight gates, event receivers). Each layer might add +//! overhead. This bench bypasses all of them — same shape as +//! manifest-bench, just with a streaming `FuturesUnordered` that +//! refills from a pending queue extended by parsed transitive deps. +//! +//! Reports both the standalone preload wall and a per-rep eff_parallel +//! number so we can compare directly against manifest-bench's +//! `phase_wall` + `avg_conc` for the same workload. +//! +//! Output (one line per rep, matching manifest-bench shape): +//! [rep N] preload_wall=Xms n=Y bytes=Z avg_conc=N.N parse_sum=Wms 200=A 4xx=B err=C + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::stream::{FuturesUnordered, StreamExt}; +use serde::Deserialize; + +#[derive(Parser, Debug)] +#[command( + name = "preload-bench", + about = "Streaming preload bench with transitive walking (self-contained)" +)] +struct Args { + /// Registry base URL. + #[arg(long, default_value = "https://registry.npmjs.org")] + registry: String, + + /// Path to a `package.json` to walk from. Reads `dependencies` + + /// `devDependencies` + `optionalDependencies` as the initial seed. + #[arg(long)] + package_json: PathBuf, + + /// Maximum concurrent in-flight requests. + #[arg(long, default_value_t = 96)] + concurrency: usize, + + /// Number of times to repeat the whole walk (fresh client per rep). + #[arg(long, default_value_t = 4)] + reps: usize, + + /// Force HTTP/1.1. + #[arg(long, default_value_t = true)] + http1_only: bool, + + /// Override `User-Agent`. + #[arg(long)] + user_agent: Option, + + /// Include `peerDependencies` when walking transitives. Off by + /// default (matches utoo's default). + #[arg(long)] + include_peer: bool, +} + +#[derive(Deserialize)] +struct PackageJson { + #[serde(default)] + dependencies: HashMap, + #[serde(default, rename = "devDependencies")] + dev_dependencies: HashMap, + #[serde(default, rename = "optionalDependencies")] + optional_dependencies: HashMap, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + let raw = std::fs::read_to_string(&args.package_json) + .with_context(|| format!("read {:?}", args.package_json))?; + let pkg: PackageJson = serde_json::from_str(&raw).context("parse package.json")?; + let initial: Vec<(String, String)> = pkg + .dependencies + .into_iter() + .chain(pkg.dev_dependencies) + .chain(pkg.optional_dependencies) + .filter(|(_, spec)| is_registry_spec(spec)) + .collect(); + + println!( + "preload-bench: registry={} concurrency={} reps={} initial={} h1_only={} ua={} include_peer={}", + args.registry, + args.concurrency, + args.reps, + initial.len(), + args.http1_only, + args.user_agent.as_deref().unwrap_or(""), + args.include_peer, + ); + + for rep in 1..=args.reps { + run_once(&args, &initial, rep).await?; + } + + Ok(()) +} + +/// Quick registry-spec check (a `^...` / `~...` / `latest` / etc). +/// Excludes `file:`, `link:`, `workspace:`, `git+`, `https://`, and +/// `/` shorthand. Same intent as ruborist's +/// `SpecStr::is_registry_spec` but inlined to keep this crate +/// dependency-free. +fn is_registry_spec(spec: &str) -> bool { + if spec.is_empty() { + return true; // bare entries default to "*" + } + let lower = spec.to_ascii_lowercase(); + if lower.starts_with("file:") + || lower.starts_with("link:") + || lower.starts_with("workspace:") + || lower.starts_with("portal:") + || lower.starts_with("git+") + || lower.starts_with("git://") + || lower.starts_with("github:") + || lower.starts_with("https://") + || lower.starts_with("http://") + { + return false; + } + // `/` shorthand — exactly one '/' and no '@' prefix on + // first segment (rules out scoped names like `@scope/pkg`). + if let Some((head, tail)) = spec.split_once('/') + && !head.starts_with('@') + && !tail.is_empty() + && !tail.contains('/') + { + return false; + } + true +} + +#[derive(Debug, Default)] +struct RepStats { + n: usize, + bytes: usize, + parse_sum_us: u128, + busy_us: u128, + sum_us: u128, + ok_200: usize, + err_4xx: usize, + err_other: usize, +} + +async fn run_once(args: &Args, initial: &[(String, String)], rep: usize) -> Result<()> { + let client = build_client(args)?; + let registry = Arc::new(args.registry.trim_end_matches('/').to_string()); + let concurrency = args.concurrency; + let include_peer = args.include_peer; + + let phase_start = Instant::now(); + let mut stats = RepStats::default(); + + // (name, spec) dedup — same shape as ruborist's seen_specs but + // self-contained. We dedup the *spec* level because two specs on + // the same name might resolve to different versions. + let mut seen: HashSet<(String, String)> = HashSet::new(); + let mut pending: VecDeque<(String, String)> = VecDeque::new(); + for (name, spec) in initial { + if seen.insert((name.clone(), spec.clone())) { + pending.push_back((name.clone(), spec.clone())); + } + } + + // Sibling-fetch dedup: when two specs for the same name are both + // pending, only one fetch is issued; subsequent specs settle from + // the cached body. Keyed by name. Maps name → cached parsed body + // (`Arc>`) once the first fetch lands. + let body_cache: Arc>>>> = + Arc::new(std::sync::Mutex::new(HashMap::new())); + let mut in_flight_names: HashSet = HashSet::new(); + let mut deferred_by_name: HashMap> = HashMap::new(); + + let mut futs: FuturesUnordered = FuturesUnordered::new(); + + loop { + while futs.len() < concurrency { + let Some((name, spec)) = pending.pop_front() else { + break; + }; + + // If the body is already cached (sibling spec for an + // already-fetched name), spawn a settle-only future. + if let Some(raw) = body_cache.lock().unwrap().get(&name).cloned() { + let n = name.clone(); + let s = spec.clone(); + let fut: Fut = Box::pin(settle_only(n, s, raw, include_peer)); + futs.push(fut); + continue; + } + + // First time seeing this name: fetch + settle. Stash any + // sibling specs that arrive while in-flight. + if !in_flight_names.insert(name.clone()) { + deferred_by_name.entry(name).or_default().push(spec); + continue; + } + + spawn_fetch( + &client, + ®istry, + name, + spec, + Arc::clone(&body_cache), + include_peer, + &mut futs, + ); + } + + if futs.is_empty() { + break; + } + + let Some(out) = futs.next().await else { break }; + stats.n += 1; + stats.busy_us += out.busy_us; + stats.sum_us += out.sum_us; + stats.parse_sum_us += out.parse_us; + stats.bytes += out.bytes; + match out.status { + 200 => stats.ok_200 += 1, + 400..=499 => stats.err_4xx += 1, + _ => stats.err_other += 1, + } + + // Drain sibling specs for this name now that body is cached. + if out.fetched + && let Some(siblings) = deferred_by_name.remove(&out.name) + && let Some(raw) = body_cache.lock().unwrap().get(&out.name).cloned() + { + for sibling_spec in siblings { + let n = out.name.clone(); + let r = Arc::clone(&raw); + let fut: Fut = Box::pin(settle_only(n, sibling_spec, r, include_peer)); + futs.push(fut); + } + } + + // Extend pending with new transitives, dedup by (name, spec). + for (name, spec) in out.transitives { + if seen.insert((name.clone(), spec.clone())) { + pending.push_back((name, spec)); + } + } + } + + let phase_wall_ms = phase_start.elapsed().as_millis(); + let parse_sum_ms = stats.parse_sum_us / 1000; + // avg_conc = sum_request_us / busy_window_us. busy_us isn't a true + // merged-interval here (we don't track per-req start/end timestamps + // for that), so use phase_wall as the denominator — slightly + // pessimistic but consistent. + let avg_conc = if phase_wall_ms > 0 { + stats.sum_us as f64 / 1000.0 / phase_wall_ms as f64 + } else { + 0.0 + }; + + println!( + "[rep {rep}] preload_wall={phase_wall_ms}ms n={} bytes={} parse_sum={parse_sum_ms}ms avg_conc={avg_conc:.1} 200={} 4xx={} err={}", + stats.n, stats.bytes, stats.ok_200, stats.err_4xx, stats.err_other, + ); + Ok(()) +} + +#[derive(Debug)] +struct FetchOutcome { + name: String, + /// `(name, spec)` transitive deps unfolded by parsing the resolved + /// version's `dependencies` / `optionalDependencies` (and + /// optionally `peerDependencies`). + transitives: Vec<(String, String)>, + /// `true` if this future fetched the body (vs settle-only on a + /// cached body); only fetchers populate `body_cache` and trigger + /// sibling drain. + fetched: bool, + /// HTTP status code (200 / 4xx / 5xx / 0 on transport error). + status: u16, + /// Body byte count (0 on error). + bytes: usize, + /// Self-reported per-future busy_us — `end - start`. Approximate. + busy_us: u128, + /// Sum of all per-future durations summed by the main loop. + sum_us: u128, + /// Parse work done inside this future (for accounting). + parse_us: u128, +} + +type Fut = std::pin::Pin + Send>>; + +fn spawn_fetch( + client: &reqwest::Client, + registry: &Arc, + name: String, + spec: String, + body_cache: Arc>>>>, + include_peer: bool, + futs: &mut FuturesUnordered, +) { + let url = format!("{}/{}", registry, name); + let client = client.clone(); + let fut: Fut = Box::pin(async move { + let start = Instant::now(); + let req = client + .get(&url) + .header("accept", "application/vnd.npm.install-v1+json") + .send(); + let (raw_bytes, status) = match req.await { + Ok(resp) => { + let status = resp.status().as_u16(); + let body = resp.bytes().await.map(|b| b.to_vec()).unwrap_or_default(); + (body, status) + } + Err(_) => (Vec::new(), 0), + }; + let bytes = raw_bytes.len(); + + let (parse_us, transitives) = if status == 200 && !raw_bytes.is_empty() { + let raw_arc = Arc::new(raw_bytes); + body_cache + .lock() + .unwrap() + .insert(name.clone(), Arc::clone(&raw_arc)); + // Move the Arc> into spawn_blocking; the parser + // mutates a clone, so the cached copy is unaffected. + let spec_for_parse = spec.clone(); + let parse_start = Instant::now(); + let result = tokio::task::spawn_blocking(move || { + parse_and_extract(&raw_arc, &spec_for_parse, include_peer) + }) + .await + .ok() + .flatten() + .unwrap_or_default(); + (parse_start.elapsed().as_micros(), result) + } else { + (0, Vec::new()) + }; + + let end = Instant::now(); + let busy_us = end.duration_since(start).as_micros(); + FetchOutcome { + name, + transitives, + fetched: true, + status, + bytes, + busy_us, + sum_us: busy_us, + parse_us, + } + }); + futs.push(fut); +} + +async fn settle_only( + name: String, + spec: String, + raw: Arc>, + include_peer: bool, +) -> FetchOutcome { + let start = Instant::now(); + let parse_start = start; + let transitives = tokio::task::spawn_blocking(move || { + parse_and_extract(&raw, &spec, include_peer).unwrap_or_default() + }) + .await + .unwrap_or_default(); + let parse_us = parse_start.elapsed().as_micros(); + let end = Instant::now(); + let busy_us = end.duration_since(start).as_micros(); + FetchOutcome { + name, + transitives, + fetched: false, + status: 200, + bytes: 0, + busy_us, + sum_us: busy_us, + parse_us, + } +} + +/// Parse a manifest body, resolve `spec` against the version list, +/// extract that version's transitive deps. Single +/// `simd_json::to_borrowed_value` pass for the whole body — same as +/// ruborist's combined-parse path, but inlined here so this crate +/// has no ruborist dependency. +fn parse_and_extract( + raw: &Arc>, + spec: &str, + include_peer: bool, +) -> Option> { + use simd_json::prelude::{ValueAsObject, ValueObjectAccess}; + + let mut buf = (**raw).clone(); + let parsed = simd_json::to_borrowed_value(&mut buf).ok()?; + + let dist_tags: HashMap = parsed + .get("dist-tags") + .and_then(|v| HashMap::::deserialize(v).ok()) + .unwrap_or_default(); + let versions_obj = parsed.get("versions").and_then(ValueAsObject::as_object)?; + + // Resolve spec. Three cases: dist-tag match, exact-version key, or + // semver range (we approximate with "first version that satisfies" + // — preload-bench is a measurement tool, not a real resolver, so + // we tolerate slight selection differences vs ruborist for the + // purpose of timing the network path). + let resolved = if let Some(via_tag) = dist_tags.get(spec) { + via_tag.clone() + } else if versions_obj.contains_key(spec) { + spec.to_string() + } else if let Some(latest) = dist_tags.get("latest") + && spec_satisfied_by(spec, latest) + { + latest.clone() + } else { + // Last-resort: pick the lexicographically-largest version. Not + // semver-correct but bounded by the version set, and good + // enough for timing. + versions_obj.keys().max().map(|k| k.to_string())? + }; + + let version_obj = versions_obj.get(resolved.as_str())?; + let mut out: Vec<(String, String)> = Vec::new(); + + if let Some(deps) = version_obj.get("dependencies") + && let Ok(map) = HashMap::::deserialize(deps) + { + out.extend(map.into_iter().filter(|(_, s)| is_registry_spec(s))); + } + if include_peer + && let Some(deps) = version_obj.get("peerDependencies") + && let Ok(map) = HashMap::::deserialize(deps) + { + out.extend(map.into_iter().filter(|(_, s)| is_registry_spec(s))); + } + if let Some(deps) = version_obj.get("optionalDependencies") + && let Ok(map) = HashMap::::deserialize(deps) + { + out.extend(map.into_iter().filter(|(_, s)| is_registry_spec(s))); + } + Some(out) +} + +/// Crude semver-satisfies check: only handles `^X.Y.Z` and `~X.Y.Z` +/// against an exact target. Sufficient for "does latest satisfy spec" +/// in this measurement context — full semver is in the resolver, not +/// the bench. +fn spec_satisfied_by(spec: &str, target: &str) -> bool { + let s = spec.trim(); + let body = s + .strip_prefix('^') + .or_else(|| s.strip_prefix('~')) + .unwrap_or(s); + target.starts_with(body) || target == body +} + +fn build_client(args: &Args) -> Result { + // Install aws-lc-rs as the default crypto provider (idempotent — + // first call wins). Same setup as manifest-bench. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + + let mut roots = rustls::RootCertStore::empty(); + let native = rustls_native_certs::load_native_certs(); + for cert in native.certs { + let _ = roots.add(cert); + } + + let tls_config = rustls::ClientConfig::builder_with_provider(std::sync::Arc::new( + rustls::crypto::aws_lc_rs::default_provider(), + )) + .with_safe_default_protocol_versions() + .map_err(|e| anyhow!("rustls protocol versions: {e}"))? + .with_root_certificates(roots) + .with_no_client_auth(); + + let mut builder = reqwest::Client::builder() + .use_preconfigured_tls(tls_config) + .no_proxy() + .pool_max_idle_per_host(256); + if args.http1_only { + builder = builder.http1_only(); + } + if let Some(ua) = &args.user_agent { + builder = builder.user_agent(ua); + } + builder.build().context("build reqwest client") +} diff --git a/crates/ruborist/src/model/manifest.rs b/crates/ruborist/src/model/manifest.rs index 37e95deb9..3509e839d 100644 --- a/crates/ruborist/src/model/manifest.rs +++ b/crates/ruborist/src/model/manifest.rs @@ -163,6 +163,14 @@ pub async fn extract_core_version_off_runtime( full: Arc, version: String, ) -> (String, Option>) { + // Round 3 attempted to switch this to `tokio::task::spawn_blocking` + // for the same reasons as `parse_json_off_runtime`, but CI showed + // it regressed p1 by 0.5s on `preload_wall`. Mechanism: this + // function is called per (name, spec), so packages with multiple + // specs (e.g. peer-dep range overlaps) call it 2-5x per fetch. + // spawn_blocking's per-dispatch overhead (channel + thread wake) + // is significant for short CPU work; with the multiplier this + // outweighed rayon queue waits at conc=64. Keep on rayon::spawn. #[cfg(not(target_arch = "wasm32"))] { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs index b0bf2794c..156622502 100644 --- a/crates/ruborist/src/resolver/builder.rs +++ b/crates/ruborist/src/resolver/builder.rs @@ -18,13 +18,13 @@ //! This separation allows for maximum parallelism during network I/O //! while keeping the graph building logic simple and deterministic. -use petgraph::graph::NodeIndex; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::sync::Arc; #[cfg(feature = "http-tarball")] use anyhow::Context as _; +use petgraph::graph::NodeIndex; use crate::model::graph::{DependencyGraph, FindResult, PackageNode}; use crate::model::manifest::NodeManifest; @@ -32,7 +32,7 @@ use crate::model::node::EdgeType; use crate::model::package_json::PackageJson; use crate::resolver::preload::{PreloadConfig, preload_manifests}; use crate::resolver::registry::{ResolveError, resolve_registry_dep}; -use crate::spec::{Catalogs, PackageSpec, Protocol}; +use crate::spec::{Catalogs, PackageSpec, Protocol, SpecStr}; use crate::traits::progress::{BuildEvent, EventReceiver, NoopReceiver}; use crate::traits::registry::{RegistryClient, ResolvedPackage}; @@ -180,10 +180,10 @@ struct NodeFlags { /// Only registry specs (e.g. `^4.17.0`) are collected. `catalog:` specs are /// resolved at edge creation time, so by the time this runs they are already /// concrete registry specs. -fn gather_preload_deps(graph: &DependencyGraph, peer_deps: PeerDeps) -> Vec<(String, String)> { - use crate::spec::SpecStr; - use std::collections::HashSet; - +pub(crate) fn gather_preload_deps( + graph: &DependencyGraph, + peer_deps: PeerDeps, +) -> Vec<(String, String)> { let mut deps = HashSet::new(); let collect = |node_index: NodeIndex, deps: &mut HashSet<(String, String)>| { @@ -756,6 +756,7 @@ async fn run_preload_phase( return; } + crate::util::FETCH_TIMINGS.reset(); let start = tokio::time::Instant::now(); let initial_deps = gather_preload_deps(graph, config.peer_deps); @@ -794,10 +795,27 @@ async fn run_preload_phase( failed: stats.failed_count, }); - tracing::debug!("Preload phase: {:?}", start.elapsed()); + let preload_elapsed = start.elapsed(); + tracing::debug!("Preload phase: {:?}", preload_elapsed); + tracing::info!( + "p1-breakdown preload_wall={}ms | {}", + preload_elapsed.as_millis(), + crate::util::FETCH_TIMINGS.snapshot().summary_line(), + ); } /// Run the BFS traversal phase to build the dependency tree. +/// +/// Each level does a parallel prefetch of all unresolved registry specs +/// before the sequential `process_dependency` walk. +/// +/// When `skip_preload=true` (lockfile-only path), the caller is +/// expected to have already populated `registry.cache()` via +/// [`super::fast_preload::fast_preload`], so this BFS sees only +/// cache hits. When `skip_preload=false` (install paths), the +/// receiver-driven [`super::preload::preload_manifests`] runs ahead +/// of this phase and feeds `BuildEvent::PackageResolved` to the +/// pipeline. async fn run_bfs_phase( graph: &mut DependencyGraph, registry: &R, @@ -805,13 +823,13 @@ async fn run_bfs_phase( receiver: &E, ) -> Result<(), ResolveError> { let start = tokio::time::Instant::now(); - let mut current_level = vec![graph.root_index]; while !current_level.is_empty() { receiver.on_event(BuildEvent::LevelStart { node_count: current_level.len(), }); + let mut next_level = Vec::new(); for node_index in current_level { @@ -896,7 +914,13 @@ async fn run_bfs_phase( current_level = next_level; } - tracing::debug!("Build phase: {:?}", start.elapsed()); + let bfs_elapsed = start.elapsed(); + tracing::debug!("Build phase: {:?}", bfs_elapsed); + tracing::info!( + "p1-breakdown bfs_wall={}ms | {}", + bfs_elapsed.as_millis(), + crate::util::FETCH_TIMINGS.snapshot().summary_line(), + ); Ok(()) } diff --git a/crates/ruborist/src/resolver/fast_preload.rs b/crates/ruborist/src/resolver/fast_preload.rs new file mode 100644 index 000000000..d049321d8 --- /dev/null +++ b/crates/ruborist/src/resolver/fast_preload.rs @@ -0,0 +1,362 @@ +//! Lean parallel manifest fetcher modeled on `manifest-bench`. +//! +//! Bypasses [`crate::service::registry::UnifiedRegistry`] — and therefore +//! its `OnceMap` gates, [`crate::service::store::ManifestStore`] writes, +//! and `EventReceiver` event dispatch — to drive a flat +//! `FuturesUnordered` over [`crate::service::manifest::fetch_full_manifest`] +//! plus a fused-into-fetch primary settle. The warm +//! [`crate::service::cache::MemoryCache`] it leaves behind makes the +//! subsequent BFS phase a pure cache-hit walk: no network, no rayon +//! re-parse hop on `extract_core_version`. +//! +//! Intended for the lockfile-only path (`utoo deps`) which has no +//! pipeline consumer for `BuildEvent::PackageResolved` — install paths +//! still go through [`super::preload::preload_manifests`] so the +//! pipeline keeps its early-start signal. +//! +//! ## Why settle is fused into the fetch task +//! +//! A "settle" turns a freshly-fetched `FullManifest` plus a spec into a +//! `CoreVersionManifest` for one version, via `simd_json::to_borrowed_value` +//! over the manifest's raw bytes. That parse is 5–10ms per spec on a +//! 100KB body. +//! +//! v1 ran settle inline on the tokio runtime worker — that starved +//! sibling fetches' I/O drive (CI showed `avg_request` +3ms, +//! `avg_parse` 5→11ms). v2 dispatched settle to rayon via a separate +//! `FuturesUnordered` future, which fixed the runtime starvation but +//! introduced a dispatch RTT: fetch lands → rayon settle queued → settle +//! pops → `pending` finally gets transitive deps. That round-trip held +//! the wave-shaped transitive walk back, capping `eff_parallel` at ~44 +//! against a 96 cap. +//! +//! v3 (this) folds the primary settle into the fetch task itself via +//! `tokio::task::spawn_blocking`. The fetch task awaits both the +//! network round-trip and the version-extract on the same blocking +//! pool slot, then returns with the resolved `CoreVersionManifest` +//! attached. The main loop pulls a single `Fetched` event and +//! immediately extends `pending` — no separate settle pop. Sibling +//! specs (rare; same package, different range) still go through a +//! `Settled` future to keep the primary path lean. + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::pin::Pin; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::stream::{FuturesUnordered, StreamExt}; + +use crate::model::manifest::{CoreVersionManifest, FullManifest, extract_core_version_off_runtime}; +use crate::model::node::PeerDeps; +use crate::resolver::preload::{Dep, PreloadConfig}; +use crate::resolver::version::resolve_target_version; +use crate::service::{ + FetchManifestOptions, FetchWithSettleResult, MemoryCache, MetadataFormat, + fetch_full_manifest_with_settle, +}; +use crate::spec::SpecStr; +use crate::util::FETCH_TIMINGS; + +/// Statistics from the lean fetch loop. Mirrors `PreloadStats` shape so +/// the bench-grep regex stays the same. +#[derive(Debug, Default)] +pub struct FastPreloadStats { + pub success_count: usize, + pub failed_count: usize, + pub fetched_names: usize, + pub min_request_ms: u64, + pub max_request_ms: u64, + pub total_request_ms: u64, +} + +/// One fetch's primary settle outcome — the resolved version + parsed +/// `CoreVersionManifest` for the spec the fetch was originally issued +/// for. `None` means the spec didn't match any version (caller treats +/// as soft skip). +type PrimarySettle = Option<(String, Arc)>; + +/// Outcome of a fetch task. Owning `Arc` (rather than +/// `FetchManifestResult` by-value) means the fetch task can `Arc::clone` +/// once for the primary settle, then pass ownership along — no full +/// `FullManifest` clone (which would copy the 200-entry `time` +/// HashMap + the `versions` `Vec` per fetch). +enum FetchOutcome { + Ok(Arc), + NotModified, + Err, +} + +/// Output of one in-flight future. The main loop merges fetch and +/// sibling-settle completions through a single `FuturesUnordered`. +enum FastEvent { + Fetched { + name: String, + primary_spec: String, + outcome: FetchOutcome, + primary_settle: PrimarySettle, + elapsed_ms: u64, + }, + Settled { + new_deps: Vec, + }, +} + +type FastFut = Pin + Send>>; + +/// Collect dependencies from any deps map, filtering out non-registry specs. +fn collect_deps(map: Option<&HashMap>) -> Vec { + map.into_iter() + .flatten() + .filter(|(_, spec)| spec.is_registry_spec()) + .map(|(name, spec)| (name.clone(), spec.clone())) + .collect() +} + +/// Extract transitive dependencies from a resolved manifest. +/// devDependencies are omitted (only the root installs devDeps). +fn extract_transitive_deps(manifest: &CoreVersionManifest, peer_deps: PeerDeps) -> Vec { + let mut deps = Vec::new(); + deps.extend(collect_deps(manifest.dependencies.as_ref())); + if peer_deps == PeerDeps::Include { + deps.extend(collect_deps(manifest.peer_dependencies.as_ref())); + } + deps.extend(collect_deps(manifest.optional_dependencies.as_ref())); + deps +} + +/// Off-runtime settle for a `(name, spec)` whose `FullManifest` is +/// already cached. Used for sibling specs — multiple ranges on the +/// same package — that arrive after the primary fetch has landed. +fn settle_future( + name: String, + spec: String, + full: Arc, + cache: MemoryCache, + peer_deps: PeerDeps, +) -> BoxFuture<'static, FastEvent> { + Box::pin(async move { + let resolved_version = match resolve_target_version((&*full).into(), &spec) { + Ok(v) => v, + Err(_) => return FastEvent::Settled { new_deps: vec![] }, + }; + if let Some(cached) = cache.get_version_manifest(&name, &resolved_version) { + cache.set_version_manifest(name.clone(), spec.clone(), Arc::clone(&cached)); + return FastEvent::Settled { + new_deps: extract_transitive_deps(&cached, peer_deps), + }; + } + let (resolved_version, core) = + extract_core_version_off_runtime(Arc::clone(&full), resolved_version).await; + let new_deps = match core { + Some(core_arc) => { + cache.set_version_manifest(name.clone(), spec.clone(), Arc::clone(&core_arc)); + cache.set_version_manifest(name, resolved_version, Arc::clone(&core_arc)); + extract_transitive_deps(&core_arc, peer_deps) + } + None => Vec::new(), + }; + FastEvent::Settled { new_deps } + }) +} + +/// Manifest-bench-style flat parallel fetch of all transitively-reachable +/// registry manifests. Populates `cache` with both `full_manifests` and +/// `version_manifests` slots so the subsequent BFS does no network and no +/// re-parse. +/// +/// `initial_deps` should already be the union of root+workspace +/// registry edges, with non-registry specs filtered out. +pub async fn fast_preload( + initial_deps: Vec, + registry_url: &str, + cache: &MemoryCache, + config: &PreloadConfig, +) -> FastPreloadStats { + let mut stats = FastPreloadStats::default(); + let mut pending: VecDeque = VecDeque::from(initial_deps); + // Specs we've already enqueued. Prevents duplicate settles from + // re-walking the same transitive subtree. + let mut seen_specs: HashSet<(String, String)> = HashSet::new(); + // Names whose full manifest is in flight or already cached. + let mut fetched_names: HashSet = HashSet::new(); + // Sibling specs that arrived while their package's full manifest + // was still in flight. The fetch's completion handler dispatches + // settles for them, then drains this bucket. + let mut deferred_by_name: HashMap> = HashMap::new(); + let mut futs: FuturesUnordered = FuturesUnordered::new(); + let concurrency = config.concurrency; + let peer_deps = config.peer_deps; + + loop { + while futs.len() < concurrency { + let Some((name, spec)) = pending.pop_front() else { + break; + }; + if !seen_specs.insert((name.clone(), spec.clone())) { + continue; + } + + // Hot path: a sibling spec for this name has already + // returned, so the full manifest is cached. Settle on + // rayon (off-runtime) — keeps the primary fetch path + // (next branch) clean. + if let Some(full) = cache.get_full_manifest(&name) { + futs.push(Box::pin(settle_future( + name, + spec, + full, + cache.clone(), + peer_deps, + ))); + continue; + } + + // A fetch for this name is already in flight: stash this + // sibling spec; the fetch's completion handler will + // dispatch a settle for it. + if !fetched_names.insert(name.clone()) { + deferred_by_name.entry(name).or_default().push(spec); + continue; + } + + let registry_url = registry_url.to_string(); + let primary_spec = spec.clone(); + let n = name.clone(); + futs.push(Box::pin(async move { + let start = tokio::time::Instant::now(); + // Combined fetch + envelope parse + primary settle in + // a single `to_borrowed_value` pass — replaces the old + // pattern of typed-serde envelope parse followed by a + // separate `to_borrowed_value` reparse for version + // extraction. Halves simd_json work per fetch. + let result = fetch_full_manifest_with_settle( + FetchManifestOptions { + registry_url: ®istry_url, + name: &n, + format: MetadataFormat::Abbreviated, + etag: None, + }, + &primary_spec, + ) + .await; + let elapsed_ms = start.elapsed().as_millis() as u64; + let (outcome, primary_settle) = match result { + Ok(FetchWithSettleResult::Ok(payload)) => { + let full_arc = Arc::new(payload.manifest); + (FetchOutcome::Ok(full_arc), payload.primary_settle) + } + Ok(FetchWithSettleResult::NotModified) => (FetchOutcome::NotModified, None), + Err(e) => { + tracing::debug!("fast_preload failed for {}: {}", n, e); + (FetchOutcome::Err, None) + } + }; + FastEvent::Fetched { + name, + primary_spec, + outcome, + primary_settle, + elapsed_ms, + } + })); + } + + if futs.is_empty() { + break; + } + + let Some(event) = futs.next().await else { + break; + }; + + match event { + FastEvent::Fetched { + name, + primary_spec, + outcome, + primary_settle, + elapsed_ms, + } => { + if stats.success_count == 0 && stats.failed_count == 0 { + stats.min_request_ms = elapsed_ms; + stats.max_request_ms = elapsed_ms; + } else { + stats.min_request_ms = stats.min_request_ms.min(elapsed_ms); + stats.max_request_ms = stats.max_request_ms.max(elapsed_ms); + } + stats.total_request_ms += elapsed_ms; + + match outcome { + FetchOutcome::Ok(full_arc) => { + stats.success_count += 1; + stats.fetched_names += 1; + cache.set_full_manifest(name.clone(), Arc::clone(&full_arc)); + + // Apply the primary settle (already done inside + // the fetch task via spawn_blocking) — populate + // both `(name, primary_spec)` and + // `(name, resolved_version)` cache slots so BFS + // hits the early-return at registry.rs:347 on + // its first probe, then extend `pending` with + // the spec's transitive deps. + if let Some((resolved_version, core_arc)) = primary_settle { + cache.set_version_manifest( + name.clone(), + primary_spec, + Arc::clone(&core_arc), + ); + cache.set_version_manifest( + name.clone(), + resolved_version, + Arc::clone(&core_arc), + ); + pending.extend(extract_transitive_deps(&core_arc, peer_deps)); + } + + // Sibling specs that were stashed while the + // fetch was in flight: dispatch each as a + // separate settle future. + if let Some(siblings) = deferred_by_name.remove(&name) { + for sibling_spec in siblings { + futs.push(Box::pin(settle_future( + name.clone(), + sibling_spec, + Arc::clone(&full_arc), + cache.clone(), + peer_deps, + ))); + } + } + } + FetchOutcome::NotModified | FetchOutcome::Err => { + // 304 is unreachable in practice (no ETag sent); + // both branches treated as soft failure. + stats.failed_count += 1; + } + } + } + FastEvent::Settled { new_deps } => { + pending.extend(new_deps); + } + } + } + + let total = stats.success_count + stats.failed_count; + let avg_ms = if total > 0 { + stats.total_request_ms / total as u64 + } else { + 0 + }; + tracing::info!( + "p1-breakdown fast_preload n={} ok={} fail={} avg_req={}ms min={}ms max={}ms | {}", + total, + stats.success_count, + stats.failed_count, + avg_ms, + stats.min_request_ms, + stats.max_request_ms, + FETCH_TIMINGS.snapshot().summary_line(), + ); + + stats +} diff --git a/crates/ruborist/src/resolver/mb_resolve.rs b/crates/ruborist/src/resolver/mb_resolve.rs new file mode 100644 index 000000000..7e1376330 --- /dev/null +++ b/crates/ruborist/src/resolver/mb_resolve.rs @@ -0,0 +1,380 @@ +//! Standalone manifest preload for the lockfile-only path. +//! +//! Mirrors `crates/preload-bench`'s loop shape verbatim, but lives +//! inside ruborist so it can populate `MemoryCache` for the BFS phase +//! to read. Used by `service::api::build_deps` whenever the caller +//! has `skip_preload=true` and no warm project cache — i.e. the +//! `utoo deps` (lockfile-only) path. +//! +//! Bypasses every other ruborist service layer: +//! * `service::http::get_client` — own `reqwest::Client` built per +//! call, no global LazyLock, no `dns_resolver(shared_resolver)`, +//! no `connect_timeout`, `pool_max_idle_per_host(256)` matching +//! `preload-bench` / `manifest-bench`. +//! * `service::manifest::fetch_full_manifest_with_settle` — own +//! `reqwest::get + body.bytes() + spawn_blocking(simd_json +//! to_borrowed_value)`, no `RetryIf`, no `FETCH_TIMINGS`. +//! * `service::registry::UnifiedRegistry` — no `OnceMap` inflight +//! gates, no `ManifestStore`, no `EventReceiver`. +//! +//! The only `service::*` touched is `MemoryCache::set_full_manifest` +//! and `MemoryCache::set_version_manifest` — thin DashMap wrappers +//! the BFS phase reads from. Without that, BFS would have nothing to +//! resolve against. +//! +//! Why a separate path: same-run CI data shows `preload-bench` +//! (self-contained, transitive walk, 4153 fetches) lands at ~2.57s +//! while ruborist's existing `fast_preload` path (combined parse via +//! service layers, 2733 fetches) lands at ~2.67s on the same network +//! — so on a per-fetch basis the service-layer path is ~50 % slower. +//! Removing the layers should close that gap. + +use std::collections::{HashMap, HashSet, VecDeque}; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Instant; + +use anyhow::{Context, Result}; +use futures::stream::{FuturesUnordered, StreamExt}; +use parking_lot::Mutex; +use serde::Deserialize; + +use crate::model::manifest::{CoreVersionManifest, FullManifest}; +use crate::model::node::PeerDeps; +use crate::resolver::preload::{Dep, PreloadConfig}; +use crate::resolver::version::resolve_target_version; +use crate::service::MemoryCache; +use crate::spec::SpecStr; + +#[derive(Debug, Default)] +pub struct MbFetchStats { + pub success: usize, + pub fail: usize, +} + +/// Build a fresh `reqwest::Client` matching `preload-bench` / +/// `manifest-bench` exactly, except for the TLS provider — those +/// benches use aws-lc-rs but we keep ruborist's existing default +/// rustls (ring on Linux). If A/B data shows TLS is the remaining +/// gap, we'll add the aws-lc-rs deps separately. +fn build_mb_client() -> Result { + reqwest::Client::builder() + .no_proxy() + .pool_max_idle_per_host(256) + .http1_only() + .build() + .context("build reqwest client for mb_resolve") +} + +/// Collect deps from a deps map, filtering non-registry specs. +fn collect_deps(map: Option<&HashMap>) -> Vec { + map.into_iter() + .flatten() + .filter(|(_, spec)| spec.is_registry_spec()) + .map(|(name, spec)| (name.clone(), spec.clone())) + .collect() +} + +fn extract_transitive(manifest: &CoreVersionManifest, peer_deps: PeerDeps) -> Vec { + let mut out = Vec::new(); + out.extend(collect_deps(manifest.dependencies.as_ref())); + if peer_deps == PeerDeps::Include { + out.extend(collect_deps(manifest.peer_dependencies.as_ref())); + } + out.extend(collect_deps(manifest.optional_dependencies.as_ref())); + out +} + +/// What a future returns when it lands. The main loop uses +/// `transitives` to extend `pending`, plus the cache writes already +/// happened inside the future. Only `fetched=true` futures populate +/// `body_cache` and trigger sibling drain. +struct FetchOutcome { + name: String, + transitives: Vec, + fetched: bool, +} + +type Fut = Pin + Send>>; + +/// `(name, spec) → (FullManifest, resolved_version, version_subtree, transitive_deps)`. +type ParseResult = ( + Arc, + String, + Arc, + Vec, +); + +/// Single combined parse: one `simd_json::to_borrowed_value` over the +/// raw body extracts the envelope (name, dist-tags, versions keys) +/// AND deserializes the resolved version's `CoreVersionManifest` +/// subtree. Same shape as the parse step in `preload-bench`. +fn parse_combined(raw: Arc<[u8]>, spec: &str, peer_deps: PeerDeps) -> Option { + use simd_json::prelude::{ValueAsObject, ValueAsScalar, ValueObjectAccess}; + + let mut buf = (*raw).to_vec(); + let parsed = simd_json::to_borrowed_value(&mut buf).ok()?; + + let name = parsed + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_default(); + let dist_tags: HashMap = parsed + .get("dist-tags") + .and_then(|v| HashMap::::deserialize(v).ok()) + .unwrap_or_default(); + let versions_keys: Vec = parsed + .get("versions") + .and_then(ValueAsObject::as_object) + .map(|obj| obj.keys().map(|k| k.to_string()).collect()) + .unwrap_or_default(); + + let full = FullManifest { + name, + dist_tags, + versions: versions_keys, + raw: Arc::clone(&raw), + ..Default::default() + }; + + let resolved = resolve_target_version((&full).into(), spec).ok()?; + let core = parsed + .get("versions") + .and_then(|v| v.get(resolved.as_str())) + .and_then(|version_obj| CoreVersionManifest::deserialize(version_obj).ok())?; + let core_arc = Arc::new(core); + let transitives = extract_transitive(&core_arc, peer_deps); + + Some((Arc::new(full), resolved, core_arc, transitives)) +} + +/// Fetch + combined parse + cache write for one `(name, spec)`. +/// Future body owns all per-fetch work; main loop only extends +/// `pending` from the returned transitives and refills `futs`. +fn spawn_fetch( + client: reqwest::Client, + registry_url: Arc, + name: String, + spec: String, + cache: MemoryCache, + body_cache: Arc>>>, + peer_deps: PeerDeps, +) -> Fut { + Box::pin(async move { + let url = format!("{}/{}", registry_url, name); + let resp = match client + .get(&url) + .header("accept", "application/vnd.npm.install-v1+json") + .send() + .await + { + Ok(r) if r.status().is_success() => r, + _ => { + return FetchOutcome { + name, + transitives: Vec::new(), + fetched: true, + }; + } + }; + let raw_bytes = match resp.bytes().await { + Ok(b) => b, + Err(_) => { + return FetchOutcome { + name, + transitives: Vec::new(), + fetched: true, + }; + } + }; + let raw_arc: Arc<[u8]> = Arc::from(raw_bytes.as_ref()); + // Stash in body_cache early so concurrent sibling specs + // arriving slightly after see it on their pending pop. + body_cache.lock().insert(name.clone(), Arc::clone(&raw_arc)); + + let spec_for_parse = spec.clone(); + let peer = peer_deps; + let parsed = + tokio::task::spawn_blocking(move || parse_combined(raw_arc, &spec_for_parse, peer)) + .await + .ok() + .flatten(); + + let transitives = match parsed { + Some((full_arc, resolved, core_arc, transitives)) => { + cache.set_full_manifest(name.clone(), Arc::clone(&full_arc)); + cache.set_version_manifest(name.clone(), spec, Arc::clone(&core_arc)); + cache.set_version_manifest(name.clone(), resolved, core_arc); + transitives + } + None => Vec::new(), + }; + + FetchOutcome { + name, + transitives, + fetched: true, + } + }) +} + +/// Settle-only future for a sibling spec whose `(name)` body already +/// landed via a sibling fetch. Same combined parse, no network. +fn spawn_settle( + name: String, + spec: String, + raw: Arc<[u8]>, + cache: MemoryCache, + peer_deps: PeerDeps, +) -> Fut { + Box::pin(async move { + let spec_for_parse = spec.clone(); + let peer = peer_deps; + let parsed = tokio::task::spawn_blocking(move || { + parse_combined(Arc::clone(&raw), &spec_for_parse, peer) + }) + .await + .ok() + .flatten(); + + let transitives = match parsed { + Some((full_arc, resolved, core_arc, transitives)) => { + // Don't overwrite full_manifest — the original fetcher + // already set it. Only populate the version-manifest + // slots so BFS hits the (name, spec) early-return. + cache.set_full_manifest(name.clone(), full_arc); + cache.set_version_manifest(name.clone(), spec, Arc::clone(&core_arc)); + cache.set_version_manifest(name.clone(), resolved, core_arc); + transitives + } + None => Vec::new(), + }; + + FetchOutcome { + name, + transitives, + fetched: false, + } + }) +} + +/// Streaming preload with transitive walk. Self-contained — no +/// dependency on `service::http` / `service::manifest` / +/// `service::registry` beyond `MemoryCache` writes. +pub async fn mb_fetch( + initial_deps: Vec, + registry_url: &str, + cache: &MemoryCache, + config: &PreloadConfig, +) -> MbFetchStats { + let mut stats = MbFetchStats::default(); + let total_start = Instant::now(); + + let client = match build_mb_client() { + Ok(c) => c, + Err(e) => { + tracing::warn!("mb_resolve client build failed: {e}"); + return stats; + } + }; + let registry = Arc::new(registry_url.trim_end_matches('/').to_string()); + let cap = config.concurrency; + let peer_deps = config.peer_deps; + + // Spec-level dedup across the entire run. + let mut seen: HashSet<(String, String)> = HashSet::new(); + let mut pending: VecDeque = VecDeque::new(); + for (name, spec) in initial_deps { + if seen.insert((name.clone(), spec.clone())) { + pending.push_back((name, spec)); + } + } + + // Sibling-fetch dedup: when two specs for the same name are both + // in flight, only the first fires a fetch; the second arrives at + // the cached body and goes through `spawn_settle` instead. + let body_cache: Arc>>> = Arc::new(Mutex::new(HashMap::new())); + let mut in_flight_names: HashSet = HashSet::new(); + let mut deferred_by_name: HashMap> = HashMap::new(); + + let mut futs: FuturesUnordered = FuturesUnordered::new(); + + loop { + // Refill to cap. + while futs.len() < cap { + let Some((name, spec)) = pending.pop_front() else { + break; + }; + // Sibling fast path: body already cached. + if let Some(raw) = body_cache.lock().get(&name).cloned() { + futs.push(spawn_settle(name, spec, raw, cache.clone(), peer_deps)); + continue; + } + // Defer if a fetch for this name is already in flight. + if !in_flight_names.insert(name.clone()) { + deferred_by_name.entry(name).or_default().push(spec); + continue; + } + futs.push(spawn_fetch( + client.clone(), + Arc::clone(®istry), + name, + spec, + cache.clone(), + Arc::clone(&body_cache), + peer_deps, + )); + } + + if futs.is_empty() { + break; + } + + let Some(out) = futs.next().await else { break }; + + if out.transitives.is_empty() && out.fetched { + // Empty result from a fetch is ambiguous (no transitives + // OR a fetch/parse failure). Track conservatively as + // success — the FETCH_TIMINGS-equivalent counter is + // omitted in this path on purpose to keep the future + // body lean. + stats.success += 1; + } else if out.fetched { + stats.success += 1; + } + + // Drain sibling specs deferred while the fetch was in flight. + if out.fetched + && let Some(siblings) = deferred_by_name.remove(&out.name) + && let Some(raw) = body_cache.lock().get(&out.name).cloned() + { + for sibling_spec in siblings { + futs.push(spawn_settle( + out.name.clone(), + sibling_spec, + Arc::clone(&raw), + cache.clone(), + peer_deps, + )); + } + } + + // Extend pending with new transitive specs, dedup. + for (name, spec) in out.transitives { + if seen.insert((name.clone(), spec.clone())) { + pending.push_back((name, spec)); + } + } + } + + let total_wall = total_start.elapsed().as_millis(); + tracing::info!( + "p1-breakdown mb_fetch wall={}ms ok={} fail={}", + total_wall, + stats.success, + stats.fail, + ); + + stats +} diff --git a/crates/ruborist/src/resolver/mod.rs b/crates/ruborist/src/resolver/mod.rs index 582e03b31..2d0a288d9 100644 --- a/crates/ruborist/src/resolver/mod.rs +++ b/crates/ruborist/src/resolver/mod.rs @@ -3,10 +3,12 @@ pub mod builder; pub mod common; pub mod edges; +pub mod fast_preload; #[cfg(feature = "native-git")] pub mod git; #[cfg(feature = "http-tarball")] pub mod http; +pub mod mb_resolve; pub mod preload; pub mod registry; pub mod runtime; diff --git a/crates/ruborist/src/resolver/preload.rs b/crates/ruborist/src/resolver/preload.rs index 1230c5bf6..e9a777407 100644 --- a/crates/ruborist/src/resolver/preload.rs +++ b/crates/ruborist/src/resolver/preload.rs @@ -99,8 +99,17 @@ where let mut in_flight = 0usize; let mut started = false; + // Main-loop overhead instrumentation. Atomic accumulators so we + // can attribute the gap between manifest-bench's pure-HTTP wall + // and ruborist's preload wall: how much of the gap is bookkeeping + // (dedup hash, extract_transitive_deps, queue push, events) vs + // actual fetch wait? + let mut total_dispatch_us: u64 = 0; + let mut total_result_us: u64 = 0; + loop { // Fill up to concurrency limit + let dispatch_start = tokio::time::Instant::now(); while in_flight < concurrency { let item = loop { let Some((name, spec)) = pending.pop_front() else { @@ -134,6 +143,7 @@ where }); in_flight += 1; } + total_dispatch_us += dispatch_start.elapsed().as_micros() as u64; if in_flight == 0 { break; @@ -142,6 +152,7 @@ where let Some((name, result, elapsed_ms)) = futures.next().await else { break; }; + let result_start = tokio::time::Instant::now(); in_flight -= 1; if stats.success_count == 0 && stats.failed_count == 0 { @@ -174,8 +185,15 @@ where tracing::debug!("Failed to preload {}: {}", name, e); } } + total_result_us += result_start.elapsed().as_micros() as u64; } + tracing::info!( + "p1-breakdown preload_loop_dispatch_us={} preload_loop_result_us={}", + total_dispatch_us, + total_result_us, + ); + stats.total_processed = processed.len(); receiver.on_event(BuildEvent::PreloadComplete { diff --git a/crates/ruborist/src/service/api.rs b/crates/ruborist/src/service/api.rs index 878b357a1..06079b248 100644 --- a/crates/ruborist/src/service/api.rs +++ b/crates/ruborist/src/service/api.rs @@ -36,7 +36,10 @@ use crate::model::package_lock::PackageLock; use crate::model::util::parse_package_spec; use crate::resolver::builder::{ BuildDepsConfig, DevDeps, EdgeContext, PeerDeps, add_edges_from, build_deps_with_config, + gather_preload_deps, }; +use crate::resolver::mb_resolve::mb_fetch; +use crate::resolver::preload::PreloadConfig; use crate::resolver::runtime::install_runtime_from_map; use crate::resolver::workspace::WorkspaceDiscovery; use crate::spec::Catalogs; @@ -70,6 +73,16 @@ pub struct BuildDepsOptions { /// Catalog definitions for the `catalog:` dependency protocol. /// Key `""` = default catalog, other keys = named catalogs. pub catalogs: Catalogs, + /// When true, skip the up-front `run_preload_phase`. Set by callers + /// that don't consume the `BuildEvent::PackageResolved` pipeline + /// stream — e.g. `utoo deps` (lockfile-only). The BFS phase has its + /// own per-level prefetch that warms the manifest cache, so dropping + /// preload doesn't change correctness, only avoids the redundant + /// up-front fetch + dedicated wall. + /// Install paths (which feed `PipelineReceiver` to start tarball + /// downloads as resolves complete) leave this false so preload still + /// emits PackageResolved events to the pipeline. + pub skip_preload: bool, } impl BuildDepsOptions { @@ -91,6 +104,7 @@ impl BuildDepsOptions { receiver, supports_semver: None, catalogs: HashMap::new(), + skip_preload: false, } } } @@ -132,6 +146,7 @@ where receiver, supports_semver, catalogs, + skip_preload: skip_preload_caller, } = options; // 1. Find root path (workspace root if applicable) @@ -234,7 +249,13 @@ where registry.supports_semver(), ); - let skip_preload = cache_count > 0; + // Skip preload when: + // - the caller asked us to (e.g. `utoo deps`, no pipeline consumer + // for PackageResolved events — BFS does its own per-level + // prefetch, preload is redundant), OR + // - the project's warm cache already has manifests covering most + // of the workload (existing skip-on-warm behavior). + let skip_preload = skip_preload_caller || cache_count > 0; let mut config = BuildDepsConfig::default() .with_peer_deps(peer_deps) .with_concurrency(concurrency) @@ -251,6 +272,29 @@ where ); } + // Lockfile-only callers (`utoo deps`) skip the receiver-driven + // `run_preload_phase` because they have no pipeline consumer for + // `BuildEvent::PackageResolved`. Route through `mb_fetch` — a + // ruborist-internal standalone preload that bypasses + // `service::http`, `service::manifest`, and `service::registry` + // to match `manifest-bench`'s loop shape directly. PM is + // unaware: this dispatch happens entirely inside ruborist when + // `skip_preload=true` and there's no warm project cache. + if skip_preload_caller && cache_count == 0 { + let initial_deps = gather_preload_deps(&graph, peer_deps); + let preload_config = PreloadConfig { + peer_deps, + concurrency, + }; + mb_fetch( + initial_deps, + registry.registry_url(), + registry.cache(), + &preload_config, + ) + .await; + } + // Preserve the typed error via `Error::new` + `.context(...)` so CLI // renderers (e.g. pm's format_print) can downcast and pretty-print the // dependency chain carried by `ResolveError::WithChain`. @@ -258,9 +302,12 @@ where .await .map_err(|e| anyhow::Error::new(e).context("Dependency resolution failed"))?; + let t_serialize_start = std::time::Instant::now(); let (packages, _total) = graph.serialize_to_packages(&root_path); + let serialize_us = t_serialize_start.elapsed().as_micros() as u64; // Export project cache from memory cache for the host to persist. + let t_cache_export_start = std::time::Instant::now(); let mut project_cache = ProjectCacheData::default(); for (key, manifest) in registry.cache().export_version_manifests() { // `parse_package_spec` rather than `split_once('@')` so scoped names @@ -271,6 +318,13 @@ where pkg_cache.specs.insert(spec.to_string(), version.clone()); pkg_cache.manifests.insert(version, (*manifest).clone()); } + let cache_export_us = t_cache_export_start.elapsed().as_micros() as u64; + + tracing::info!( + "p1-breakdown serialize_us={} cache_export_us={}", + serialize_us, + cache_export_us, + ); Ok(BuildDepsOutput { lock: PackageLock::new(&pkg.name, &pkg.version, packages), @@ -324,6 +378,7 @@ mod tests { receiver: NoopReceiver, supports_semver: None, catalogs: HashMap::new(), + skip_preload: false, }; assert_eq!(options.concurrency, 20); diff --git a/crates/ruborist/src/service/manifest.rs b/crates/ruborist/src/service/manifest.rs index 74baf3b9c..38db87969 100644 --- a/crates/ruborist/src/service/manifest.rs +++ b/crates/ruborist/src/service/manifest.rs @@ -4,7 +4,11 @@ //! [`crate::service::fetch`] so retry policy stays uniform across registry //! manifest fetches and non-registry resolvers (git, http tarball). +use std::collections::HashMap; +use std::sync::Arc; + use anyhow::{Result, anyhow}; +use serde::Deserialize; use tokio_retry::RetryIf; use super::fetch::{ @@ -12,25 +16,37 @@ use super::fetch::{ }; use super::http::get_client; use crate::model::manifest::{CoreVersionManifest, FullManifest}; +use crate::resolver::version::resolve_target_version; +use crate::util::FETCH_TIMINGS; -/// Parse JSON bytes on rayon's CPU thread pool (native) or inline -/// (wasm32). Keeps the tokio runtime free of `simd_json` work so other -/// in-flight manifest fetches keep driving network IO while this one -/// parses. +/// Parse JSON bytes on tokio's blocking thread pool. +/// +/// The history of this function captures three different attempts: +/// - rayon::spawn (original): rayon's pool is `num_cpus` (= 2 on +/// GHA), 64 concurrent parses queued behind 2 workers → avg_parse +/// 30ms wall vs ~5ms CPU. round-0 baseline. +/// - inline (round 1, reverted): no rayon hop, but the simd_json +/// call blocks the tokio runtime worker, so other in-flight +/// fetches couldn't drive their socket I/O — avg_request grew +/// 35ms → 52ms (+17ms), eff_parallel 42 → 35, net p1 wall +0.37s. +/// - spawn_blocking (current): tokio's dedicated blocking pool has +/// a much higher default cap (512), so 64 concurrent parses are +/// never queued. Unlike rayon there's no contention with the +/// install path's parallel-write rayon usage, and unlike inline +/// the tokio runtime workers stay free to drive network I/O on +/// all in-flight fetches. async fn parse_json_off_runtime(mut bytes: Vec) -> Result where T: serde::de::DeserializeOwned + Send + 'static, { #[cfg(not(target_arch = "wasm32"))] { - let (tx, rx) = tokio::sync::oneshot::channel(); - rayon::spawn(move || { - let result = simd_json::serde::from_slice::(&mut bytes) - .map_err(|e| anyhow!("JSON parse error: {e}")); - let _ = tx.send(result); - }); - rx.await - .map_err(|e| anyhow!("rayon parse channel closed: {e}"))? + tokio::task::spawn_blocking(move || { + simd_json::serde::from_slice::(&mut bytes) + .map_err(|e| anyhow!("JSON parse error: {e}")) + }) + .await + .map_err(|e| anyhow!("spawn_blocking parse panicked: {e}"))? } #[cfg(target_arch = "wasm32")] { @@ -91,7 +107,9 @@ pub async fn fetch_full_manifest(opts: FetchManifestOptions<'_>) -> Result) -> Result) -> Result, + /// `Some` when the requested spec resolves to a real version in + /// `manifest.versions`. `None` only on no-match (rare; usually a + /// spec referring to a yanked or moved version). + pub primary_settle: Option, +} + +/// `(resolved_version, parsed_subtree)` — what +/// [`fetch_full_manifest_with_settle`] hands back to callers that +/// supplied a `primary_spec`. +pub type PrimarySettleResult = (String, Arc); + +#[allow(clippy::large_enum_variant)] +pub enum FetchWithSettleResult { + Ok(FetchWithSettle), + NotModified, +} + +/// Fetch a full manifest and resolve the primary spec from the same +/// parse pass. +/// +/// Where [`fetch_full_manifest`] uses `simd_json::serde::from_slice` +/// to materialize a typed `FullManifest` (cheap envelope, deep +/// `versions` subtrees skipped via `IgnoredAny`) and leaves version +/// subtree extraction to a later `simd_json::to_borrowed_value` +/// reparse, this entry point does the borrowed-value parse once and +/// extracts: +/// * envelope fields needed by the resolver (`name`, `dist-tags`, +/// `versions` keys), +/// * the resolved-version subtree as a typed +/// [`CoreVersionManifest`]. +/// +/// Saves one full simd_json pass on the parse hot path — +/// `fast_preload` uses ~2700 of these per `utoo deps` cold run, so +/// halving the per-fetch parse work meaningfully reduces CPU on +/// 2-core CI. +pub async fn fetch_full_manifest_with_settle( + opts: FetchManifestOptions<'_>, + primary_spec: &str, +) -> Result { + let url = format!("{}/{}", opts.registry_url, opts.name); + let etag_owned = opts.etag.map(|s| s.to_string()); + let primary_spec_owned = primary_spec.to_string(); + let accept = match opts.format { + MetadataFormat::Abbreviated => "application/vnd.npm.install-v1+json", + MetadataFormat::Complete => "application/json", + }; + + RetryIf::spawn( + retry_strategy(), + || { + let url = url.clone(); + let etag = etag_owned.clone(); + let primary_spec = primary_spec_owned.clone(); + async move { + let mut request = get_client() + .map_err(FetchError::Permanent)? + .get(&url) + .header("Accept", accept); + if let Some(etag_value) = &etag { + request = request.header("If-None-Match", etag_value); + } + + let t_request_start = std::time::Instant::now(); + let response = request.send().await.map_err(classify_reqwest_error)?; + let request_us = t_request_start.elapsed().as_micros() as u64; + let status = response.status(); + + if status == reqwest::StatusCode::NOT_MODIFIED { + if etag.is_some() { + return Ok(FetchWithSettleResult::NotModified); + } + return Err(classify_status(status, &url)); + } + + if status.is_success() { + let new_etag = response + .headers() + .get("etag") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + + let t_body_start = std::time::Instant::now(); + let raw_bytes = response + .bytes() + .await + .map_err(|e| FetchError::Permanent(anyhow!("Response read error: {e}")))? + .to_vec(); + let body_us = t_body_start.elapsed().as_micros() as u64; + let bytes_len = raw_bytes.len() as u64; + let raw_arc: Arc<[u8]> = Arc::from(raw_bytes); + + let t_parse_start = std::time::Instant::now(); + let parse_result = + parse_envelope_and_settle(Arc::clone(&raw_arc), primary_spec) + .await + .map_err(FetchError::Permanent)?; + let parse_us = t_parse_start.elapsed().as_micros() as u64; + + FETCH_TIMINGS.record(request_us, body_us, parse_us, bytes_len); + + let (manifest, primary_settle) = parse_result; + Ok(FetchWithSettleResult::Ok(FetchWithSettle { + manifest, + etag: new_etag, + primary_settle, + })) + } else { + Err(classify_status(status, &url)) + } + } + }, + is_retryable, + ) + .await + .map_err(|e| match e { + FetchError::Retryable(e) | FetchError::Permanent(e) => { + anyhow!("Failed to fetch {}: {:#}", opts.name, e) + } + }) +} + +/// Off-runtime combined parse: `simd_json::to_borrowed_value` once, +/// extract envelope into [`FullManifest`] + resolve `primary_spec` +/// against the parsed `versions` keys + materialize the resolved +/// version's subtree into [`CoreVersionManifest`]. +/// +/// Constructs `FullManifest` manually rather than via typed serde so +/// the work is exactly one parse pass. Other `FullManifest` fields +/// (`description`, `time`, `maintainers`, etc.) stay at `Default` +/// values — none are read on the resolver hot path. +async fn parse_envelope_and_settle( + raw: Arc<[u8]>, + primary_spec: String, +) -> Result<(FullManifest, Option)> { + #[cfg(not(target_arch = "wasm32"))] + { + tokio::task::spawn_blocking(move || parse_envelope_and_settle_sync(raw, &primary_spec)) + .await + .map_err(|e| anyhow!("spawn_blocking parse panicked: {e}"))? + } + #[cfg(target_arch = "wasm32")] + { + parse_envelope_and_settle_sync(raw, &primary_spec) + } +} + +fn parse_envelope_and_settle_sync( + raw: Arc<[u8]>, + primary_spec: &str, +) -> Result<(FullManifest, Option)> { + use simd_json::prelude::{ValueAsScalar, ValueObjectAccess}; + + let mut buf = (*raw).to_vec(); + let parsed = + simd_json::to_borrowed_value(&mut buf).map_err(|e| anyhow!("JSON parse error: {e}"))?; + + let name = parsed + .get("name") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_default(); + + let dist_tags: HashMap = parsed + .get("dist-tags") + .and_then(|v| HashMap::::deserialize(v).ok()) + .unwrap_or_default(); + + let versions_keys: Vec = parsed + .get("versions") + .and_then(simd_json::prelude::ValueAsObject::as_object) + .map(|obj| obj.keys().map(|k| k.to_string()).collect()) + .unwrap_or_default(); + + let manifest = FullManifest { + name, + dist_tags: dist_tags.clone(), + versions: versions_keys, + raw, + ..Default::default() + }; + + // Resolve spec against the just-extracted envelope. + let primary_settle = match resolve_target_version((&manifest).into(), primary_spec) { + Ok(resolved) => parsed + .get("versions") + .and_then(|v| v.get(resolved.as_str())) + .and_then(|version_obj| CoreVersionManifest::deserialize(version_obj).ok()) + .map(|core| (resolved, Arc::new(core))), + Err(_) => None, + }; + + Ok((manifest, primary_settle)) +} + /// Fetch full manifest without ETag / 304 support. /// /// Convenience wrapper around [`fetch_full_manifest`] for callers that never @@ -190,6 +417,7 @@ pub async fn fetch_version_manifest( || { let url = url.clone(); async move { + let t_request_start = std::time::Instant::now(); let response = get_client() .map_err(FetchError::Permanent)? .get(&url) @@ -197,16 +425,26 @@ pub async fn fetch_version_manifest( .send() .await .map_err(classify_reqwest_error)?; + let request_us = t_request_start.elapsed().as_micros() as u64; if response.status().is_success() { + let t_body_start = std::time::Instant::now(); let bytes = response .bytes() .await .map_err(|e| FetchError::Permanent(anyhow!("Response read error: {e}")))? .to_vec(); - parse_json_off_runtime::(bytes) + let body_us = t_body_start.elapsed().as_micros() as u64; + let bytes_len = bytes.len() as u64; + let t_parse_start = std::time::Instant::now(); + let parsed = parse_json_off_runtime::(bytes) .await - .map_err(FetchError::Permanent) + .map_err(FetchError::Permanent); + let parse_us = t_parse_start.elapsed().as_micros() as u64; + if parsed.is_ok() { + FETCH_TIMINGS.record(request_us, body_us, parse_us, bytes_len); + } + parsed } else { Err(classify_status(response.status(), &url)) } diff --git a/crates/ruborist/src/service/mod.rs b/crates/ruborist/src/service/mod.rs index 13109e994..5adb6bf0b 100644 --- a/crates/ruborist/src/service/mod.rs +++ b/crates/ruborist/src/service/mod.rs @@ -60,8 +60,9 @@ pub use cache::{ pub use fs::{Glob, NoopGlob, exists, read_to_string}; pub use http::client_builder; pub use manifest::{ - FetchManifestOptions, FetchManifestResult, FetchVersionManifestOptions, MetadataFormat, - fetch_full_manifest, fetch_full_manifest_fresh, fetch_version_manifest, + FetchManifestOptions, FetchManifestResult, FetchVersionManifestOptions, FetchWithSettle, + FetchWithSettleResult, MetadataFormat, fetch_full_manifest, fetch_full_manifest_fresh, + fetch_full_manifest_with_settle, fetch_version_manifest, }; pub use registry::UnifiedRegistry; pub use store::{ManifestStore, NoopStore}; diff --git a/crates/ruborist/src/util/mod.rs b/crates/ruborist/src/util/mod.rs index 649e47c95..a7f0b7b7d 100644 --- a/crates/ruborist/src/util/mod.rs +++ b/crates/ruborist/src/util/mod.rs @@ -1,6 +1,8 @@ //! Shared utility primitives for ruborist and downstream consumers. pub mod oncemap; +pub mod timing; pub use crate::model::util::{PackageNameStr, parse_package_spec, read_package_json}; pub use oncemap::OnceMap; +pub use timing::{FETCH_TIMINGS, FetchTimings, FetchTimingsSnapshot}; diff --git a/crates/ruborist/src/util/timing.rs b/crates/ruborist/src/util/timing.rs new file mode 100644 index 000000000..f50e921b9 --- /dev/null +++ b/crates/ruborist/src/util/timing.rs @@ -0,0 +1,134 @@ +//! Per-phase manifest fetch timing accumulator for p1 perf investigation. +//! +//! Splits each `fetch_*_manifest` call into three observable pieces: +//! - `request_us`: from `request.send().await` to response headers +//! received. Captures TCP connect (when not pooled), TLS handshake, +//! HTTP request roundtrip, and server-side processing. +//! - `body_us`: from response headers to the entire JSON body buffered. +//! Network-bandwidth bound for large packuments. +//! - `parse_us`: from full body buffered to a typed manifest. CPU bound +//! (simd_json on a spawn_blocking thread). +//! +//! `parse_us` is wall-clock for the await on `parse_json_off_runtime` — +//! since JSON parse runs on `spawn_blocking`, this includes scheduling +//! latency rather than pure CPU time. Together with the per-fetch total +//! already tracked in `preload_manifests`, this lets us answer "where +//! did p1's wall time go?" without external profiling. +//! +//! All counters are `AtomicU64` so the recording path is lock-free. +//! Numbers are reset between resolves via [`reset()`] so successive +//! `utoo deps` invocations report independently. + +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Per-process accumulator for manifest fetch timings. +#[derive(Default, Debug)] +pub struct FetchTimings { + /// Number of fetches recorded (full + version manifest). + pub count: AtomicU64, + /// Sum of microseconds spent in `request.send().await`. + pub request_us: AtomicU64, + /// Sum of microseconds spent in `response.bytes().await`. + pub body_us: AtomicU64, + /// Sum of microseconds spent awaiting `parse_json_off_runtime`. + pub parse_us: AtomicU64, + /// Sum of body bytes received across all fetches. + pub bytes: AtomicU64, +} + +impl FetchTimings { + /// Record one fetch's split timings. Call once per successful fetch. + pub fn record(&self, request_us: u64, body_us: u64, parse_us: u64, bytes: u64) { + self.count.fetch_add(1, Ordering::Relaxed); + self.request_us.fetch_add(request_us, Ordering::Relaxed); + self.body_us.fetch_add(body_us, Ordering::Relaxed); + self.parse_us.fetch_add(parse_us, Ordering::Relaxed); + self.bytes.fetch_add(bytes, Ordering::Relaxed); + } + + /// Reset all counters to zero. + pub fn reset(&self) { + self.count.store(0, Ordering::Relaxed); + self.request_us.store(0, Ordering::Relaxed); + self.body_us.store(0, Ordering::Relaxed); + self.parse_us.store(0, Ordering::Relaxed); + self.bytes.store(0, Ordering::Relaxed); + } + + /// Snapshot of the current accumulator state. + pub fn snapshot(&self) -> FetchTimingsSnapshot { + FetchTimingsSnapshot { + count: self.count.load(Ordering::Relaxed), + request_us: self.request_us.load(Ordering::Relaxed), + body_us: self.body_us.load(Ordering::Relaxed), + parse_us: self.parse_us.load(Ordering::Relaxed), + bytes: self.bytes.load(Ordering::Relaxed), + } + } +} + +/// Immutable snapshot suitable for printing. +#[derive(Debug, Clone, Copy)] +pub struct FetchTimingsSnapshot { + pub count: u64, + pub request_us: u64, + pub body_us: u64, + pub parse_us: u64, + pub bytes: u64, +} + +impl FetchTimingsSnapshot { + /// One-line summary for tracing logs. + pub fn summary_line(&self) -> String { + if self.count == 0 { + return "fetch-timings: no requests recorded".to_string(); + } + let count = self.count; + let avg_req = self.request_us / count; + let avg_body = self.body_us / count; + let avg_parse = self.parse_us / count; + let avg_bytes = self.bytes / count; + format!( + "fetch-timings: n={} sum_request={}ms sum_body={}ms sum_parse={}ms total_bytes={}MB | avg_request={}us avg_body={}us avg_parse={}us avg_bytes={}KB", + count, + self.request_us / 1_000, + self.body_us / 1_000, + self.parse_us / 1_000, + self.bytes / 1_000_000, + avg_req, + avg_body, + avg_parse, + avg_bytes / 1_024, + ) + } +} + +/// Process-wide manifest fetch timing accumulator. +pub static FETCH_TIMINGS: FetchTimings = FetchTimings { + count: AtomicU64::new(0), + request_us: AtomicU64::new(0), + body_us: AtomicU64::new(0), + parse_us: AtomicU64::new(0), + bytes: AtomicU64::new(0), +}; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn record_and_snapshot() { + FETCH_TIMINGS.reset(); + FETCH_TIMINGS.record(100, 200, 300, 1024); + FETCH_TIMINGS.record(150, 250, 350, 2048); + let snap = FETCH_TIMINGS.snapshot(); + assert_eq!(snap.count, 2); + assert_eq!(snap.request_us, 250); + assert_eq!(snap.body_us, 450); + assert_eq!(snap.parse_us, 650); + assert_eq!(snap.bytes, 3072); + FETCH_TIMINGS.reset(); + let snap2 = FETCH_TIMINGS.snapshot(); + assert_eq!(snap2.count, 0); + } +}