Skip to content

Commit 115ec5b

Browse files
Route all wheeled packages through daemon + shared CUDA layer cache
Two optimizations: 1. All packages with wheel URLs now go through streaming daemon instead of only large ones. Only sdist-only packages use uv pip install. 2. Shared wheel cache at $ZEROSTART_CACHE/shared_wheels/ — CUDA libs (nvidia-cuda-runtime, nvidia-cublas, etc.) downloaded once and hardlinked across environments (torch, vllm, diffusers share ~6GB). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fa493a0 commit 115ec5b

3 files changed

Lines changed: 201 additions & 32 deletions

File tree

crates/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/zs-fast-wheel/src/main.rs

Lines changed: 190 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use zs_fast_wheel::daemon::{DaemonConfig, DaemonEngine};
2-
use zs_fast_wheel::manifest::Manifest;
2+
use zs_fast_wheel::manifest::{Manifest, WheelSpec};
33
use zs_fast_wheel::pipeline;
44
use zs_fast_wheel::resolve;
55

66
use anyhow::{Context, Result};
77
use clap::{Parser, Subcommand};
88
use sha2::{Digest, Sha256};
99
use std::os::unix::process::CommandExt;
10-
use std::path::PathBuf;
10+
use std::path::{Path, PathBuf};
1111
use tracing_subscriber::EnvFilter;
1212

1313
#[derive(Parser)]
@@ -409,6 +409,138 @@ fn parse_requirements_file(path: &str) -> Result<Vec<String>> {
409409
.collect())
410410
}
411411

