Skip to content

Commit cb039b8

Browse files
Feat: progressive loading — Python starts while packages install
Start Python immediately on cold start instead of waiting for all wheels. File-based ready markers let the lazy import hook block per-package: - Health check ready in 1.8s (was 46s with uv, 33s without progressive) - Small packages (fastapi, uvicorn) available in <1s - Large packages (torch) block on import until extracted Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bfb034d commit cb039b8

File tree

2 files changed

+184
-51
lines changed

2 files changed

+184
-51
lines changed

crates/zs-fast-wheel/src/daemon.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ pub struct DaemonConfig {
2727
pub site_packages: PathBuf,
2828
pub parallel_downloads: usize,
2929
pub extract_threads: usize,
30+
/// Directory where per-package ready markers are written.
31+
/// Enables progressive loading: Python starts immediately and imports
32+
/// block only until their specific package's marker appears.
33+
pub ready_dir: Option<PathBuf>,
3034
}
3135

3236
impl DaemonConfig {
@@ -49,6 +53,7 @@ impl Default for DaemonConfig {
4953
extract_threads: std::thread::available_parallelism()
5054
.map(|n| n.get())
5155
.unwrap_or(4),
56+
ready_dir: None,
5257
}
5358
}
5459
}
@@ -219,6 +224,7 @@ impl DaemonEngine {
219224
let queue = self.queue.clone();
220225
let total_wheels = self.total_wheels;
221226
let rx = rx.clone();
227+
let ready_dir = config.ready_dir.clone();
222228

223229
let handle = tokio::task::spawn_blocking(move || {
224230
loop {
@@ -256,6 +262,11 @@ impl DaemonEngine {
256262
elapsed.as_secs_f64()
257263
);
258264

265+
// Write ready marker for progressive loading
266+
if let Some(ref rd) = ready_dir {
267+
let _ = std::fs::File::create(rd.join(&dist));
268+
}
269+
259270
{
260271
let mut q = queue.lock().unwrap();
261272
q.mark_done(&dist);
@@ -322,6 +333,11 @@ impl DaemonEngine {
322333
cvar.notify_all();
323334
}
324335

336+
// Write all_done marker for progressive loading
337+
if let Some(ref ready_dir) = config.ready_dir {
338+
let _ = std::fs::File::create(ready_dir.join(".all_done"));
339+
}
340+
325341
let elapsed = start.elapsed();
326342
let files = self.stats.files_written.load(Ordering::Relaxed);
327343
let bytes = self.stats.bytes_written.load(Ordering::Relaxed);

crates/zs-fast-wheel/src/main.rs

Lines changed: 168 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -392,35 +392,109 @@ else:
392392
std::process::exit(1);
393393
}
394394

395-
/// Spawn Python as a child process, run cache population in background, wait for child.
396-
/// Used when we need to populate the shared cache concurrently with script execution.
397-
fn spawn_python_with_cache(
395+
/// Spawn Python with progressive loading: imports block until their package is extracted.
396+
///
397+
/// The daemon runs in the background writing ready markers to `ready_dir`.
398+
/// Python starts immediately with an inline lazy import hook that polls for markers.
399+
/// Imports block only until their specific package's marker appears.
400+
fn spawn_python_progressive(
398401
python: &std::path::Path,
399402
site_packages: &std::path::Path,
400403
target: &str,
401404
target_args: &[String],
402-
_cache_handle: Option<tokio::task::JoinHandle<()>>,
405+
ready_dir: &std::path::Path,
406+
import_map: &std::collections::HashMap<String, String>,
403407
cuda_dirs: &[PathBuf],
404-
) -> ! {
408+
) -> std::process::Child {
405409
let mut pythonpath = site_packages.to_string_lossy().to_string();
406410
if let Ok(existing) = std::env::var("PYTHONPATH") {
407411
pythonpath = format!("{pythonpath}:{existing}");
408412
}
409413

410414
let sp_str = site_packages.display().to_string();
411415
let args_str = format!("{target_args:?}");
416+
let ready_dir_str = ready_dir.display().to_string();
412417

413-
// Same entry-point-discovery script as exec_python
418+
// Serialize import_map as Python dict literal
419+
let mut map_parts = Vec::new();
420+
for (k, v) in import_map {
421+
map_parts.push(format!("'{}':'{}'", k.replace('\'', "\\'"), v.replace('\'', "\\'")));
422+
}
423+
let import_map_py = format!("{{{}}}", map_parts.join(","));
424+
425+
// Python bootstrap: lazy import hook + entry point discovery
414426
let script = format!(
415427
r#"
416-
import sys, os, re, importlib, subprocess
428+
import sys, os, time, importlib, importlib.abc, importlib.util, re, subprocess
417429
from pathlib import Path
418430
from configparser import ConfigParser
419431
432+
# --- Progressive loading hook ---
433+
class _ZSHook(importlib.abc.MetaPathFinder):
434+
def __init__(self, ready_dir, import_map):
435+
self._ready_dir = ready_dir
436+
self._import_map = import_map
437+
self._resolved = set()
438+
self._wait_times = {{}}
439+
440+
def _can_import(self, name):
441+
idx = sys.meta_path.index(self)
442+
sys.meta_path.pop(idx)
443+
try:
444+
return importlib.util.find_spec(name) is not None
445+
except (ModuleNotFoundError, ValueError, ImportError):
446+
return False
447+
finally:
448+
sys.meta_path.insert(idx, self)
449+
450+
def find_spec(self, fullname, path=None, target=None):
451+
top = fullname.split('.')[0]
452+
if top in self._resolved:
453+
return None
454+
if self._can_import(fullname):
455+
self._resolved.add(top)
456+
return None
457+
458+
dist = self._import_map.get(top, top)
459+
all_done = os.path.join(self._ready_dir, '.all_done')
460+
marker = os.path.join(self._ready_dir, dist)
461+
462+
# Unknown package not in our map — don't wait
463+
if dist == top and top not in self._import_map and self._import_map:
464+
norm = top.lower().replace('-', '_')
465+
known = any(v.lower().replace('-', '_') == norm for v in self._import_map.values())
466+
if not known:
467+
self._resolved.add(top)
468+
return None
469+
470+
t0 = time.monotonic()
471+
wait = 0.01
472+
while time.monotonic() - t0 < 300:
473+
if os.path.exists(marker) or os.path.exists(all_done):
474+
importlib.invalidate_caches()
475+
sys.path_importer_cache.clear()
476+
elapsed = time.monotonic() - t0
477+
if elapsed > 0.1:
478+
self._wait_times[top] = elapsed
479+
print(f' [zerostart] {{top}}: ready ({{elapsed:.1f}}s)', file=sys.stderr, flush=True)
480+
self._resolved.add(top)
481+
return None
482+
time.sleep(wait)
483+
wait = min(wait * 1.5, 0.2)
484+
485+
self._resolved.add(top)
486+
return None
487+
488+
_zs_hook = _ZSHook({ready_dir_repr}, {import_map_repr})
489+
sys.meta_path.insert(0, _zs_hook)
490+
491+
# --- Entry point discovery + execution ---
420492
sp = {sp_repr}
421493
target = {target_repr}
422494
args = {args_repr}
423495
sys.path.insert(0, sp)
496+
# Remove system dist-packages to prevent conflicts
497+
sys.path[:] = [p for p in sys.path if p == sp or 'dist-packages' not in p]
424498
425499
if target.endswith('.py') or Path(target).is_file():
426500
sys.argv = [target] + args
@@ -501,40 +575,35 @@ else:
501575
pass
502576
print(f"Error: no entry point found for '{{target}}'", file=sys.stderr)
503577
sys.exit(1)
578+
579+
# Print wait report
580+
if _zs_hook._wait_times:
581+
total = sum(_zs_hook._wait_times.values())
582+
print(f' [zerostart] total import wait: {{total:.1f}}s', file=sys.stderr, flush=True)
504583
"#,
584+
ready_dir_repr = format!("'{}'", ready_dir_str.replace('\'', "\\'")),
585+
import_map_repr = import_map_py,
505586
sp_repr = format!("'{}'", sp_str.replace('\'', "\\'")),
506587
target_repr = format!("'{}'", target.replace('\'', "\\'")),
507588
args_repr = args_str,
508589
);
509590

510-
// Spawn Python as child (not exec) so we can continue cache work
511591
let mut cmd = std::process::Command::new(python);
512592
cmd.env("PYTHONPATH", &pythonpath)
513593
.arg("-c")
514594
.arg(&script);
595+
515596
if !cuda_dirs.is_empty() {
516597
cmd.env("LD_LIBRARY_PATH", cuda_ld_library_path(cuda_dirs));
517598
}
518-
let mut child = match cmd.spawn() {
519-
Ok(c) => c,
599+
600+
match cmd.spawn() {
601+
Ok(child) => child,
520602
Err(e) => {
521603
eprintln!("Failed to spawn Python: {e}");
522604
std::process::exit(1);
523605
}
524-
};
525-
526-
// Wait for child and cache handle concurrently
527-
// The cache_handle is on the tokio runtime which is still alive
528-
let status = child.wait().unwrap_or_else(|e| {
529-
eprintln!("Failed to wait for Python: {e}");
530-
std::process::exit(1);
531-
});
532-
533-
// Don't wait for cache population — it's best-effort for future cold starts.
534-
// The script has already finished; exit with its status code.
535-
// Note: std::process::exit() will kill the cache thread, but that's OK —
536-
// partial cache is still useful and next cold start fills the rest.
537-
std::process::exit(status.code().unwrap_or(1));
606+
}
538607
}
539608

540609
/// Create a venv using uv if it doesn't exist.
@@ -1252,9 +1321,6 @@ async fn main() -> Result<()> {
12521321
}
12531322

12541323
// === COLD PATH — all in Rust ===
1255-
let mut needs_cache_populate = false;
1256-
let mut cache_handle: Option<tokio::task::JoinHandle<()>> = None;
1257-
12581324
if verbose {
12591325
eprintln!("Cache miss — resolving...");
12601326
}
@@ -1426,15 +1492,73 @@ async fn main() -> Result<()> {
14261492
if verbose {
14271493
eprintln!("Config: parallel_downloads={pd}, extract_threads={et}");
14281494
}
1495+
1496+
// === Progressive loading ===
1497+
// Start daemon in background, start Python immediately.
1498+
// Python imports block only until their specific package is extracted.
1499+
let ready_dir = tempfile::tempdir()
1500+
.context("failed to create ready dir")?
1501+
.keep();
1502+
1503+
// Build import map: import_name → distribution_name
1504+
let mut import_map = std::collections::HashMap::new();
1505+
for spec in &uncached_wheels {
1506+
for root in &spec.import_roots {
1507+
import_map.insert(root.clone(), spec.distribution.clone());
1508+
}
1509+
}
1510+
// Also add cached wheels to import map (they're already extracted)
1511+
for spec in &daemon_wheels {
1512+
for root in &spec.import_roots {
1513+
import_map.entry(root.clone())
1514+
.or_insert_with(|| spec.distribution.clone());
1515+
}
1516+
}
1517+
14291518
let config = DaemonConfig {
14301519
site_packages: site_packages.clone(),
14311520
parallel_downloads: pd,
14321521
extract_threads: et,
1522+
ready_dir: Some(ready_dir.clone()),
14331523
};
14341524

14351525
let wheels_to_cache: Vec<WheelSpec> = uncached_wheels.clone();
1436-
let engine = DaemonEngine::new(uncached_wheels);
1437-
engine.run(&config).await?;
1526+
let engine = std::sync::Arc::new(DaemonEngine::new(uncached_wheels));
1527+
1528+
// Wait for uv small install to finish before starting Python
1529+
if let Some(handle) = uv_handle {
1530+
handle.await??;
1531+
}
1532+
1533+
// Spawn daemon in background
1534+
let engine_bg = engine.clone();
1535+
let daemon_handle = tokio::spawn(async move {
1536+
engine_bg.run(&config).await
1537+
});
1538+
1539+
eprintln!("Starting {target} (packages installing in background)...");
1540+
1541+
// Start Python immediately with progressive loading hook
1542+
let mut child = spawn_python_progressive(
1543+
&python,
1544+
&site_packages,
1545+
&target,
1546+
&target_args,
1547+
&ready_dir,
1548+
&import_map,
1549+
&cuda_dirs,
1550+
);
1551+
1552+
// Wait for Python to finish
1553+
let py_status = child.wait().unwrap_or_else(|e| {
1554+
eprintln!("Failed to wait for Python: {e}");
1555+
std::process::exit(1);
1556+
});
1557+
1558+
// Wait for daemon to finish
1559+
if let Err(e) = daemon_handle.await? {
1560+
eprintln!("Warning: daemon error: {e}");
1561+
}
14381562

14391563
let (files, bytes) = engine.extract_stats();
14401564
if verbose {
@@ -1445,28 +1569,28 @@ async fn main() -> Result<()> {
14451569
);
14461570
}
14471571

1448-
// Schedule shared cache population — runs in background while script executes.
1449-
// Skip if ZS_NO_SHARED_CACHE=1.
1450-
if std::env::var("ZS_NO_SHARED_CACHE").is_ok() {
1451-
tracing::info!("Shared cache disabled via ZS_NO_SHARED_CACHE");
1452-
} else {
1453-
needs_cache_populate = true;
1572+
// Mark complete now that everything is extracted
1573+
std::fs::File::create(&complete_marker).ok();
1574+
1575+
// Populate shared cache in background (best-effort for future cold starts)
1576+
if std::env::var("ZS_NO_SHARED_CACHE").is_err() {
14541577
let sp_for_cache = site_packages.clone();
1455-
cache_handle = Some(tokio::task::spawn_blocking(move || {
1578+
tokio::task::spawn_blocking(move || {
14561579
if let Ok(avail) = available_disk_mb(&sp_for_cache) {
14571580
if avail < 2048 {
1458-
tracing::warn!(
1459-
"Skipping shared cache — only {}MB free",
1460-
avail
1461-
);
14621581
return;
14631582
}
14641583
}
14651584
for spec in &wheels_to_cache {
14661585
populate_shared_cache(spec, &sp_for_cache);
14671586
}
1468-
}));
1587+
});
14691588
}
1589+
1590+
// Clean up ready dir
1591+
let _ = std::fs::remove_dir_all(&ready_dir);
1592+
1593+
std::process::exit(py_status.code().unwrap_or(1));
14701594
}
14711595
}
14721596

@@ -1475,22 +1599,14 @@ async fn main() -> Result<()> {
14751599
handle.await??;
14761600
}
14771601

1478-
// Mark complete — no cache population needed.
1479-
// Warm path uses our venv directly, doesn't need uv's cache.
1602+
// Mark complete
14801603
std::fs::File::create(&complete_marker).ok();
14811604

14821605
if verbose {
14831606
eprintln!("Environment ready — starting {target}");
14841607
}
14851608

1486-
// If we have background cache work, spawn Python as child process
1487-
// so cache population can continue while the script runs.
1488-
// Otherwise, exec directly (more efficient, preserves signals).
1489-
if needs_cache_populate {
1490-
spawn_python_with_cache(&python, &site_packages, &target, &target_args, cache_handle, &cuda_dirs);
1491-
} else {
1492-
exec_python_with_cuda(&python, &site_packages, &target, &target_args, &cuda_dirs);
1493-
}
1609+
exec_python_with_cuda(&python, &site_packages, &target, &target_args, &cuda_dirs);
14941610
}
14951611

14961612
Command::Install {
@@ -1687,6 +1803,7 @@ async fn run_engine(
16871803
site_packages,
16881804
parallel_downloads,
16891805
extract_threads,
1806+
ready_dir: None,
16901807
};
16911808

16921809
let engine = DaemonEngine::new(wheels);

0 commit comments

Comments
 (0)