412+
/// Shared wheel cache directory: `$ZEROSTART_CACHE/shared_wheels/{name}-{version}/`
413+
///
414+
/// CUDA libraries (nvidia-cuda-runtime-cu12, nvidia-cublas-cu12, etc.) are identical
415+
/// across environments (torch, vllm, diffusers all share ~6GB of CUDA deps).
416+
/// By caching extracted wheels and hardlinking them, we avoid re-downloading and
417+
/// re-extracting the same wheels for every environment.
418+
fn shared_wheel_cache_dir(spec: &WheelSpec) -> PathBuf {
419+
cache_dir()
420+
.join("shared_wheels")
421+
.join(format!("{}-{}", spec.distribution, spec.version))
422+
}
423+
424+
/// Try to restore a wheel from the shared cache via hardlinks.
425+
///
426+
/// Returns true if the wheel was fully restored from cache.
427+
fn restore_from_shared_cache(spec: &WheelSpec, site_packages: &Path) -> bool {
428+
let cache_path = shared_wheel_cache_dir(spec);
429+
let marker = cache_path.join(".complete");
430+
if !marker.exists() {
431+
return false;
432+
}
433+
434+
if let Err(e) = hardlink_tree(&cache_path, site_packages) {
435+
tracing::warn!(
436+
"Failed to restore {} from shared cache: {e}",
437+
spec.distribution
438+
);
439+
return false;
440+
}
441+
442+
true
443+
}
444+
445+
/// Populate the shared cache from a freshly extracted wheel in site-packages.
446+
///
447+
/// We identify the wheel's files by looking for its .dist-info directory,
448+
/// then copy the top-level dirs that belong to it into the cache.
449+
fn populate_shared_cache(spec: &WheelSpec, site_packages: &Path) {
450+
let cache_path = shared_wheel_cache_dir(spec);
451+
if cache_path.join(".complete").exists() {
452+
return; // already cached
453+
}
454+
455+
if std::fs::create_dir_all(&cache_path).is_err() {
456+
return;
457+
}
458+
459+
// Find this wheel's dist-info and import roots in site-packages
460+
let norm = spec.distribution.replace('-', "_").to_lowercase();
461+
if let Ok(entries) = std::fs::read_dir(site_packages) {
462+
for entry in entries.flatten() {
463+
let name = entry.file_name();
464+
let name_str = name.to_string_lossy().to_string();
465+
466+
// Match dist-info dir or import root dirs
467+
let is_dist_info = name_str.ends_with(".dist-info") && {
468+
let stem = name_str.trim_end_matches(".dist-info");
469+
let pkg = stem.split('-').next().unwrap_or(stem);
470+
pkg.replace('-', "_").to_lowercase() == norm
471+
};
472+
473+
let is_data_dir = name_str.ends_with(".data") && {
474+
let stem = name_str.trim_end_matches(".data");
475+
let pkg = stem.split('-').next().unwrap_or(stem);
476+
pkg.replace('-', "_").to_lowercase() == norm
477+
};
478+
479+
let is_import_root = spec
480+
.import_roots
481+
.iter()
482+
.any(|r| r == &name_str || name_str == format!("{norm}.py"));
483+
484+
if is_dist_info || is_data_dir || is_import_root {
485+
let src = entry.path();
486+
let dst = cache_path.join(&name);
487+
if src.is_dir() {
488+
let _ = copy_dir_recursive(&src, &dst);
489+
} else {
490+
let _ = std::fs::copy(&src, &dst);
491+
}
492+
}
493+
}
494+
}
495+
496+
// Mark cache as complete
497+
let _ = std::fs::File::create(cache_path.join(".complete"));
498+
}
499+
500+
/// Recursively hardlink all files from src tree into dst.
fn hardlink_tree(src: &Path, dst: &Path) -> Result<()> {
    for entry in std::fs::read_dir(src)? {
        let entry = entry?;
        let name = entry.file_name();

        // The `.complete` marker is cache bookkeeping, not wheel content.
        if name.to_string_lossy() == ".complete" {
            continue;
        }

        let from = entry.path();
        let to = dst.join(&name);

        if from.is_dir() {
            std::fs::create_dir_all(&to)?;
            hardlink_tree(&from, &to)?;
        } else if std::fs::hard_link(&from, &to).is_err() {
            // Hardlinks fail across filesystems (cross-device) — fall back
            // to a plain byte copy.
            std::fs::copy(&from, &to)?;
        }
    }
    Ok(())
}
527+
528+
/// Deep-copy the directory tree rooted at `src` into `dst`.
///
/// `dst` (and nested directories) are created as needed; files are copied
/// byte-for-byte.
fn copy_dir_recursive(src: &Path, dst: &Path) -> Result<()> {
    std::fs::create_dir_all(dst)?;
    for child in std::fs::read_dir(src)? {
        let child = child?;
        let source = child.path();
        let target = dst.join(child.file_name());
        if source.is_dir() {
            copy_dir_recursive(&source, &target)?;
        } else {
            std::fs::copy(&source, &target)?;
        }
    }
    Ok(())
}
543+
412544
#[tokio::main]
413545
async fn main() -> Result<()> {
414546
tracing_subscriber::fmt()
@@ -483,27 +615,27 @@ async fn main() -> Result<()> {
483615

484616
if verbose {
485617
eprintln!(
486-
"Resolved: {} packages ({} via uv, {} via daemon)",
618+
"Resolved: {} packages ({} sdist via uv, {} wheels via daemon)",
487619
plan.all.len(),
488620
plan.uv_specs.len(),
489621
plan.daemon_wheels.len(),
490622
);
491623
}
492624

493-
// Run uv small install + daemon streaming in parallel
625+
// Run uv sdist install + daemon streaming in parallel
494626
let uv_specs = plan.uv_specs.clone();
495627
let env_dir_clone = env_dir.clone();
496628
let uv_verbose = verbose;
497629

498630
let uv_handle = if !uv_specs.is_empty() {
499631
if verbose {
500-
eprintln!("Installing {} small packages via uv...", uv_specs.len());
632+
eprintln!("Installing {} sdist-only packages via uv...", uv_specs.len());
501633
}
502634
Some(tokio::task::spawn_blocking(move || {
503635
let result = uv_install(&env_dir_clone, &uv_specs);
504636
if uv_verbose {
505637
if let Err(ref e) = result {
506-
eprintln!("Warning: uv small install failed: {e}");
638+
eprintln!("Warning: uv sdist install failed: {e}");
507639
}
508640
}
509641
result
@@ -513,29 +645,64 @@ async fn main() -> Result<()> {
513645
};
514646

515647
if !plan.daemon_wheels.is_empty() {
648+
// Check shared cache — restore cached wheels via hardlinks, only download uncached
649+
let mut uncached_wheels = Vec::new();
650+
let mut cached_count = 0u32;
651+
652+
for spec in &plan.daemon_wheels {
653+
if restore_from_shared_cache(spec, &site_packages) {
654+
cached_count += 1;
655+
if verbose {
656+
eprintln!(" {} (shared cache hit)", spec.distribution);
657+
}
658+
} else {
659+
uncached_wheels.push(spec.clone());
660+
}
661+
}
662+
516663
if verbose {
517-
eprintln!("Streaming {} large packages via daemon...", plan.daemon_wheels.len());
518-
for w in &plan.daemon_wheels {
519-
eprintln!(" {} ({:.1} MB)", w.distribution, w.size as f64 / 1024.0 / 1024.0);
664+
if cached_count > 0 {
665+
eprintln!(
666+
"Shared cache: {cached_count} wheels restored, {} to download",
667+
uncached_wheels.len()
668+
);
669+
}
670+
if !uncached_wheels.is_empty() {
671+
eprintln!("Streaming {} packages via daemon...", uncached_wheels.len());
672+
for w in &uncached_wheels {
673+
eprintln!(" {} ({:.1} MB)", w.distribution, w.size as f64 / 1024.0 / 1024.0);
674+
}
520675
}
521676
}
522677

523-
let config = DaemonConfig {
524-
site_packages: site_packages.clone(),
525-
parallel_downloads: 8,
526-
extract_threads: num_cpus(),
527-
};
678+
if !uncached_wheels.is_empty() {
679+
let config = DaemonConfig {
680+
site_packages: site_packages.clone(),
681+
parallel_downloads: 8,
682+
extract_threads: num_cpus(),
683+
};
528684

529-
let engine = DaemonEngine::new(plan.daemon_wheels);
530-
engine.run(&config).await?;
685+
let wheels_to_cache: Vec<WheelSpec> = uncached_wheels.clone();
686+
let engine = DaemonEngine::new(uncached_wheels);
687+
engine.run(&config).await?;
531688

532-
let (files, bytes) = engine.extract_stats();
533-
if verbose {
534-
eprintln!(
535-
"Daemon: extracted {} files ({:.1} MB)",
536-
files,
537-
bytes as f64 / 1024.0 / 1024.0
538-
);
689+
let (files, bytes) = engine.extract_stats();
690+
if verbose {
691+
eprintln!(
692+
"Daemon: extracted {} files ({:.1} MB)",
693+
files,
694+
bytes as f64 / 1024.0 / 1024.0
695+
);
696+
}
697+
698+
// Populate shared cache for newly extracted wheels
699+
let sp_for_cache = site_packages.clone();
700+
tokio::task::spawn_blocking(move || {
701+
for spec in &wheels_to_cache {
702+
populate_shared_cache(spec, &sp_for_cache);
703+
}
704+
})
705+
.await?;
539706
}
540707
}
541708

crates/zs-fast-wheel/src/resolve.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,16 @@ fn lookup_pypi_wheel(
8888
best_specific.or(best_abi3).or(best_universal)
8989
}
9090

91-
/// Size threshold: wheels larger than this go through the streaming daemon.
92-
/// Smaller wheels are installed via `uv pip install` (instant from cache).
93-
const DAEMON_THRESHOLD: u64 = 5 * 1024 * 1024; // 5MB
94-
95-
/// Resolved artifacts split into small (uv) and large (daemon) buckets.
91+
/// Resolved artifacts split by whether we have a wheel URL.
///
/// Packages with wheel URLs go through our streaming daemon (parallel download+extract).
/// Packages without URLs (sdist-only) go through uv pip install (can build from source).
pub struct ResolvedPlan {
    /// All resolved wheel specs, regardless of which bucket they land in.
    pub all: Vec<WheelSpec>,
    /// sdist-only packages as `name==version` requirement strings —
    /// installed via `uv pip install` (can build from source).
    pub uv_specs: Vec<String>,
    /// Packages with wheel URLs — streamed via the daemon (parallel download+extract).
    pub daemon_wheels: Vec<WheelSpec>,
}
104103

@@ -143,9 +142,11 @@ pub fn resolve_requirements(
143142
let mut daemon_wheels = Vec::new();
144143

145144
for spec in &specs {
146-
if spec.url.is_empty() || spec.size <= DAEMON_THRESHOLD {
145+
if spec.url.is_empty() {
146+
// sdist-only — uv builds from source
147147
uv_specs.push(format!("{}=={}", spec.distribution, spec.version));
148148
} else {
149+
// Has wheel URL — stream via daemon (parallel download+extract)
149150
daemon_wheels.push(spec.clone());
150151
}
151152
}
@@ -268,7 +269,7 @@ fn parse_pylock(content: &str) -> Result<Vec<WheelSpec>> {
268269
distribution: name,
269270
version,
270271
import_roots,
271-
size: 0, // forces into uv_specs bucket (< DAEMON_THRESHOLD)
272+
size: 0,
272273
hash: None,
273274
});
274275
}

0 commit comments

Comments
 (0)