From bb8bf42df7a752a8c20bfaf2784d6a43c103cc37 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:01:34 +0800 Subject: [PATCH 01/10] refactor(spawn): split execute layer into single-concern modules Decompose the ~200-line `spawn()` that mixed fspy/tokio command construction, Windows Job Object setup, FD_CLOEXEC workaround, stdio piping, and cancellation into four focused modules: - `spawn.rs`: only abstracts fspy vs tokio with a unified cancellation-aware `wait` future; returns `ChildHandle { stdout, stderr, wait }`. - `pipe.rs`: drains stdout/stderr with optional cache capture. - `tracked_accesses.rs`: normalizes raw fspy accesses to workspace-relative form. - `win_job.rs`: extracted from `mod.rs`. `execute_spawn` composes the pieces and measures duration end-to-end (spawn no longer tracks time). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/cache/mod.rs | 2 +- .../src/session/execute/fingerprint.rs | 5 +- crates/vite_task/src/session/execute/mod.rs | 365 +++++---------- crates/vite_task/src/session/execute/pipe.rs | 97 ++++ crates/vite_task/src/session/execute/spawn.rs | 442 ++++++------------ .../src/session/execute/tracked_accesses.rs | 122 +++++ .../vite_task/src/session/execute/win_job.rs | 84 ++++ 7 files changed, 562 insertions(+), 555 deletions(-) create mode 100644 crates/vite_task/src/session/execute/pipe.rs create mode 100644 crates/vite_task/src/session/execute/tracked_accesses.rs create mode 100644 crates/vite_task/src/session/execute/win_job.rs diff --git a/crates/vite_task/src/session/cache/mod.rs b/crates/vite_task/src/session/cache/mod.rs index 484d7d60..885dc97d 100644 --- a/crates/vite_task/src/session/cache/mod.rs +++ b/crates/vite_task/src/session/cache/mod.rs @@ -23,7 +23,7 @@ use wincode::{ io::{Reader, Writer}, }; -use super::execute::{fingerprint::PostRunFingerprint, spawn::StdOutput}; +use super::execute::{fingerprint::PostRunFingerprint, pipe::StdOutput}; /// Cache lookup key identifying a task's execution configuration. /// diff --git a/crates/vite_task/src/session/execute/fingerprint.rs b/crates/vite_task/src/session/execute/fingerprint.rs index dee12a89..d73a7f61 100644 --- a/crates/vite_task/src/session/execute/fingerprint.rs +++ b/crates/vite_task/src/session/execute/fingerprint.rs @@ -16,7 +16,7 @@ use vite_path::{AbsolutePath, RelativePathBuf}; use vite_str::Str; use wincode::{SchemaRead, SchemaWrite}; -use super::spawn::PathRead; +use super::tracked_accesses::PathRead; use crate::{collections::HashMap, session::cache::InputChangeKind}; /// Post-run fingerprint capturing file state after execution. @@ -53,7 +53,8 @@ pub enum DirEntryKind { impl PostRunFingerprint { /// Creates a new fingerprint from path accesses after task execution. /// - /// Negative glob filtering is done upstream in `spawn_with_tracking`. + /// Negative glob filtering is done upstream (see + /// [`super::tracked_accesses::TrackedPathAccesses::from_raw`]). /// Paths already present in `globbed_inputs` are skipped — they are /// already tracked by the prerun glob fingerprint, and the read-write /// overlap check in `execute_spawn` guarantees the task did not modify diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 9d5bf8e4..f0c96f81 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -1,9 +1,13 @@ pub mod fingerprint; pub mod glob_inputs; mod hash; +pub mod pipe; pub mod spawn; +pub mod tracked_accesses; +#[cfg(windows)] +mod win_job; -use std::{cell::RefCell, collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; +use std::{cell::RefCell, collections::BTreeMap, io::Write as _, sync::Arc, time::Instant}; use futures_util::{FutureExt, StreamExt, future::LocalBoxFuture, stream::FuturesUnordered}; use petgraph::Direction; @@ -12,14 +16,16 @@ use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use vite_path::AbsolutePath; use vite_task_plan::{ - ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand, - SpawnExecution, execution_graph::ExecutionNodeIndex, + ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnExecution, + execution_graph::ExecutionNodeIndex, }; use self::{ fingerprint::PostRunFingerprint, glob_inputs::compute_globbed_inputs, - spawn::{SpawnResult, TrackedPathAccesses, spawn_with_tracking}, + pipe::{StdOutput, pipe_stdio}, + spawn::{SpawnStdio, spawn}, + tracked_accesses::TrackedPathAccesses, }; use super::{ cache::{CacheEntryValue, ExecutionCache}, @@ -335,8 +341,8 @@ pub async fn execute_spawn( if let Some(cached) = cached_value { for output in cached.std_outputs.iter() { let writer: &mut dyn std::io::Write = match output.kind { - spawn::OutputKind::StdOut => &mut stdio_config.stdout_writer, - spawn::OutputKind::StdErr => &mut stdio_config.stderr_writer, + pipe::OutputKind::StdOut => &mut stdio_config.stdout_writer, + pipe::OutputKind::StdErr => &mut stdio_config.stderr_writer, }; let _ = writer.write_all(&output.content); let _ = writer.flush(); @@ -349,96 +355,61 @@ pub async fn execute_spawn( return SpawnOutcome::CacheHit; } - // 4. Determine actual stdio mode based on the suggestion AND cache state. - // Inherited stdio is only used when the reporter suggests it AND caching is - // completely disabled (no cache_metadata). If caching is enabled but missed, - // we still need piped mode to capture output for the cache update. - let use_inherited = - stdio_config.suggestion == StdioSuggestion::Inherited && cache_metadata.is_none(); - - if use_inherited { - // Inherited mode: all three stdio FDs (stdin, stdout, stderr) are inherited - // from the parent process. No fspy tracking, no output capture. - // Drop the StdioConfig writers before spawning to avoid holding std::io::Stdout - // while the child also writes to the same FD. - drop(stdio_config); - - match spawn_inherited(&spawn_execution.spawn_command, fast_fail_token).await { - Ok(result) => { - leaf_reporter.finish( - Some(result.exit_status), - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - None, - ); - return SpawnOutcome::Spawned(result.exit_status); - } + // 4. Decide fspy and stdio mode independently. + // - fspy tracking runs only when caching is on and `includes_auto` is set. + // - Piped stdio is forced whenever caching is on (we need to capture output + // for replay); otherwise we honor the reporter's suggestion. + let fspy_enabled = cache_metadata.is_some_and(|m| m.input_config.includes_auto); + let use_piped = cache_metadata.is_some() || stdio_config.suggestion == StdioSuggestion::Piped; + + // Build negative globs for fspy path filtering (already workspace-root-relative). + let resolved_negatives: Vec> = if let Some(cache_metadata) = cache_metadata { + match cache_metadata + .input_config + .negative_globs + .iter() + .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) + .collect::>>() + { + Ok(negs) => negs, Err(err) => { leaf_reporter.finish( None, CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::Spawn(err)), + Some(ExecutionError::PostRunFingerprint(err)), ); return SpawnOutcome::Failed; } } - } + } else { + Vec::new() + }; - // 5. Piped mode: execute spawn with tracking, streaming output to writers. - // - std_outputs: always captured when caching is enabled (for cache replay) - // - path_accesses: only tracked when includes_auto is true (fspy inference) - let (mut std_outputs, mut path_accesses, cache_metadata_and_inputs) = - cache_metadata.map_or((None, None, None), |cache_metadata| { - // On musl targets, LD_PRELOAD-based tracking is unavailable but seccomp - // unotify provides equivalent file access tracing. - let path_accesses = if cache_metadata.input_config.includes_auto { - Some(TrackedPathAccesses::default()) - } else { - None // Skip fspy when inference is disabled or unavailable - }; - (Some(Vec::new()), path_accesses, Some((cache_metadata, globbed_inputs))) - }); - - // Build negative globs for fspy path filtering (already workspace-root-relative) - let resolved_negatives: Vec> = - if let Some((cache_metadata, _)) = &cache_metadata_and_inputs { - match cache_metadata - .input_config - .negative_globs - .iter() - .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) - .collect::>>() - { - Ok(negs) => negs, - Err(err) => { - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::PostRunFingerprint(err)), - ); - return SpawnOutcome::Failed; - } - } - } else { - Vec::new() - }; + // Capture stdout/stderr when caching is enabled (for cache replay). + let mut std_outputs: Option> = cache_metadata.map(|_| Vec::new()); + + // In inherited mode we must release the StdioConfig writers before spawning + // so we don't hold `std::io::Stdout` while the child writes to the same FD. + // `bool::then(|| stdio_config)` moves stdio_config into the closure; when + // `use_piped` is false, `then` returns `None` and the closure (and its + // captured stdio_config) is dropped here. + let stdio_config = use_piped.then_some(stdio_config); + + // Measure end-to-end duration here — spawn() no longer tracks time. + let start = Instant::now(); - #[expect( - clippy::large_futures, - reason = "spawn_with_tracking manages process I/O and creates a large future" - )] - let result = match spawn_with_tracking( + // 5. Spawn. Returns pipes (Piped) or `None` (Inherited) plus a + // cancellation-aware wait future. + let spawn_stdio = if use_piped { SpawnStdio::Piped } else { SpawnStdio::Inherited }; + let mut child = match spawn( &spawn_execution.spawn_command, - cache_base_path, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, - std_outputs.as_mut(), - path_accesses.as_mut(), - &resolved_negatives, + fspy_enabled, + spawn_stdio, fast_fail_token.clone(), ) .await { - Ok(result) => result, + Ok(child) => child, Err(err) => { leaf_reporter.finish( None, @@ -449,6 +420,57 @@ pub async fn execute_spawn( } }; + // 6. If piped, drain stdout/stderr concurrently, capturing for cache replay. + if let Some(mut stdio_config) = stdio_config { + let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); + let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); + #[expect( + clippy::large_futures, + reason = "pipe_stdio streams child I/O and creates a large future" + )] + let pipe_result = pipe_stdio( + stdout, + stderr, + &mut *stdio_config.stdout_writer, + &mut *stdio_config.stderr_writer, + std_outputs.as_mut(), + fast_fail_token.clone(), + ) + .await; + if let Err(err) = pipe_result { + // Cancel so `child.wait` kills the child instead of orphaning it. + fast_fail_token.cancel(); + let _ = child.wait.await; + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + } + + // 7. Wait for exit (handles cancellation internally). + let outcome = match child.wait.await { + Ok(outcome) => outcome, + Err(err) => { + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + }; + let duration = start.elapsed(); + + // Normalize fspy accesses (workspace-relative, negative-glob filtered). + let path_accesses = outcome + .path_accesses + .as_ref() + .map(|raw| TrackedPathAccesses::from_raw(raw, cache_base_path, &resolved_negatives)); + let cache_metadata_and_inputs = cache_metadata.map(|cm| (cm, globbed_inputs)); + // 6. Update cache if successful and determine cache update status. // Errors during cache update are terminal (reported through finish). let (cache_update_status, cache_error) = if let Some((cache_metadata, globbed_inputs)) = @@ -458,7 +480,7 @@ pub async fn execute_spawn( if cancelled { // Cancelled (Ctrl-C or sibling failure) — result is untrustworthy (CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::Cancelled), None) - } else if result.exit_status.success() { + } else if outcome.exit_status.success() { // Check for read-write overlap: if the task wrote to any file it also // read, the inputs were modified during execution — don't cache. // Note: this only checks fspy-inferred reads, not globbed_inputs keys. @@ -489,7 +511,7 @@ pub async fn execute_spawn( let new_cache_value = CacheEntryValue { post_run_fingerprint, std_outputs: std_outputs.unwrap_or_default().into(), - duration: result.duration, + duration, globbed_inputs, }; match cache.update(cache_metadata, new_cache_value).await { @@ -521,182 +543,9 @@ pub async fn execute_spawn( // 7. Finish the leaf execution with the result and optional cache error. // Cache update/fingerprint failures are reported but do not affect the outcome — // the process ran, so we return its actual exit status. - leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error); + leaf_reporter.finish(Some(outcome.exit_status), cache_update_status, cache_error); - SpawnOutcome::Spawned(result.exit_status) -} - -/// Spawn a command with all three stdio file descriptors inherited from the parent. -/// -/// Used when the reporter suggests inherited stdio AND caching is disabled. -/// All three FDs (stdin, stdout, stderr) are inherited, allowing interactive input -/// and direct terminal output. No fspy tracking is performed since there's no -/// cache to update. -/// -/// The child process will see `is_terminal() == true` for stdout/stderr when the -/// parent is running in a terminal. This is expected behavior. -#[tracing::instrument(level = "debug", skip_all)] -async fn spawn_inherited( - spawn_command: &SpawnCommand, - fast_fail_token: CancellationToken, -) -> anyhow::Result { - let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); - cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); - cmd.envs(spawn_command.all_envs.iter()); - cmd.current_dir(&*spawn_command.cwd); - cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit()); - - let start = std::time::Instant::now(); - let mut tokio_cmd = cmd.into_tokio_command(); - - // Clear FD_CLOEXEC on stdio fds before exec. libuv (used by Node.js) marks - // stdin/stdout/stderr as close-on-exec, which causes them to be closed when - // the child process calls exec(). Without this fix, the child's fds 0-2 are - // closed after exec and Node.js reopens them as /dev/null, losing all output. - // See: https://github.com/libuv/libuv/issues/2062 - // SAFETY: The pre_exec closure only performs fcntl operations to clear - // FD_CLOEXEC flags on stdio fds, which is safe in a post-fork context. - #[cfg(unix)] - unsafe { - tokio_cmd.pre_exec(|| { - use std::os::fd::BorrowedFd; - - use nix::{ - fcntl::{FcntlArg, FdFlag, fcntl}, - libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}, - }; - for fd in [STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO] { - // SAFETY: fds 0-2 are always valid in a post-fork context - let borrowed = BorrowedFd::borrow_raw(fd); - if let Ok(flags) = fcntl(borrowed, FcntlArg::F_GETFD) { - let mut fd_flags = FdFlag::from_bits_retain(flags); - if fd_flags.contains(FdFlag::FD_CLOEXEC) { - fd_flags.remove(FdFlag::FD_CLOEXEC); - let _ = fcntl(borrowed, FcntlArg::F_SETFD(fd_flags)); - } - } - } - Ok(()) - }); - } - - let mut child = tokio_cmd.spawn()?; - - // On Windows, assign the child to a Job Object with KILL_ON_JOB_CLOSE so that - // all descendant processes (e.g., node.exe spawned by a .cmd shim) are killed - // when the job handle is dropped. Without this, TerminateProcess only kills the - // direct child, leaving grandchildren alive. - #[cfg(windows)] - let _job = { - use std::os::windows::io::{AsRawHandle, BorrowedHandle}; - // Duplicate the process handle so the job outlives tokio's handle. - // SAFETY: The child was just spawned, so its raw handle is valid. - let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; - let owned = borrowed.try_clone_to_owned()?; - win_job::assign_to_kill_on_close_job(owned.as_raw_handle())? - }; - - let exit_status = tokio::select! { - status = child.wait() => status?, - () = fast_fail_token.cancelled() => { - child.start_kill()?; - child.wait().await? - } - }; - - Ok(SpawnResult { exit_status, duration: start.elapsed() }) -} - -/// Win32 Job Object utilities for process tree management. -/// -/// On Windows, `TerminateProcess` only kills the direct child process, not its -/// descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, -/// which automatically terminates all processes in the job when the handle is dropped. -#[cfg(windows)] -mod win_job { - use std::{io, os::windows::io::RawHandle}; - - use winapi::{ - shared::minwindef::FALSE, - um::{ - handleapi::CloseHandle, - jobapi2::{ - AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, - TerminateJobObject, - }, - winnt::{ - HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, - }, - }, - }; - - /// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. - pub(super) struct OwnedJobHandle(HANDLE); - - impl OwnedJobHandle { - /// Immediately terminate all processes in the job. - /// - /// This is needed when pipes to a grandchild process must be closed before - /// the job handle is dropped (e.g., to unblock pipe reads in `spawn_with_tracking`). - pub(super) fn terminate(&self) { - // SAFETY: self.0 is a valid job handle from CreateJobObjectW. - unsafe { TerminateJobObject(self.0, 1) }; - } - } - - impl Drop for OwnedJobHandle { - fn drop(&mut self) { - // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. - unsafe { CloseHandle(self.0) }; - } - } - - /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it. - /// - /// Returns the job handle wrapped in an RAII guard. When dropped, all processes - /// in the job (the child and its descendants) are terminated. - pub(super) fn assign_to_kill_on_close_job( - process_handle: RawHandle, - ) -> io::Result { - // SAFETY: Creating an anonymous job object with no security attributes. - let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; - if job.is_null() { - return Err(io::Error::last_os_error()); - } - let job = OwnedJobHandle(job); - - // Configure the job to kill all processes when the handle is closed. - // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers - // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern. - let mut info = unsafe { - let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); - info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; - info - }; - - // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. - let ok = unsafe { - SetInformationJobObject( - job.0, - // JobObjectExtendedLimitInformation = 9 - 9, - std::ptr::from_mut(&mut info).cast(), - std::mem::size_of::().try_into().unwrap(), - ) - }; - if ok == FALSE { - return Err(io::Error::last_os_error()); - } - - // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle - // from the caller. - let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; - if ok == FALSE { - return Err(io::Error::last_os_error()); - } - - Ok(job) - } + SpawnOutcome::Spawned(outcome.exit_status) } impl Session<'_> { diff --git a/crates/vite_task/src/session/execute/pipe.rs b/crates/vite_task/src/session/execute/pipe.rs new file mode 100644 index 00000000..4cec668d --- /dev/null +++ b/crates/vite_task/src/session/execute/pipe.rs @@ -0,0 +1,97 @@ +//! Drain child stdout/stderr concurrently to writers, with optional capture. + +use std::io::Write; + +use serde::Serialize; +use tokio::{ + io::AsyncReadExt as _, + process::{ChildStderr, ChildStdout}, +}; +use tokio_util::sync::CancellationToken; +use wincode::{SchemaRead, SchemaWrite}; + +/// Output kind for stdout/stderr +#[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] +pub enum OutputKind { + StdOut, + StdErr, +} + +/// Output chunk with stream kind +#[derive(Debug, SchemaWrite, SchemaRead, Serialize, Clone)] +pub struct StdOutput { + pub kind: OutputKind, + pub content: Vec, +} + +/// Drain the child's stdout/stderr concurrently. +/// +/// Bytes are written through to `stdout_writer` / `stderr_writer` in real time +/// and, when `capture` is provided, also appended with adjacent same-kind chunks +/// coalesced (for cache replay). +/// +/// On cancellation: returns `Ok(())` without killing the child — the caller +/// drives the child's cancellation-aware `wait` future next, which observes the +/// same already-fired token and performs the kill. Dropping `stdout`/`stderr` +/// closes the pipe read ends (EPIPE on Unix, `ERROR_BROKEN_PIPE` on Windows). +#[tracing::instrument(level = "debug", skip_all)] +pub async fn pipe_stdio( + mut stdout: ChildStdout, + mut stderr: ChildStderr, + stdout_writer: &mut dyn Write, + stderr_writer: &mut dyn Write, + mut capture: Option<&mut Vec>, + cancellation_token: CancellationToken, +) -> std::io::Result<()> { + let mut stdout_buf = [0u8; 8192]; + let mut stderr_buf = [0u8; 8192]; + let mut stdout_done = false; + let mut stderr_done = false; + + loop { + if stdout_done && stderr_done { + return Ok(()); + } + tokio::select! { + result = stdout.read(&mut stdout_buf), if !stdout_done => { + match result? { + 0 => stdout_done = true, + n => { + let bytes = &stdout_buf[..n]; + stdout_writer.write_all(bytes)?; + stdout_writer.flush()?; + if let Some(capture) = capture.as_deref_mut() { + append_output_chunk(capture, OutputKind::StdOut, bytes); + } + } + } + } + result = stderr.read(&mut stderr_buf), if !stderr_done => { + match result? { + 0 => stderr_done = true, + n => { + let bytes = &stderr_buf[..n]; + stderr_writer.write_all(bytes)?; + stderr_writer.flush()?; + if let Some(capture) = capture.as_deref_mut() { + append_output_chunk(capture, OutputKind::StdErr, bytes); + } + } + } + } + () = cancellation_token.cancelled() => { + return Ok(()); + } + } + } +} + +fn append_output_chunk(capture: &mut Vec, kind: OutputKind, bytes: &[u8]) { + if let Some(last) = capture.last_mut() + && last.kind == kind + { + last.content.extend_from_slice(bytes); + } else { + capture.push(StdOutput { kind, content: bytes.to_vec() }); + } +} diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 92da92b1..537cea5b 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -1,331 +1,185 @@ -//! Process spawning with file system tracking via fspy. +//! Unified spawn abstraction over fspy and plain tokio processes. +//! +//! [`spawn`] does one thing: hand back the child's stdio pipes plus a +//! cancellation-aware `wait` future. Draining the pipes is [`super::pipe`]'s +//! job; normalizing fspy path accesses is [`super::tracked_accesses`]'s. -use std::{ - collections::hash_map::Entry, - io::Write, - process::{ExitStatus, Stdio}, - time::{Duration, Instant}, -}; +use std::{io, process::Stdio}; -use fspy::AccessMode; -use rustc_hash::FxHashSet; -use serde::Serialize; -use tokio::io::AsyncReadExt as _; +use fspy::PathAccessIterable; +use futures_util::{FutureExt, future::BoxFuture}; +use tokio::process::{ChildStderr, ChildStdout}; use tokio_util::sync::CancellationToken; -use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::SpawnCommand; -use wax::Program as _; -use wincode::{SchemaRead, SchemaWrite}; -use crate::collections::HashMap; - -/// Path read access info -#[derive(Debug, Clone, Copy)] -pub struct PathRead { - pub read_dir_entries: bool, -} - -/// Output kind for stdout/stderr -#[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] -pub enum OutputKind { - StdOut, - StdErr, +/// How the child's stdin/stdout/stderr are configured. +pub enum SpawnStdio { + /// All three fds inherited from the parent. On Unix, [`spawn`] also clears + /// `FD_CLOEXEC` on fds 0-2 (libuv workaround — + /// ). + Inherited, + /// stdin is `/dev/null`; stdout and stderr are piped. Drain the pipes with + /// [`super::pipe::pipe_stdio`]. + Piped, } -/// Output chunk with stream kind -#[derive(Debug, SchemaWrite, SchemaRead, Serialize, Clone)] -pub struct StdOutput { - pub kind: OutputKind, - pub content: Vec, +/// Handle to a spawned child. +/// +/// `stdout` and `stderr` are `Some` iff [`SpawnStdio::Piped`] was requested. +/// `wait` resolves when the child exits and handles cancellation internally: +/// when the token fires, the child (and on Windows its descendants via the Job +/// Object) is killed before the future resolves. +pub struct ChildHandle { + pub stdout: Option, + pub stderr: Option, + pub wait: BoxFuture<'static, io::Result>, } -/// Result of spawning a process with file tracking -#[derive(Debug)] -pub struct SpawnResult { - pub exit_status: ExitStatus, - pub duration: Duration, +/// Result of waiting for a child to exit. +pub struct ChildOutcome { + pub exit_status: std::process::ExitStatus, + /// Raw fspy accesses. `Some` iff `fspy` was `true` at spawn time. + pub path_accesses: Option, } -/// Tracked file accesses from fspy. -/// Only populated when fspy tracking is enabled (`includes_auto` is true). -#[derive(Default, Debug)] -pub struct TrackedPathAccesses { - /// Tracked path reads - pub path_reads: HashMap, +/// Spawn a command with the requested fspy and stdio configuration. +/// +/// Cancellation is unified: whether fspy is enabled or not, the returned `wait` +/// future observes `cancellation_token` and kills the child before resolving. +#[tracing::instrument(level = "debug", skip_all)] +pub async fn spawn( + cmd: &SpawnCommand, + fspy: bool, + stdio: SpawnStdio, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let mut fspy_cmd = fspy::Command::new(cmd.program_path.as_path()); + fspy_cmd.args(cmd.args.iter().map(vite_str::Str::as_str)); + fspy_cmd.envs(cmd.all_envs.iter()); + fspy_cmd.current_dir(&*cmd.cwd); + + match stdio { + SpawnStdio::Inherited => { + fspy_cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit()); + // libuv (used by Node.js) marks stdin/stdout/stderr as close-on-exec; + // without this fix the child reopens fds 0-2 as /dev/null after exec. + // See: https://github.com/libuv/libuv/issues/2062 + // SAFETY: the pre_exec closure only performs fcntl operations on + // stdio fds, which is safe in a post-fork context. + #[cfg(unix)] + unsafe { + fspy_cmd.pre_exec(clear_stdio_cloexec); + } + } + SpawnStdio::Piped => { + fspy_cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); + } + } - /// Tracked path writes - pub path_writes: FxHashSet, + if fspy { + spawn_fspy(fspy_cmd, cancellation_token).await + } else { + spawn_tokio(fspy_cmd, cancellation_token) + } } -#[expect( - clippy::disallowed_types, - reason = "fspy strip_path_prefix exposes std::path::Path; convert to RelativePathBuf immediately" -)] -fn normalize_tracked_workspace_path( - stripped_path: &std::path::Path, - resolved_negatives: &[wax::Glob<'static>], -) -> Option { - // On Windows, paths are possible to be still absolute after stripping the workspace root. - // For example: c:\workspace\subdir\c:\workspace\subdir - // Just ignore those accesses. - let relative = RelativePathBuf::new(stripped_path).ok()?; +async fn spawn_fspy( + cmd: fspy::Command, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let mut tracked = cmd.spawn(cancellation_token).await?; - // Clean `..` components — fspy may report paths like - // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for - // consistent behavior across platforms and clean user-facing messages. - let relative = relative.clean().ok()?; + // On Windows, assign the child to a Job Object so that killing the child + // also kills all descendant processes (e.g., node.exe via a .cmd shim). + #[cfg(windows)] + let job = { + use std::os::windows::io::AsRawHandle; + super::win_job::assign_to_kill_on_close_job(tracked.process_handle.as_raw_handle())? + }; - // Skip .git directory accesses (workaround for tools like oxlint) - if relative.as_path().strip_prefix(".git").is_ok() { - return None; - } + let stdout = tracked.stdout.take(); + let stderr = tracked.stderr.take(); + let wait_handle = tracked.wait_handle; - if !resolved_negatives.is_empty() - && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) - { - return None; + let wait = async move { + let termination = wait_handle.await?; + // Drop order: `job` drops here, KILL_ON_JOB_CLOSE kills any descendants + // still alive. fspy's wait handle already watched the cancellation + // token and killed the direct child. + #[cfg(windows)] + drop(job); + Ok(ChildOutcome { + exit_status: termination.status, + path_accesses: Some(termination.path_accesses), + }) } + .boxed(); - Some(relative) -} - -/// How the child process is awaited after stdout/stderr are drained. -enum ChildWait { - /// fspy tracking enabled — fspy manages cancellation internally. - Fspy(fspy::TrackedChild), - - /// Plain tokio process — cancellation is handled in the pipe read loop. - Tokio(tokio::process::Child), + Ok(ChildHandle { stdout, stderr, wait }) } -/// Spawn a command with optional file system tracking via fspy, using piped stdio. -/// -/// Returns the execution result including exit status and duration. -/// -/// - stdin is always `/dev/null` (piped mode is for non-interactive execution). -/// - `stdout_writer`/`stderr_writer` receive the child's stdout/stderr output in real-time. -/// - `std_outputs` if provided, will be populated with captured outputs for cache replay. -/// - `path_accesses` if provided, fspy will be used to track file accesses. If `None`, fspy is disabled. -/// - `resolved_negatives` - resolved negative glob patterns for filtering fspy-tracked paths. -#[tracing::instrument(level = "debug", skip_all)] -#[expect( - clippy::too_many_lines, - reason = "spawn logic is inherently sequential and splitting would reduce clarity" -)] -#[expect( - clippy::too_many_arguments, - reason = "spawn parameters are all distinct concerns that don't form a natural group" -)] -pub async fn spawn_with_tracking( - spawn_command: &SpawnCommand, - workspace_root: &AbsolutePath, - stdout_writer: &mut dyn Write, - stderr_writer: &mut dyn Write, - std_outputs: Option<&mut Vec>, - path_accesses: Option<&mut TrackedPathAccesses>, - resolved_negatives: &[wax::Glob<'static>], - fast_fail_token: CancellationToken, -) -> anyhow::Result { - let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); - cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); - cmd.envs(spawn_command.all_envs.iter()); - cmd.current_dir(&*spawn_command.cwd); - cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); +fn spawn_tokio( + cmd: fspy::Command, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let mut child = cmd.into_tokio_command().spawn()?; - // On Windows, assign the child to a Job Object so that killing the child also - // kills all descendant processes (e.g., node.exe spawned by a .cmd shim). #[cfg(windows)] - let job; - - let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() { - // fspy tracking enabled — fspy manages cancellation internally via a clone - // of the token. We keep the original for the pipe read loop. - let mut tracked_child = cmd.spawn(fast_fail_token.clone()).await?; - let stdout = tracked_child.stdout.take().unwrap(); - let stderr = tracked_child.stderr.take().unwrap(); - #[cfg(windows)] - { - use std::os::windows::io::AsRawHandle; - job = super::win_job::assign_to_kill_on_close_job( - tracked_child.process_handle.as_raw_handle(), - )?; - } - (stdout, stderr, ChildWait::Fspy(tracked_child)) - } else { - let mut child = cmd.into_tokio_command().spawn()?; - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - #[cfg(windows)] - { - use std::os::windows::io::{AsRawHandle, BorrowedHandle}; - // SAFETY: The child was just spawned, so its raw handle is valid. - let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; - let owned = borrowed.try_clone_to_owned()?; - job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; - } - (stdout, stderr, ChildWait::Tokio(child)) + let job = { + use std::os::windows::io::{AsRawHandle, BorrowedHandle}; + // Duplicate the process handle so the job outlives tokio's handle. + // SAFETY: The child was just spawned, so its raw handle is valid. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + let owned = borrowed.try_clone_to_owned()?; + super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())? }; - // Output capturing is independent of fspy tracking - let mut outputs = std_outputs; - let mut stdout_buf = [0u8; 8192]; - let mut stderr_buf = [0u8; 8192]; - let mut stdout_done = false; - let mut stderr_done = false; - - let start = Instant::now(); + let stdout = child.stdout.take(); + let stderr = child.stderr.take(); - // Read from both stdout and stderr concurrently using select! - // Cancellation is handled directly in the loop: kill the child process (and - // on Windows, terminate the Job Object to kill grandchildren holding pipes). - loop { - if stdout_done && stderr_done { - break; - } - tokio::select! { - result = child_stdout.read(&mut stdout_buf), if !stdout_done => { - match result? { - 0 => stdout_done = true, - n => { - let content = stdout_buf[..n].to_vec(); - // Write to the sync writer immediately - stdout_writer.write_all(&content)?; - stdout_writer.flush()?; - // Store outputs for caching - if let Some(outputs) = &mut outputs { - if let Some(last) = outputs.last_mut() - && last.kind == OutputKind::StdOut - { - last.content.extend(&content); - } else { - outputs.push(StdOutput { kind: OutputKind::StdOut, content }); - } - } - } - } - } - result = child_stderr.read(&mut stderr_buf), if !stderr_done => { - match result? { - 0 => stderr_done = true, - n => { - let content = stderr_buf[..n].to_vec(); - // Write to the sync writer immediately - stderr_writer.write_all(&content)?; - stderr_writer.flush()?; - // Store outputs for caching - if let Some(outputs) = &mut outputs { - if let Some(last) = outputs.last_mut() - && last.kind == OutputKind::StdErr - { - last.content.extend(&content); - } else { - outputs.push(StdOutput { kind: OutputKind::StdErr, content }); - } - } - } - } - } - () = fast_fail_token.cancelled() => { - // Kill the direct child (no-op for fspy which handles it internally). - if let ChildWait::Tokio(ref mut child) = child_wait { - let _ = child.start_kill(); - } - // On Windows, terminate the entire process tree so grandchild - // processes release their pipe handles. + let wait = async move { + let exit_status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + // Eagerly kill descendants; KILL_ON_JOB_CLOSE on drop is a backstop. #[cfg(windows)] job.terminate(); - break; + child.wait().await? } - } + }; + // `job` drops here on Windows, terminating any stragglers. + #[cfg(windows)] + drop(job); + Ok(ChildOutcome { exit_status, path_accesses: None }) } + .boxed(); - // Wait for process termination and collect results. - // The child may have closed its pipes without exiting (e.g., daemonized), - // so we still need a cancellation arm here. - match child_wait { - ChildWait::Fspy(tracked_child) => { - // fspy's wait_handle already monitors the cancellation token internally, - // so no additional select! is needed here. - let termination = tracked_child.wait_handle.await?; - let duration = start.elapsed(); - - // path_accesses must be Some when fspy is enabled (they're set together) - let path_accesses = path_accesses.ok_or_else(|| { - anyhow::anyhow!("internal error: fspy enabled but path_accesses is None") - })?; - let path_reads = &mut path_accesses.path_reads; - let path_writes = &mut path_accesses.path_writes; - - for access in termination.path_accesses.iter() { - // Strip workspace root, clean `..` components, and filter in one pass. - // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. - let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { - let Ok(stripped_path) = strip_result else { - return None; - }; - normalize_tracked_workspace_path(stripped_path, resolved_negatives) - }); + Ok(ChildHandle { stdout, stderr, wait }) +} - let Some(relative_path) = relative_path else { - continue; - }; +#[cfg(unix)] +#[expect( + clippy::unnecessary_wraps, + reason = "signature matches Command::pre_exec's FnMut() -> io::Result<()> contract" +)] +fn clear_stdio_cloexec() -> io::Result<()> { + use std::os::fd::BorrowedFd; - if access.mode.contains(AccessMode::READ) { - path_reads - .entry(relative_path.clone()) - .or_insert(PathRead { read_dir_entries: false }); - } - if access.mode.contains(AccessMode::WRITE) { - path_writes.insert(relative_path.clone()); - } - if access.mode.contains(AccessMode::READ_DIR) { - match path_reads.entry(relative_path) { - Entry::Occupied(mut occupied) => { - occupied.get_mut().read_dir_entries = true; - } - Entry::Vacant(vacant) => { - vacant.insert(PathRead { read_dir_entries: true }); - } - } - } + use nix::{ + fcntl::{FcntlArg, FdFlag, fcntl}, + libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}, + }; + for fd in [STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO] { + // SAFETY: fds 0-2 are always valid in a post-fork context + let borrowed = unsafe { BorrowedFd::borrow_raw(fd) }; + if let Ok(flags) = fcntl(borrowed, FcntlArg::F_GETFD) { + let mut fd_flags = FdFlag::from_bits_retain(flags); + if fd_flags.contains(FdFlag::FD_CLOEXEC) { + fd_flags.remove(FdFlag::FD_CLOEXEC); + let _ = fcntl(borrowed, FcntlArg::F_SETFD(fd_flags)); } - - tracing::debug!( - "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", - path_reads.len(), - path_writes.len(), - termination.status, - ); - - Ok(SpawnResult { exit_status: termination.status, duration }) - } - ChildWait::Tokio(mut child) => { - let exit_status = tokio::select! { - status = child.wait() => status?, - () = fast_fail_token.cancelled() => { - child.start_kill()?; - child.wait().await? - } - }; - Ok(SpawnResult { exit_status, duration: start.elapsed() }) } } -} - -#[cfg(test)] -mod tests { - #[cfg(windows)] - use super::*; - - #[cfg(windows)] - #[test] - fn malformed_windows_drive_path_after_workspace_strip_is_ignored() { - #[expect( - clippy::disallowed_types, - reason = "normalize_tracked_workspace_path requires std::path::Path for fspy strip_path_prefix output" - )] - let relative_path = - normalize_tracked_workspace_path(std::path::Path::new(r"foo\C:\bar"), &[]); - assert!(relative_path.is_none()); - } + Ok(()) } diff --git a/crates/vite_task/src/session/execute/tracked_accesses.rs b/crates/vite_task/src/session/execute/tracked_accesses.rs new file mode 100644 index 00000000..79c77f10 --- /dev/null +++ b/crates/vite_task/src/session/execute/tracked_accesses.rs @@ -0,0 +1,122 @@ +//! Normalize raw fspy path accesses into workspace-relative, filtered form. + +use std::collections::hash_map::Entry; + +use fspy::{AccessMode, PathAccessIterable}; +use rustc_hash::FxHashSet; +use vite_path::{AbsolutePath, RelativePathBuf}; + +use crate::collections::HashMap; + +/// Path read access info +#[derive(Debug, Clone, Copy)] +pub struct PathRead { + pub read_dir_entries: bool, +} + +/// Tracked file accesses from fspy, normalized to workspace-relative paths. +#[derive(Default, Debug)] +pub struct TrackedPathAccesses { + /// Tracked path reads + pub path_reads: HashMap, + + /// Tracked path writes + pub path_writes: FxHashSet, +} + +impl TrackedPathAccesses { + /// Build from fspy's raw iterable by stripping the workspace prefix, + /// normalizing `..` components, and filtering against the negative globs. + pub fn from_raw( + raw: &PathAccessIterable, + workspace_root: &AbsolutePath, + resolved_negatives: &[wax::Glob<'static>], + ) -> Self { + let mut accesses = Self::default(); + for access in raw.iter() { + // Strip workspace root, clean `..` components, and filter in one pass. + // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. + let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { + let Ok(stripped_path) = strip_result else { + return None; + }; + normalize_tracked_workspace_path(stripped_path, resolved_negatives) + }); + + let Some(relative_path) = relative_path else { + continue; + }; + + if access.mode.contains(AccessMode::READ) { + accesses + .path_reads + .entry(relative_path.clone()) + .or_insert(PathRead { read_dir_entries: false }); + } + if access.mode.contains(AccessMode::WRITE) { + accesses.path_writes.insert(relative_path.clone()); + } + if access.mode.contains(AccessMode::READ_DIR) { + match accesses.path_reads.entry(relative_path) { + Entry::Occupied(mut occupied) => { + occupied.get_mut().read_dir_entries = true; + } + Entry::Vacant(vacant) => { + vacant.insert(PathRead { read_dir_entries: true }); + } + } + } + } + accesses + } +} + +#[expect( + clippy::disallowed_types, + reason = "fspy strip_path_prefix exposes std::path::Path; convert to RelativePathBuf immediately" +)] +fn normalize_tracked_workspace_path( + stripped_path: &std::path::Path, + resolved_negatives: &[wax::Glob<'static>], +) -> Option { + // On Windows, paths are possible to be still absolute after stripping the workspace root. + // For example: c:\workspace\subdir\c:\workspace\subdir + // Just ignore those accesses. + let relative = RelativePathBuf::new(stripped_path).ok()?; + + // Clean `..` components — fspy may report paths like + // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for + // consistent behavior across platforms and clean user-facing messages. + let relative = relative.clean().ok()?; + + // Skip .git directory accesses (workaround for tools like oxlint) + if relative.as_path().strip_prefix(".git").is_ok() { + return None; + } + + if !resolved_negatives.is_empty() + && resolved_negatives.iter().any(|neg| wax::Program::is_match(neg, relative.as_str())) + { + return None; + } + + Some(relative) +} + +#[cfg(test)] +mod tests { + #[cfg(windows)] + use super::*; + + #[cfg(windows)] + #[test] + fn malformed_windows_drive_path_after_workspace_strip_is_ignored() { + #[expect( + clippy::disallowed_types, + reason = "normalize_tracked_workspace_path requires std::path::Path for fspy strip_path_prefix output" + )] + let relative_path = + normalize_tracked_workspace_path(std::path::Path::new(r"foo\C:\bar"), &[]); + assert!(relative_path.is_none()); + } +} diff --git a/crates/vite_task/src/session/execute/win_job.rs b/crates/vite_task/src/session/execute/win_job.rs new file mode 100644 index 00000000..659203b1 --- /dev/null +++ b/crates/vite_task/src/session/execute/win_job.rs @@ -0,0 +1,84 @@ +//! Win32 Job Object utilities for process tree management. +//! +//! On Windows, `TerminateProcess` only kills the direct child process, not its +//! descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, +//! which automatically terminates all processes in the job when the handle is dropped. + +use std::{io, os::windows::io::RawHandle}; + +use winapi::{ + shared::minwindef::FALSE, + um::{ + handleapi::CloseHandle, + jobapi2::{ + AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, TerminateJobObject, + }, + winnt::{HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION}, + }, +}; + +/// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. +pub(super) struct OwnedJobHandle(HANDLE); + +impl OwnedJobHandle { + /// Immediately terminate all processes in the job. + /// + /// This is needed when pipes to a grandchild process must be closed before + /// the job handle is dropped (e.g., to unblock pipe reads in `spawn`). + pub(super) fn terminate(&self) { + // SAFETY: self.0 is a valid job handle from CreateJobObjectW. + unsafe { TerminateJobObject(self.0, 1) }; + } +} + +impl Drop for OwnedJobHandle { + fn drop(&mut self) { + // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. + unsafe { CloseHandle(self.0) }; + } +} + +/// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it. +/// +/// Returns the job handle wrapped in an RAII guard. When dropped, all processes +/// in the job (the child and its descendants) are terminated. +pub(super) fn assign_to_kill_on_close_job(process_handle: RawHandle) -> io::Result { + // SAFETY: Creating an anonymous job object with no security attributes. + let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; + if job.is_null() { + return Err(io::Error::last_os_error()); + } + let job = OwnedJobHandle(job); + + // Configure the job to kill all processes when the handle is closed. + // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers + // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern. + let mut info = unsafe { + let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); + info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; + info + }; + + // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. + let ok = unsafe { + SetInformationJobObject( + job.0, + // JobObjectExtendedLimitInformation = 9 + 9, + std::ptr::from_mut(&mut info).cast(), + std::mem::size_of::().try_into().unwrap(), + ) + }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle + // from the caller. + let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + Ok(job) +} From cb552508a0c2b0493ee7957e75bbfeb1d6e63942 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:15:27 +0800 Subject: [PATCH 02/10] refactor(execute): group pipe-mode state into a `Pipe` enum Compute `spawn_stdio` directly from the cache/suggestion condition instead of going through an intermediate `use_piped: bool`. Bundle the piped-mode state (reporter writers + optional cache capture buffer) into a local `Pipe` enum so related state lives in one variant; the `Inherited` arm drops `stdio_config` eagerly as before. Also derive Copy/Clone/PartialEq/Eq/Debug on `SpawnStdio` so callers can match on it and still pass it by value to `spawn()`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 114 +++++++++++------- crates/vite_task/src/session/execute/spawn.rs | 1 + 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index f0c96f81..c2574042 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -35,7 +35,7 @@ use super::{ }, reporter::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioSuggestion, + StdioConfig, StdioSuggestion, }, }; use crate::{Session, collections::HashMap}; @@ -251,6 +251,21 @@ impl ExecutionContext<'_> { } } +/// Stdio-mode-specific state for a leaf execution. +/// +/// `Piped` owns the reporter's writers plus an optional capture buffer used for +/// cache replay; `Inherited` owns nothing (the reporter's `StdioConfig` has +/// already been dropped so we don't hold `std::io::Stdout` while the child +/// writes to the same FD). +enum Pipe { + Piped { + stdio_config: StdioConfig, + /// `Some` when caching is on — chunks are appended for cache replay. + capture: Option>, + }, + Inherited, +} + /// Execute a spawned process with cache-aware lifecycle. /// /// This is a free function (not tied to `ExecutionContext`) so it can be reused @@ -260,8 +275,8 @@ impl ExecutionContext<'_> { /// 1. Cache lookup (determines cache status) /// 2. `leaf_reporter.start(cache_status)` → `StdioConfig` /// 3. If cache hit: replay cached outputs via `StdioConfig` writers → finish -/// 4. If `Inherited` suggestion AND caching disabled: `spawn_inherited()` → finish -/// 5. Else (piped): `spawn_with_tracking()` with writers → cache update → finish +/// 4. Otherwise: `spawn()` with the chosen stdio mode, optionally `pipe_stdio()` +/// to drain, then `child.wait` → cache update → finish /// /// Errors (cache lookup failure, spawn failure, cache update failure) are reported /// through `leaf_reporter.finish()` and do not abort the caller. @@ -355,12 +370,17 @@ pub async fn execute_spawn( return SpawnOutcome::CacheHit; } - // 4. Decide fspy and stdio mode independently. + // 4. Decide fspy and stdio mode. // - fspy tracking runs only when caching is on and `includes_auto` is set. // - Piped stdio is forced whenever caching is on (we need to capture output // for replay); otherwise we honor the reporter's suggestion. let fspy_enabled = cache_metadata.is_some_and(|m| m.input_config.includes_auto); - let use_piped = cache_metadata.is_some() || stdio_config.suggestion == StdioSuggestion::Piped; + let spawn_stdio = + if cache_metadata.is_some() || stdio_config.suggestion == StdioSuggestion::Piped { + SpawnStdio::Piped + } else { + SpawnStdio::Inherited + }; // Build negative globs for fspy path filtering (already workspace-root-relative). let resolved_negatives: Vec> = if let Some(cache_metadata) = cache_metadata { @@ -385,22 +405,25 @@ pub async fn execute_spawn( Vec::new() }; - // Capture stdout/stderr when caching is enabled (for cache replay). - let mut std_outputs: Option> = cache_metadata.map(|_| Vec::new()); - - // In inherited mode we must release the StdioConfig writers before spawning - // so we don't hold `std::io::Stdout` while the child writes to the same FD. - // `bool::then(|| stdio_config)` moves stdio_config into the closure; when - // `use_piped` is false, `then` returns `None` and the closure (and its - // captured stdio_config) is dropped here. - let stdio_config = use_piped.then_some(stdio_config); + // Bundle stdio-mode-specific state. `Pipe::Piped` owns the writers plus an + // optional capture buffer (Some when caching is on). In the inherited arm + // we drop `stdio_config` right here, so we don't hold `std::io::Stdout` + // while the child writes to the same FD. + let pipe = match spawn_stdio { + SpawnStdio::Piped => { + Pipe::Piped { stdio_config, capture: cache_metadata.map(|_| Vec::new()) } + } + SpawnStdio::Inherited => { + drop(stdio_config); + Pipe::Inherited + } + }; // Measure end-to-end duration here — spawn() no longer tracks time. let start = Instant::now(); // 5. Spawn. Returns pipes (Piped) or `None` (Inherited) plus a // cancellation-aware wait future. - let spawn_stdio = if use_piped { SpawnStdio::Piped } else { SpawnStdio::Inherited }; let mut child = match spawn( &spawn_execution.spawn_command, fspy_enabled, @@ -420,35 +443,40 @@ pub async fn execute_spawn( } }; - // 6. If piped, drain stdout/stderr concurrently, capturing for cache replay. - if let Some(mut stdio_config) = stdio_config { - let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); - let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); - #[expect( - clippy::large_futures, - reason = "pipe_stdio streams child I/O and creates a large future" - )] - let pipe_result = pipe_stdio( - stdout, - stderr, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, - std_outputs.as_mut(), - fast_fail_token.clone(), - ) - .await; - if let Err(err) = pipe_result { - // Cancel so `child.wait` kills the child instead of orphaning it. - fast_fail_token.cancel(); - let _ = child.wait.await; - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::Spawn(err.into())), - ); - return SpawnOutcome::Failed; + // 6. If piped, drain stdout/stderr; capture chunks flow to `std_outputs` + // for the cache update below. + let std_outputs: Option> = match pipe { + Pipe::Piped { mut stdio_config, mut capture } => { + let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); + let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); + #[expect( + clippy::large_futures, + reason = "pipe_stdio streams child I/O and creates a large future" + )] + let pipe_result = pipe_stdio( + stdout, + stderr, + &mut *stdio_config.stdout_writer, + &mut *stdio_config.stderr_writer, + capture.as_mut(), + fast_fail_token.clone(), + ) + .await; + if let Err(err) = pipe_result { + // Cancel so `child.wait` kills the child instead of orphaning it. + fast_fail_token.cancel(); + let _ = child.wait.await; + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + capture } - } + Pipe::Inherited => None, + }; // 7. Wait for exit (handles cancellation internally). let outcome = match child.wait.await { diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 537cea5b..de2ba087 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken; use vite_task_plan::SpawnCommand; /// How the child's stdin/stdout/stderr are configured. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum SpawnStdio { /// All three fds inherited from the parent. On Unix, [`spawn`] also clears /// `FD_CLOEXEC` on fds 0-2 (libuv workaround — From 5b5d9354ab18335e43616fc83da6ce56a475b38c Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:27:07 +0800 Subject: [PATCH 03/10] fix(execute): use LocalBoxFuture for `ChildHandle::wait` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The wait future captures `OwnedJobHandle` (a Win32 `HANDLE`, `*mut c_void`) on Windows, which isn't `Send`. `BoxFuture` demands `Send`, breaking the Windows build. The future is only ever awaited on a single thread (`execute_graph` doesn't `tokio::spawn`), so `LocalBoxFuture` is the correct shape — and avoids an `unsafe impl Send` on `OwnedJobHandle`. Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude/scheduled_tasks.lock | 1 + crates/vite_task/src/session/execute/spawn.rs | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 .claude/scheduled_tasks.lock diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock new file mode 100644 index 00000000..5d951163 --- /dev/null +++ b/.claude/scheduled_tasks.lock @@ -0,0 +1 @@ +{"sessionId":"50658d05-2bd8-4720-b814-e3efee67b0bc","pid":87173,"acquiredAt":1776410487863} \ No newline at end of file diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index de2ba087..0f29926b 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -7,7 +7,7 @@ use std::{io, process::Stdio}; use fspy::PathAccessIterable; -use futures_util::{FutureExt, future::BoxFuture}; +use futures_util::{FutureExt, future::LocalBoxFuture}; use tokio::process::{ChildStderr, ChildStdout}; use tokio_util::sync::CancellationToken; use vite_task_plan::SpawnCommand; @@ -33,7 +33,7 @@ pub enum SpawnStdio { pub struct ChildHandle { pub stdout: Option, pub stderr: Option, - pub wait: BoxFuture<'static, io::Result>, + pub wait: LocalBoxFuture<'static, io::Result>, } /// Result of waiting for a child to exit. @@ -114,7 +114,7 @@ async fn spawn_fspy( path_accesses: Some(termination.path_accesses), }) } - .boxed(); + .boxed_local(); Ok(ChildHandle { stdout, stderr, wait }) } @@ -154,7 +154,7 @@ fn spawn_tokio( drop(job); Ok(ChildOutcome { exit_status, path_accesses: None }) } - .boxed(); + .boxed_local(); Ok(ChildHandle { stdout, stderr, wait }) } From f39b44baa7d32e1bc907f92785ba3b9c13f04f88 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:27:29 +0800 Subject: [PATCH 04/10] chore: drop accidentally committed `.claude/scheduled_tasks.lock` Co-Authored-By: Claude Opus 4.7 (1M context) --- .claude/scheduled_tasks.lock | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .claude/scheduled_tasks.lock diff --git a/.claude/scheduled_tasks.lock b/.claude/scheduled_tasks.lock deleted file mode 100644 index 5d951163..00000000 --- a/.claude/scheduled_tasks.lock +++ /dev/null @@ -1 +0,0 @@ -{"sessionId":"50658d05-2bd8-4720-b814-e3efee67b0bc","pid":87173,"acquiredAt":1776410487863} \ No newline at end of file From 0d7790773342572e2312bca3c6136505fb7b0c30 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:45:19 +0800 Subject: [PATCH 05/10] refactor(execute): represent all valid states with one `ExecutionMode` enum MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the scattered `fspy_enabled: bool`, `spawn_stdio`, `resolved_negatives`, `std_outputs: Option<_>`, and the 2-variant `Pipe` enum with a single `ExecutionMode<'a>` whose shape statically captures the four valid runtime configurations: - Cache enabled, fspy enabled, piped (forced) - Cache enabled, fspy disabled, piped (forced) - Cache disabled, piped (reporter suggestion) - Cache disabled, inherited (reporter suggestion) Invariants encoded in the type: - fspy lives inside `Cached` only (fspy requires `includes_auto`). - `Cached` always owns its `StdioConfig` (caching forces piped capture). - `Uncached { stdio_config: None }` means inherited — the reporter's config was dropped during mode construction to release `std::io::Stdout`. `(spawn_stdio, fspy_enabled)` is now derived from `&mode` via a single inline match at the `spawn()` call site (the pair is never needed apart). After `spawn()` the mode is consumed to drain pipes and extract the cached-only state into a `CacheState` that feeds the cache-update phase. Incidentally, negative globs are now compiled only when fspy is actually enabled (previously compiled for any cache-enabled task, though only used for fspy normalization). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 215 +++++++++++++------- 1 file changed, 137 insertions(+), 78 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index c2574042..bf4ce580 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -14,10 +14,10 @@ use petgraph::Direction; use rustc_hash::FxHashMap; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; -use vite_path::AbsolutePath; +use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::{ ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnExecution, - execution_graph::ExecutionNodeIndex, + cache_metadata::CacheMetadata, execution_graph::ExecutionNodeIndex, }; use self::{ @@ -251,19 +251,42 @@ impl ExecutionContext<'_> { } } -/// Stdio-mode-specific state for a leaf execution. +/// All valid runtime configurations for a leaf execution, after the cache-hit +/// early-return has been ruled out. /// -/// `Piped` owns the reporter's writers plus an optional capture buffer used for -/// cache replay; `Inherited` owns nothing (the reporter's `StdioConfig` has -/// already been dropped so we don't hold `std::io::Stdout` while the child -/// writes to the same FD). -enum Pipe { - Piped { +/// The type shape enforces two invariants statically: +/// - fspy tracking only exists inside [`ExecutionMode::Cached`] (fspy requires +/// `includes_auto`, which only lives on cache metadata). +/// - Cached execution always keeps its `StdioConfig` (piped stdio is forced so +/// that output can be captured for replay). +enum ExecutionMode<'a> { + Cached { + metadata: &'a CacheMetadata, + globbed_inputs: BTreeMap, stdio_config: StdioConfig, - /// `Some` when caching is on — chunks are appended for cache replay. - capture: Option>, + /// Captured stdout/stderr chunks flow here during drain for cache replay. + std_outputs: Vec, + /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved + /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter + /// tracked accesses. `None` means fspy tracking is off for this task. + fspy: Option>>, }, - Inherited, + Uncached { + /// `Some` iff the reporter suggested piped stdio. `None` means the + /// child inherits stdin/stdout/stderr from the parent; the original + /// `StdioConfig` was already dropped so we don't hold `std::io::Stdout` + /// while the child writes to the same FD. + stdio_config: Option, + }, +} + +/// Cached-only state extracted from [`ExecutionMode::Cached`] after the drain +/// phase, carried into the cache-update phase. +struct CacheState<'a> { + metadata: &'a CacheMetadata, + globbed_inputs: BTreeMap, + std_outputs: Vec, + fspy_negatives: Option>>, } /// Execute a spawned process with cache-aware lifecycle. @@ -370,59 +393,60 @@ pub async fn execute_spawn( return SpawnOutcome::CacheHit; } - // 4. Decide fspy and stdio mode. - // - fspy tracking runs only when caching is on and `includes_auto` is set. - // - Piped stdio is forced whenever caching is on (we need to capture output - // for replay); otherwise we honor the reporter's suggestion. - let fspy_enabled = cache_metadata.is_some_and(|m| m.input_config.includes_auto); - let spawn_stdio = - if cache_metadata.is_some() || stdio_config.suggestion == StdioSuggestion::Piped { - SpawnStdio::Piped - } else { - SpawnStdio::Inherited - }; - - // Build negative globs for fspy path filtering (already workspace-root-relative). - let resolved_negatives: Vec> = if let Some(cache_metadata) = cache_metadata { - match cache_metadata - .input_config - .negative_globs - .iter() - .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) - .collect::>>() - { - Ok(negs) => negs, - Err(err) => { - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::PostRunFingerprint(err)), - ); - return SpawnOutcome::Failed; + // 4. Build the execution mode. This folds the cache/fspy/stdio decisions + // and their associated state into a single value whose shape encodes + // the valid combinations. The inherited arm drops `stdio_config` here so + // we don't hold `std::io::Stdout` while the child writes to the same FD. + let mode: ExecutionMode<'_> = match cache_metadata { + Some(metadata) => { + let fspy = if metadata.input_config.includes_auto { + // Resolve negative globs for fspy path filtering + // (already workspace-root-relative). + match metadata + .input_config + .negative_globs + .iter() + .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) + .collect::>>() + { + Ok(negs) => Some(negs), + Err(err) => { + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::PostRunFingerprint(err)), + ); + return SpawnOutcome::Failed; + } + } + } else { + None + }; + ExecutionMode::Cached { + metadata, + globbed_inputs, + stdio_config, + std_outputs: Vec::new(), + fspy, } } - } else { - Vec::new() + None => ExecutionMode::Uncached { + stdio_config: (stdio_config.suggestion == StdioSuggestion::Piped) + .then_some(stdio_config), + }, }; - // Bundle stdio-mode-specific state. `Pipe::Piped` owns the writers plus an - // optional capture buffer (Some when caching is on). In the inherited arm - // we drop `stdio_config` right here, so we don't hold `std::io::Stdout` - // while the child writes to the same FD. - let pipe = match spawn_stdio { - SpawnStdio::Piped => { - Pipe::Piped { stdio_config, capture: cache_metadata.map(|_| Vec::new()) } - } - SpawnStdio::Inherited => { - drop(stdio_config); - Pipe::Inherited - } + // 5. Derive the arguments for `spawn()` from the mode without consuming it. + let (spawn_stdio, fspy_enabled) = match &mode { + ExecutionMode::Cached { fspy, .. } => (SpawnStdio::Piped, fspy.is_some()), + ExecutionMode::Uncached { stdio_config: Some(_) } => (SpawnStdio::Piped, false), + ExecutionMode::Uncached { stdio_config: None } => (SpawnStdio::Inherited, false), }; // Measure end-to-end duration here — spawn() no longer tracks time. let start = Instant::now(); - // 5. Spawn. Returns pipes (Piped) or `None` (Inherited) plus a + // 6. Spawn. Returns pipes (Piped) or `None` (Inherited) plus a // cancellation-aware wait future. let mut child = match spawn( &spawn_execution.spawn_command, @@ -443,10 +467,16 @@ pub async fn execute_spawn( } }; - // 6. If piped, drain stdout/stderr; capture chunks flow to `std_outputs` - // for the cache update below. - let std_outputs: Option> = match pipe { - Pipe::Piped { mut stdio_config, mut capture } => { + // 7. Consume `mode`: drain pipes (if piped), and for `Cached` keep the + // state we'll need for the cache update. + let cache_state: Option> = match mode { + ExecutionMode::Cached { + metadata, + globbed_inputs, + mut stdio_config, + mut std_outputs, + fspy, + } => { let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); #[expect( @@ -458,7 +488,7 @@ pub async fn execute_spawn( stderr, &mut *stdio_config.stdout_writer, &mut *stdio_config.stderr_writer, - capture.as_mut(), + Some(&mut std_outputs), fast_fail_token.clone(), ) .await; @@ -473,12 +503,40 @@ pub async fn execute_spawn( ); return SpawnOutcome::Failed; } - capture + Some(CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives: fspy }) } - Pipe::Inherited => None, + ExecutionMode::Uncached { stdio_config: Some(mut stdio_config) } => { + let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); + let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); + #[expect( + clippy::large_futures, + reason = "pipe_stdio streams child I/O and creates a large future" + )] + let pipe_result = pipe_stdio( + stdout, + stderr, + &mut *stdio_config.stdout_writer, + &mut *stdio_config.stderr_writer, + None, + fast_fail_token.clone(), + ) + .await; + if let Err(err) = pipe_result { + fast_fail_token.cancel(); + let _ = child.wait.await; + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + None + } + ExecutionMode::Uncached { stdio_config: None } => None, }; - // 7. Wait for exit (handles cancellation internally). + // 8. Wait for exit (handles cancellation internally). let outcome = match child.wait.await { Ok(outcome) => outcome, Err(err) => { @@ -492,18 +550,19 @@ pub async fn execute_spawn( }; let duration = start.elapsed(); - // Normalize fspy accesses (workspace-relative, negative-glob filtered). - let path_accesses = outcome - .path_accesses - .as_ref() - .map(|raw| TrackedPathAccesses::from_raw(raw, cache_base_path, &resolved_negatives)); - let cache_metadata_and_inputs = cache_metadata.map(|cm| (cm, globbed_inputs)); - - // 6. Update cache if successful and determine cache update status. - // Errors during cache update are terminal (reported through finish). - let (cache_update_status, cache_error) = if let Some((cache_metadata, globbed_inputs)) = - cache_metadata_and_inputs - { + // 9. Cache update (only when we were in `Cached` mode). Errors during cache + // update are reported but do not affect the exit status we return. + let (cache_update_status, cache_error) = if let Some(state) = cache_state { + let CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives } = state; + + // Normalize fspy accesses. `zip` gives `Some` iff fspy was enabled + // (both outcome.path_accesses and fspy_negatives are Some together). + let path_accesses = outcome + .path_accesses + .as_ref() + .zip(fspy_negatives.as_deref()) + .map(|(raw, negs)| TrackedPathAccesses::from_raw(raw, cache_base_path, negs)); + let cancelled = fast_fail_token.is_cancelled() || interrupt_token.is_cancelled(); if cancelled { // Cancelled (Ctrl-C or sibling failure) — result is untrustworthy @@ -538,11 +597,11 @@ pub async fn execute_spawn( Ok(post_run_fingerprint) => { let new_cache_value = CacheEntryValue { post_run_fingerprint, - std_outputs: std_outputs.unwrap_or_default().into(), + std_outputs: std_outputs.into(), duration, globbed_inputs, }; - match cache.update(cache_metadata, new_cache_value).await { + match cache.update(metadata, new_cache_value).await { Ok(()) => (CacheUpdateStatus::Updated, None), Err(err) => ( CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), From 411886b23807652be72333732ddddab25238f6b5 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:49:25 +0800 Subject: [PATCH 06/10] refactor(execute): nest `CacheState` inside `ExecutionMode::Cached` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `CacheState` duplicated four of the five fields of the old `ExecutionMode::Cached` variant — the only field that belonged solely to the mode was `stdio_config` (consumed during drain). Make `CacheState` a single nested field so the variant is `Cached { stdio_config, state }` and the drain match can just return `state` unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 44 ++++++++++----------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index bf4ce580..5d7c9a65 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -261,15 +261,10 @@ impl ExecutionContext<'_> { /// that output can be captured for replay). enum ExecutionMode<'a> { Cached { - metadata: &'a CacheMetadata, - globbed_inputs: BTreeMap, + /// Consumed during drain; not needed for the cache update. stdio_config: StdioConfig, - /// Captured stdout/stderr chunks flow here during drain for cache replay. - std_outputs: Vec, - /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved - /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter - /// tracked accesses. `None` means fspy tracking is off for this task. - fspy: Option>>, + /// Carried through drain into the cache-update phase. + state: CacheState<'a>, }, Uncached { /// `Some` iff the reporter suggested piped stdio. `None` means the @@ -280,12 +275,17 @@ enum ExecutionMode<'a> { }, } -/// Cached-only state extracted from [`ExecutionMode::Cached`] after the drain -/// phase, carried into the cache-update phase. +/// Cached-only state carried from mode construction through drain into the +/// cache-update phase. `std_outputs` is written during drain; the other fields +/// are read during cache update. struct CacheState<'a> { metadata: &'a CacheMetadata, globbed_inputs: BTreeMap, + /// Captured stdout/stderr chunks flow here during drain for cache replay. std_outputs: Vec, + /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved + /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter + /// tracked accesses. `None` means fspy tracking is off for this task. fspy_negatives: Option>>, } @@ -423,11 +423,13 @@ pub async fn execute_spawn( None }; ExecutionMode::Cached { - metadata, - globbed_inputs, stdio_config, - std_outputs: Vec::new(), - fspy, + state: CacheState { + metadata, + globbed_inputs, + std_outputs: Vec::new(), + fspy_negatives: fspy, + }, } } None => ExecutionMode::Uncached { @@ -438,7 +440,7 @@ pub async fn execute_spawn( // 5. Derive the arguments for `spawn()` from the mode without consuming it. let (spawn_stdio, fspy_enabled) = match &mode { - ExecutionMode::Cached { fspy, .. } => (SpawnStdio::Piped, fspy.is_some()), + ExecutionMode::Cached { state, .. } => (SpawnStdio::Piped, state.fspy_negatives.is_some()), ExecutionMode::Uncached { stdio_config: Some(_) } => (SpawnStdio::Piped, false), ExecutionMode::Uncached { stdio_config: None } => (SpawnStdio::Inherited, false), }; @@ -470,13 +472,7 @@ pub async fn execute_spawn( // 7. Consume `mode`: drain pipes (if piped), and for `Cached` keep the // state we'll need for the cache update. let cache_state: Option> = match mode { - ExecutionMode::Cached { - metadata, - globbed_inputs, - mut stdio_config, - mut std_outputs, - fspy, - } => { + ExecutionMode::Cached { mut stdio_config, mut state } => { let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); #[expect( @@ -488,7 +484,7 @@ pub async fn execute_spawn( stderr, &mut *stdio_config.stdout_writer, &mut *stdio_config.stderr_writer, - Some(&mut std_outputs), + Some(&mut state.std_outputs), fast_fail_token.clone(), ) .await; @@ -503,7 +499,7 @@ pub async fn execute_spawn( ); return SpawnOutcome::Failed; } - Some(CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives: fspy }) + Some(state) } ExecutionMode::Uncached { stdio_config: Some(mut stdio_config) } => { let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); From a5f3f44299ed3b5bea6664313bc70abc6f44b356 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:51:35 +0800 Subject: [PATCH 07/10] refactor(execute): unify drain arms behind a single pipe_stdio call Split `mode` into `(stdio_config, cache_state)` first, then run the drain once. `stdio_config.is_some()` decides whether to drain; `cache_state`'s `std_outputs` supplies the optional capture buffer. Collapses the two near-identical match arms into one. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 98 ++++++++------------- 1 file changed, 38 insertions(+), 60 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 5d7c9a65..6f67d516 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -469,68 +469,46 @@ pub async fn execute_spawn( } }; - // 7. Consume `mode`: drain pipes (if piped), and for `Cached` keep the - // state we'll need for the cache update. - let cache_state: Option> = match mode { - ExecutionMode::Cached { mut stdio_config, mut state } => { - let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); - let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); - #[expect( - clippy::large_futures, - reason = "pipe_stdio streams child I/O and creates a large future" - )] - let pipe_result = pipe_stdio( - stdout, - stderr, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, - Some(&mut state.std_outputs), - fast_fail_token.clone(), - ) - .await; - if let Err(err) = pipe_result { - // Cancel so `child.wait` kills the child instead of orphaning it. - fast_fail_token.cancel(); - let _ = child.wait.await; - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::Spawn(err.into())), - ); - return SpawnOutcome::Failed; - } - Some(state) - } - ExecutionMode::Uncached { stdio_config: Some(mut stdio_config) } => { - let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); - let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); - #[expect( - clippy::large_futures, - reason = "pipe_stdio streams child I/O and creates a large future" - )] - let pipe_result = pipe_stdio( - stdout, - stderr, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, + // 7. Consume `mode`: split into (stdio_config, cache_state) so the drain + // below can run once regardless of variant. + let (mut stdio_config, mut cache_state): (Option, Option>) = + match mode { + ExecutionMode::Cached { stdio_config, state } => (Some(stdio_config), Some(state)), + ExecutionMode::Uncached { stdio_config } => (stdio_config, None), + }; + + // Drain stdout/stderr when piped; capture into `state.std_outputs` when + // caching is on. (`stdio_config` is `None` only in the inherited-uncached + // case.) + if let Some(stdio_config) = &mut stdio_config { + let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); + let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); + let capture = cache_state.as_mut().map(|s| &mut s.std_outputs); + #[expect( + clippy::large_futures, + reason = "pipe_stdio streams child I/O and creates a large future" + )] + let pipe_result = pipe_stdio( + stdout, + stderr, + &mut *stdio_config.stdout_writer, + &mut *stdio_config.stderr_writer, + capture, + fast_fail_token.clone(), + ) + .await; + if let Err(err) = pipe_result { + // Cancel so `child.wait` kills the child instead of orphaning it. + fast_fail_token.cancel(); + let _ = child.wait.await; + leaf_reporter.finish( None, - fast_fail_token.clone(), - ) - .await; - if let Err(err) = pipe_result { - fast_fail_token.cancel(); - let _ = child.wait.await; - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::Spawn(err.into())), - ); - return SpawnOutcome::Failed; - } - None + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; } - ExecutionMode::Uncached { stdio_config: None } => None, - }; + } // 8. Wait for exit (handles cancellation internally). let outcome = match child.wait.await { From 6bd5a642da7442e23d9116fd1e1402e5dcbb1840 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 15:57:57 +0800 Subject: [PATCH 08/10] refactor(execute): bundle drain inputs into `PipeIo` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce `pipe::PipeIo { stdio_config, capture }` and change `pipe_stdio`'s signature to take `&mut PipeIo` instead of two writers plus an `Option<&mut Vec>`. The capture buffer now lives with the writers it feeds, owned by `PipeIo`. `execute_spawn` step 7 builds `(Option, Option)` from the mode — the drain call stays a single site, and the captured outputs are pulled back out of `PipeIo::capture` after drain for cache update. `std_outputs` no longer lives in `CacheState`. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 61 +++++++++----------- crates/vite_task/src/session/execute/pipe.rs | 37 +++++++----- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 6f67d516..e69413c5 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -23,7 +23,7 @@ use vite_task_plan::{ use self::{ fingerprint::PostRunFingerprint, glob_inputs::compute_globbed_inputs, - pipe::{StdOutput, pipe_stdio}, + pipe::{PipeIo, pipe_stdio}, spawn::{SpawnStdio, spawn}, tracked_accesses::TrackedPathAccesses, }; @@ -275,14 +275,12 @@ enum ExecutionMode<'a> { }, } -/// Cached-only state carried from mode construction through drain into the -/// cache-update phase. `std_outputs` is written during drain; the other fields -/// are read during cache update. +/// Cached-only state carried from mode construction through the cache-update +/// phase. Captured stdout/stderr live in `PipeIo::capture` during drain and +/// are handed to cache update separately. struct CacheState<'a> { metadata: &'a CacheMetadata, globbed_inputs: BTreeMap, - /// Captured stdout/stderr chunks flow here during drain for cache replay. - std_outputs: Vec, /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter /// tracked accesses. `None` means fspy tracking is off for this task. @@ -424,12 +422,7 @@ pub async fn execute_spawn( }; ExecutionMode::Cached { stdio_config, - state: CacheState { - metadata, - globbed_inputs, - std_outputs: Vec::new(), - fspy_negatives: fspy, - }, + state: CacheState { metadata, globbed_inputs, fspy_negatives: fspy }, } } None => ExecutionMode::Uncached { @@ -469,34 +462,29 @@ pub async fn execute_spawn( } }; - // 7. Consume `mode`: split into (stdio_config, cache_state) so the drain - // below can run once regardless of variant. - let (mut stdio_config, mut cache_state): (Option, Option>) = - match mode { - ExecutionMode::Cached { stdio_config, state } => (Some(stdio_config), Some(state)), - ExecutionMode::Uncached { stdio_config } => (stdio_config, None), - }; + // 7. Consume `mode`: pair the `StdioConfig` with an optional capture + // buffer as a `PipeIo`, and carry `CacheState` separately for the + // cache-update phase. + let (mut pipe_io, cache_state): (Option, Option>) = match mode { + ExecutionMode::Cached { stdio_config, state } => { + (Some(PipeIo { stdio_config, capture: Some(Vec::new()) }), Some(state)) + } + ExecutionMode::Uncached { stdio_config: Some(stdio_config) } => { + (Some(PipeIo { stdio_config, capture: None }), None) + } + ExecutionMode::Uncached { stdio_config: None } => (None, None), + }; - // Drain stdout/stderr when piped; capture into `state.std_outputs` when - // caching is on. (`stdio_config` is `None` only in the inherited-uncached - // case.) - if let Some(stdio_config) = &mut stdio_config { + // Drain stdout/stderr when piped. `pipe_io` is `None` only in the + // inherited-uncached case. + if let Some(pipe_io) = &mut pipe_io { let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); - let capture = cache_state.as_mut().map(|s| &mut s.std_outputs); #[expect( clippy::large_futures, reason = "pipe_stdio streams child I/O and creates a large future" )] - let pipe_result = pipe_stdio( - stdout, - stderr, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, - capture, - fast_fail_token.clone(), - ) - .await; + let pipe_result = pipe_stdio(stdout, stderr, pipe_io, fast_fail_token.clone()).await; if let Err(err) = pipe_result { // Cancel so `child.wait` kills the child instead of orphaning it. fast_fail_token.cancel(); @@ -524,10 +512,15 @@ pub async fn execute_spawn( }; let duration = start.elapsed(); + // Capture buffer lives in `PipeIo` during drain; pull it back out now so + // cache update can own the captured stdout/stderr. + let captured_outputs = pipe_io.and_then(|io| io.capture); + // 9. Cache update (only when we were in `Cached` mode). Errors during cache // update are reported but do not affect the exit status we return. let (cache_update_status, cache_error) = if let Some(state) = cache_state { - let CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives } = state; + let CacheState { metadata, globbed_inputs, fspy_negatives } = state; + let std_outputs = captured_outputs.unwrap_or_default(); // Normalize fspy accesses. `zip` gives `Some` iff fspy was enabled // (both outcome.path_accesses and fspy_negatives are Some together). diff --git a/crates/vite_task/src/session/execute/pipe.rs b/crates/vite_task/src/session/execute/pipe.rs index 4cec668d..9dda1816 100644 --- a/crates/vite_task/src/session/execute/pipe.rs +++ b/crates/vite_task/src/session/execute/pipe.rs @@ -1,7 +1,5 @@ //! Drain child stdout/stderr concurrently to writers, with optional capture. -use std::io::Write; - use serde::Serialize; use tokio::{ io::AsyncReadExt as _, @@ -10,6 +8,8 @@ use tokio::{ use tokio_util::sync::CancellationToken; use wincode::{SchemaRead, SchemaWrite}; +use crate::session::reporter::StdioConfig; + /// Output kind for stdout/stderr #[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] pub enum OutputKind { @@ -24,11 +24,18 @@ pub struct StdOutput { pub content: Vec, } -/// Drain the child's stdout/stderr concurrently. +/// Inputs for [`pipe_stdio`]: the reporter's writers plus an optional capture +/// buffer. When `capture` is `Some`, chunks are appended there for cache replay. +pub struct PipeIo { + pub stdio_config: StdioConfig, + pub capture: Option>, +} + +/// Drain the child's stdout/stderr concurrently into `io`. /// -/// Bytes are written through to `stdout_writer` / `stderr_writer` in real time -/// and, when `capture` is provided, also appended with adjacent same-kind chunks -/// coalesced (for cache replay). +/// Bytes are written through `io.stdio_config`'s writers in real time and, +/// when `io.capture` is `Some`, also appended (with adjacent same-kind chunks +/// coalesced) for cache replay. /// /// On cancellation: returns `Ok(())` without killing the child — the caller /// drives the child's cancellation-aware `wait` future next, which observes the @@ -38,11 +45,11 @@ pub struct StdOutput { pub async fn pipe_stdio( mut stdout: ChildStdout, mut stderr: ChildStderr, - stdout_writer: &mut dyn Write, - stderr_writer: &mut dyn Write, - mut capture: Option<&mut Vec>, + io: &mut PipeIo, cancellation_token: CancellationToken, ) -> std::io::Result<()> { + use std::io::Write as _; + let mut stdout_buf = [0u8; 8192]; let mut stderr_buf = [0u8; 8192]; let mut stdout_done = false; @@ -58,9 +65,9 @@ pub async fn pipe_stdio( 0 => stdout_done = true, n => { let bytes = &stdout_buf[..n]; - stdout_writer.write_all(bytes)?; - stdout_writer.flush()?; - if let Some(capture) = capture.as_deref_mut() { + io.stdio_config.stdout_writer.write_all(bytes)?; + io.stdio_config.stdout_writer.flush()?; + if let Some(capture) = io.capture.as_mut() { append_output_chunk(capture, OutputKind::StdOut, bytes); } } @@ -71,9 +78,9 @@ pub async fn pipe_stdio( 0 => stderr_done = true, n => { let bytes = &stderr_buf[..n]; - stderr_writer.write_all(bytes)?; - stderr_writer.flush()?; - if let Some(capture) = capture.as_deref_mut() { + io.stdio_config.stderr_writer.write_all(bytes)?; + io.stdio_config.stderr_writer.flush()?; + if let Some(capture) = io.capture.as_mut() { append_output_chunk(capture, OutputKind::StdErr, bytes); } } From 5914218ce184a18223278ce67c98e4078f9e3269 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 16:02:45 +0800 Subject: [PATCH 09/10] refactor(execute): make `CacheState::std_outputs` non-optional MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `std_outputs` belongs with the rest of the cached state that feeds the cache update. Add it back to `CacheState` as a non-optional `Vec`, initialize it empty at mode construction, and move the captured buffer out of `PipeIo::capture` into `CacheState::std_outputs` after drain. The invariant (cached mode ⟹ capture was initialized) is asserted via `.expect` at the transfer point. The cache-update destructure now just pulls `std_outputs` straight from `CacheState` — no `Option` unwrapping at the use site. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 32 ++++++++++++++------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index e69413c5..920e2842 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -23,7 +23,7 @@ use vite_task_plan::{ use self::{ fingerprint::PostRunFingerprint, glob_inputs::compute_globbed_inputs, - pipe::{PipeIo, pipe_stdio}, + pipe::{PipeIo, StdOutput, pipe_stdio}, spawn::{SpawnStdio, spawn}, tracked_accesses::TrackedPathAccesses, }; @@ -276,11 +276,14 @@ enum ExecutionMode<'a> { } /// Cached-only state carried from mode construction through the cache-update -/// phase. Captured stdout/stderr live in `PipeIo::capture` during drain and -/// are handed to cache update separately. +/// phase. `std_outputs` is empty at construction time and filled post-drain by +/// moving the buffer out of `PipeIo::capture`. struct CacheState<'a> { metadata: &'a CacheMetadata, globbed_inputs: BTreeMap, + /// Captured stdout/stderr for cache replay. Populated after drain; always + /// present (possibly empty) once we reach the cache-update phase. + std_outputs: Vec, /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter /// tracked accesses. `None` means fspy tracking is off for this task. @@ -422,7 +425,12 @@ pub async fn execute_spawn( }; ExecutionMode::Cached { stdio_config, - state: CacheState { metadata, globbed_inputs, fspy_negatives: fspy }, + state: CacheState { + metadata, + globbed_inputs, + std_outputs: Vec::new(), + fspy_negatives: fspy, + }, } } None => ExecutionMode::Uncached { @@ -465,7 +473,7 @@ pub async fn execute_spawn( // 7. Consume `mode`: pair the `StdioConfig` with an optional capture // buffer as a `PipeIo`, and carry `CacheState` separately for the // cache-update phase. - let (mut pipe_io, cache_state): (Option, Option>) = match mode { + let (mut pipe_io, mut cache_state): (Option, Option>) = match mode { ExecutionMode::Cached { stdio_config, state } => { (Some(PipeIo { stdio_config, capture: Some(Vec::new()) }), Some(state)) } @@ -512,15 +520,19 @@ pub async fn execute_spawn( }; let duration = start.elapsed(); - // Capture buffer lives in `PipeIo` during drain; pull it back out now so - // cache update can own the captured stdout/stderr. - let captured_outputs = pipe_io.and_then(|io| io.capture); + // Move captured stdout/stderr out of `PipeIo` and into `CacheState` so the + // cache update can own them. `pipe_io.capture` is `Some` iff caching was + // on, which is exactly when `cache_state` is `Some`. + if let Some(state) = cache_state.as_mut() { + state.std_outputs = pipe_io + .and_then(|io| io.capture) + .expect("cached mode always initializes `PipeIo::capture`"); + } // 9. Cache update (only when we were in `Cached` mode). Errors during cache // update are reported but do not affect the exit status we return. let (cache_update_status, cache_error) = if let Some(state) = cache_state { - let CacheState { metadata, globbed_inputs, fspy_negatives } = state; - let std_outputs = captured_outputs.unwrap_or_default(); + let CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives } = state; // Normalize fspy accesses. `zip` gives `Some` iff fspy was enabled // (both outcome.path_accesses and fspy_negatives are Some together). From 9f26e51834340929cbb8599a7271b89d509a5237 Mon Sep 17 00:00:00 2001 From: branchseer Date: Fri, 17 Apr 2026 16:30:33 +0800 Subject: [PATCH 10/10] =?UTF-8?q?refactor(execute):=20rename=20`PipeIos`?= =?UTF-8?q?=20=E2=86=92=20`PipeSinks`,=20refresh=20stale=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Preserves the user's split of `StdioConfig` into `{ suggestion, writers: PipeWriters }` and the borrowed-reference shape of `PipeIos<'a>`. Renames to `PipeSinks` — "sinks" mirrors `PipeWriters` and reads better than the awkward "Ios" plural. Propagates the `StdioConfig` → `{ writers: PipeWriters { .. } }` change through the four reporters (plain, grouped, interleaved, labeled) that still built the old shape, and cleans up the absolute-path import in `execute/mod.rs`. Refreshes doc comments that went stale: the drain no longer moves a buffer out of `PipeIo::capture` — it writes straight into `state.std_outputs` via the borrow inside `PipeSinks`. Also adds a reminder right before the `let mut mode` declaration so anyone adding a sibling local pauses to consider putting the value inside a variant instead. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/vite_task/src/session/execute/mod.rs | 103 +++++++++--------- crates/vite_task/src/session/execute/pipe.rs | 40 +++---- .../src/session/reporter/grouped/mod.rs | 10 +- .../src/session/reporter/interleaved/mod.rs | 9 +- .../src/session/reporter/labeled/mod.rs | 20 ++-- crates/vite_task/src/session/reporter/mod.rs | 7 +- .../vite_task/src/session/reporter/plain.rs | 14 ++- 7 files changed, 111 insertions(+), 92 deletions(-) diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 920e2842..8af09888 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -23,7 +23,7 @@ use vite_task_plan::{ use self::{ fingerprint::PostRunFingerprint, glob_inputs::compute_globbed_inputs, - pipe::{PipeIo, StdOutput, pipe_stdio}, + pipe::{PipeSinks, StdOutput, pipe_stdio}, spawn::{SpawnStdio, spawn}, tracked_accesses::TrackedPathAccesses, }; @@ -35,7 +35,7 @@ use super::{ }, reporter::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioConfig, StdioSuggestion, + PipeWriters, StdioSuggestion, }, }; use crate::{Session, collections::HashMap}; @@ -214,8 +214,8 @@ impl ExecutionContext<'_> { let execution_output = in_process_execution.execute(); // Write output to the stdout writer from StdioConfig - let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout); - let _ = stdio_config.stdout_writer.flush(); + let _ = stdio_config.writers.stdout_writer.write_all(&execution_output.stdout); + let _ = stdio_config.writers.stdout_writer.flush(); leaf_reporter.finish( None, @@ -257,32 +257,33 @@ impl ExecutionContext<'_> { /// The type shape enforces two invariants statically: /// - fspy tracking only exists inside [`ExecutionMode::Cached`] (fspy requires /// `includes_auto`, which only lives on cache metadata). -/// - Cached execution always keeps its `StdioConfig` (piped stdio is forced so +/// - Cached execution always owns [`PipeWriters`] (piped stdio is forced so /// that output can be captured for replay). enum ExecutionMode<'a> { Cached { - /// Consumed during drain; not needed for the cache update. - stdio_config: StdioConfig, - /// Carried through drain into the cache-update phase. + /// Borrowed by [`PipeSinks`] during drain; dropped at end of function. + pipe_writers: PipeWriters, + /// Carried through drain into the cache-update phase. Drain writes + /// into `state.std_outputs` in place via a borrow inside `PipeSinks`. state: CacheState<'a>, }, Uncached { /// `Some` iff the reporter suggested piped stdio. `None` means the - /// child inherits stdin/stdout/stderr from the parent; the original - /// `StdioConfig` was already dropped so we don't hold `std::io::Stdout` - /// while the child writes to the same FD. - stdio_config: Option, + /// child inherits stdin/stdout/stderr from the parent; the reporter's + /// writers were dropped here so we don't hold `std::io::Stdout` while + /// the child writes to the same FD. + pipe_writers: Option, }, } /// Cached-only state carried from mode construction through the cache-update -/// phase. `std_outputs` is empty at construction time and filled post-drain by -/// moving the buffer out of `PipeIo::capture`. +/// phase. `std_outputs` starts empty and is written in place during drain via +/// a borrow inside [`PipeSinks::capture`]. struct CacheState<'a> { metadata: &'a CacheMetadata, globbed_inputs: BTreeMap, - /// Captured stdout/stderr for cache replay. Populated after drain; always - /// present (possibly empty) once we reach the cache-update phase. + /// Captured stdout/stderr for cache replay. Written in place during drain; + /// always present (possibly empty) once we reach the cache-update phase. std_outputs: Vec, /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter @@ -380,8 +381,8 @@ pub async fn execute_spawn( if let Some(cached) = cached_value { for output in cached.std_outputs.iter() { let writer: &mut dyn std::io::Write = match output.kind { - pipe::OutputKind::StdOut => &mut stdio_config.stdout_writer, - pipe::OutputKind::StdErr => &mut stdio_config.stderr_writer, + pipe::OutputKind::StdOut => &mut stdio_config.writers.stdout_writer, + pipe::OutputKind::StdErr => &mut stdio_config.writers.stderr_writer, }; let _ = writer.write_all(&output.content); let _ = writer.flush(); @@ -398,7 +399,17 @@ pub async fn execute_spawn( // and their associated state into a single value whose shape encodes // the valid combinations. The inherited arm drops `stdio_config` here so // we don't hold `std::io::Stdout` while the child writes to the same FD. - let mode: ExecutionMode<'_> = match cache_metadata { + // + // ───────────────────────────────────────────────────────────────────── + // Before adding a new local variable alongside `mode`: think twice. + // Does it make sense for every variant, or only for some? If it's + // variant-specific (only for `Cached`, only when fspy is on, etc.) put + // it inside the variant (or `CacheState`) so the compiler enforces the + // invariant at construction. Sibling locals drift out of sync with the + // mode and force re-derivation (`if let Some(_) = _`, + // `cache_metadata.is_some_and(_)`) at every downstream use site. + // ───────────────────────────────────────────────────────────────────── + let mut mode: ExecutionMode<'_> = match cache_metadata { Some(metadata) => { let fspy = if metadata.input_config.includes_auto { // Resolve negative globs for fspy path filtering @@ -424,7 +435,7 @@ pub async fn execute_spawn( None }; ExecutionMode::Cached { - stdio_config, + pipe_writers: stdio_config.writers, state: CacheState { metadata, globbed_inputs, @@ -434,16 +445,16 @@ pub async fn execute_spawn( } } None => ExecutionMode::Uncached { - stdio_config: (stdio_config.suggestion == StdioSuggestion::Piped) - .then_some(stdio_config), + pipe_writers: (stdio_config.suggestion == StdioSuggestion::Piped) + .then_some(stdio_config.writers), }, }; // 5. Derive the arguments for `spawn()` from the mode without consuming it. let (spawn_stdio, fspy_enabled) = match &mode { ExecutionMode::Cached { state, .. } => (SpawnStdio::Piped, state.fspy_negatives.is_some()), - ExecutionMode::Uncached { stdio_config: Some(_) } => (SpawnStdio::Piped, false), - ExecutionMode::Uncached { stdio_config: None } => (SpawnStdio::Inherited, false), + ExecutionMode::Uncached { pipe_writers: Some(_) } => (SpawnStdio::Piped, false), + ExecutionMode::Uncached { pipe_writers: None } => (SpawnStdio::Inherited, false), }; // Measure end-to-end duration here — spawn() no longer tracks time. @@ -470,29 +481,32 @@ pub async fn execute_spawn( } }; - // 7. Consume `mode`: pair the `StdioConfig` with an optional capture - // buffer as a `PipeIo`, and carry `CacheState` separately for the - // cache-update phase. - let (mut pipe_io, mut cache_state): (Option, Option>) = match mode { - ExecutionMode::Cached { stdio_config, state } => { - (Some(PipeIo { stdio_config, capture: Some(Vec::new()) }), Some(state)) - } - ExecutionMode::Uncached { stdio_config: Some(stdio_config) } => { - (Some(PipeIo { stdio_config, capture: None }), None) - } - ExecutionMode::Uncached { stdio_config: None } => (None, None), + // 7. Build `PipeSinks` by borrowing into `mode`. The drain fills + // `state.std_outputs` in place (via the borrow inside `capture`), so no + // post-drain transfer is needed. `sinks` is `None` only in the + // inherited-uncached case, where there are no pipes to drain. + let sinks: Option> = match &mut mode { + ExecutionMode::Cached { pipe_writers, state } => Some(PipeSinks { + stdout_writer: &mut pipe_writers.stdout_writer, + stderr_writer: &mut pipe_writers.stderr_writer, + capture: Some(&mut state.std_outputs), + }), + ExecutionMode::Uncached { pipe_writers: Some(pipe_writers) } => Some(PipeSinks { + stdout_writer: &mut pipe_writers.stdout_writer, + stderr_writer: &mut pipe_writers.stderr_writer, + capture: None, + }), + ExecutionMode::Uncached { pipe_writers: None } => None, }; - // Drain stdout/stderr when piped. `pipe_io` is `None` only in the - // inherited-uncached case. - if let Some(pipe_io) = &mut pipe_io { + if let Some(sinks) = sinks { let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); #[expect( clippy::large_futures, reason = "pipe_stdio streams child I/O and creates a large future" )] - let pipe_result = pipe_stdio(stdout, stderr, pipe_io, fast_fail_token.clone()).await; + let pipe_result = pipe_stdio(stdout, stderr, sinks, fast_fail_token.clone()).await; if let Err(err) = pipe_result { // Cancel so `child.wait` kills the child instead of orphaning it. fast_fail_token.cancel(); @@ -520,18 +534,9 @@ pub async fn execute_spawn( }; let duration = start.elapsed(); - // Move captured stdout/stderr out of `PipeIo` and into `CacheState` so the - // cache update can own them. `pipe_io.capture` is `Some` iff caching was - // on, which is exactly when `cache_state` is `Some`. - if let Some(state) = cache_state.as_mut() { - state.std_outputs = pipe_io - .and_then(|io| io.capture) - .expect("cached mode always initializes `PipeIo::capture`"); - } - // 9. Cache update (only when we were in `Cached` mode). Errors during cache // update are reported but do not affect the exit status we return. - let (cache_update_status, cache_error) = if let Some(state) = cache_state { + let (cache_update_status, cache_error) = if let ExecutionMode::Cached { state, .. } = mode { let CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives } = state; // Normalize fspy accesses. `zip` gives `Some` iff fspy was enabled diff --git a/crates/vite_task/src/session/execute/pipe.rs b/crates/vite_task/src/session/execute/pipe.rs index 9dda1816..359d3a03 100644 --- a/crates/vite_task/src/session/execute/pipe.rs +++ b/crates/vite_task/src/session/execute/pipe.rs @@ -1,5 +1,7 @@ //! Drain child stdout/stderr concurrently to writers, with optional capture. +use std::io::Write; + use serde::Serialize; use tokio::{ io::AsyncReadExt as _, @@ -8,8 +10,6 @@ use tokio::{ use tokio_util::sync::CancellationToken; use wincode::{SchemaRead, SchemaWrite}; -use crate::session::reporter::StdioConfig; - /// Output kind for stdout/stderr #[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] pub enum OutputKind { @@ -24,18 +24,20 @@ pub struct StdOutput { pub content: Vec, } -/// Inputs for [`pipe_stdio`]: the reporter's writers plus an optional capture -/// buffer. When `capture` is `Some`, chunks are appended there for cache replay. -pub struct PipeIo { - pub stdio_config: StdioConfig, - pub capture: Option>, +/// Downstream destinations for bytes read from the child's stdout/stderr: +/// two pass-through writers plus an optional capture buffer (populated in +/// place during drain for cache replay). +pub struct PipeSinks<'a> { + pub stdout_writer: &'a mut dyn Write, + pub stderr_writer: &'a mut dyn Write, + pub capture: Option<&'a mut Vec>, } -/// Drain the child's stdout/stderr concurrently into `io`. +/// Drain the child's stdout/stderr concurrently into `sinks`. /// -/// Bytes are written through `io.stdio_config`'s writers in real time and, -/// when `io.capture` is `Some`, also appended (with adjacent same-kind chunks -/// coalesced) for cache replay. +/// Bytes are written through `sinks.stdout_writer` / `sinks.stderr_writer` in +/// real time and, when `sinks.capture` is `Some`, also appended (with adjacent +/// same-kind chunks coalesced) for cache replay. /// /// On cancellation: returns `Ok(())` without killing the child — the caller /// drives the child's cancellation-aware `wait` future next, which observes the @@ -45,11 +47,9 @@ pub struct PipeIo { pub async fn pipe_stdio( mut stdout: ChildStdout, mut stderr: ChildStderr, - io: &mut PipeIo, + mut sinks: PipeSinks<'_>, cancellation_token: CancellationToken, ) -> std::io::Result<()> { - use std::io::Write as _; - let mut stdout_buf = [0u8; 8192]; let mut stderr_buf = [0u8; 8192]; let mut stdout_done = false; @@ -65,9 +65,9 @@ pub async fn pipe_stdio( 0 => stdout_done = true, n => { let bytes = &stdout_buf[..n]; - io.stdio_config.stdout_writer.write_all(bytes)?; - io.stdio_config.stdout_writer.flush()?; - if let Some(capture) = io.capture.as_mut() { + sinks.stdout_writer.write_all(bytes)?; + sinks.stdout_writer.flush()?; + if let Some(capture) = &mut sinks.capture { append_output_chunk(capture, OutputKind::StdOut, bytes); } } @@ -78,9 +78,9 @@ pub async fn pipe_stdio( 0 => stderr_done = true, n => { let bytes = &stderr_buf[..n]; - io.stdio_config.stderr_writer.write_all(bytes)?; - io.stdio_config.stderr_writer.flush()?; - if let Some(capture) = io.capture.as_mut() { + sinks.stderr_writer.write_all(bytes)?; + sinks.stderr_writer.flush()?; + if let Some(capture) = &mut sinks.capture { append_output_chunk(capture, OutputKind::StdErr, bytes); } } diff --git a/crates/vite_task/src/session/reporter/grouped/mod.rs b/crates/vite_task/src/session/reporter/grouped/mod.rs index 5002a3a0..fd181684 100644 --- a/crates/vite_task/src/session/reporter/grouped/mod.rs +++ b/crates/vite_task/src/session/reporter/grouped/mod.rs @@ -8,8 +8,8 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ColorizeExt, ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, - LeafExecutionReporter, StdioConfig, StdioSuggestion, format_command_with_cache_status, - format_task_label, write_leaf_trailing_output, + LeafExecutionReporter, PipeWriters, StdioConfig, StdioSuggestion, + format_command_with_cache_status, format_task_label, write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -94,8 +94,10 @@ impl LeafExecutionReporter for GroupedLeafReporter { StdioConfig { suggestion: StdioSuggestion::Piped, - stdout_writer: Box::new(GroupedWriter::new(Rc::clone(&buffer))), - stderr_writer: Box::new(GroupedWriter::new(buffer)), + writers: PipeWriters { + stdout_writer: Box::new(GroupedWriter::new(Rc::clone(&buffer))), + stderr_writer: Box::new(GroupedWriter::new(buffer)), + }, } } diff --git a/crates/vite_task/src/session/reporter/interleaved/mod.rs b/crates/vite_task/src/session/reporter/interleaved/mod.rs index 5a30ec5f..e7f17f83 100644 --- a/crates/vite_task/src/session/reporter/interleaved/mod.rs +++ b/crates/vite_task/src/session/reporter/interleaved/mod.rs @@ -7,7 +7,8 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioConfig, StdioSuggestion, format_command_with_cache_status, write_leaf_trailing_output, + PipeWriters, StdioConfig, StdioSuggestion, format_command_with_cache_status, + write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -84,8 +85,10 @@ impl LeafExecutionReporter for InterleavedLeafReporter { StdioConfig { suggestion: self.stdio_suggestion, - stdout_writer: Box::new(std::io::stdout()), - stderr_writer: Box::new(std::io::stderr()), + writers: PipeWriters { + stdout_writer: Box::new(std::io::stdout()), + stderr_writer: Box::new(std::io::stderr()), + }, } } diff --git a/crates/vite_task/src/session/reporter/labeled/mod.rs b/crates/vite_task/src/session/reporter/labeled/mod.rs index 01c63435..500e190e 100644 --- a/crates/vite_task/src/session/reporter/labeled/mod.rs +++ b/crates/vite_task/src/session/reporter/labeled/mod.rs @@ -7,7 +7,7 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioConfig, StdioSuggestion, format_command_with_cache_status, format_task_label, + PipeWriters, StdioConfig, StdioSuggestion, format_command_with_cache_status, format_task_label, write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -86,14 +86,16 @@ impl LeafExecutionReporter for LabeledLeafReporter { StdioConfig { suggestion: StdioSuggestion::Piped, - stdout_writer: Box::new(LabeledWriter::new( - Box::new(std::io::stdout()), - prefix.as_bytes().to_vec(), - )), - stderr_writer: Box::new(LabeledWriter::new( - Box::new(std::io::stderr()), - prefix.as_bytes().to_vec(), - )), + writers: PipeWriters { + stdout_writer: Box::new(LabeledWriter::new( + Box::new(std::io::stdout()), + prefix.as_bytes().to_vec(), + )), + stderr_writer: Box::new(LabeledWriter::new( + Box::new(std::io::stderr()), + prefix.as_bytes().to_vec(), + )), + }, } } diff --git a/crates/vite_task/src/session/reporter/mod.rs b/crates/vite_task/src/session/reporter/mod.rs index 64f5f441..82724ecd 100644 --- a/crates/vite_task/src/session/reporter/mod.rs +++ b/crates/vite_task/src/session/reporter/mod.rs @@ -96,9 +96,12 @@ pub enum StdioSuggestion { pub struct StdioConfig { /// The reporter's preferred stdio mode. pub suggestion: StdioSuggestion, - /// Writer for the child process's stdout (used in piped mode and cache replay). + /// Writer for the child process's stderr and stdout (used in piped mode and cache replay). + pub writers: PipeWriters, +} + +pub struct PipeWriters { pub stdout_writer: Box, - /// Writer for the child process's stderr (used in piped mode and cache replay). pub stderr_writer: Box, } diff --git a/crates/vite_task/src/session/reporter/plain.rs b/crates/vite_task/src/session/reporter/plain.rs index 860e3817..ce7b7abc 100644 --- a/crates/vite_task/src/session/reporter/plain.rs +++ b/crates/vite_task/src/session/reporter/plain.rs @@ -6,7 +6,7 @@ use std::io::Write; use super::{ - LeafExecutionReporter, StdioConfig, StdioSuggestion, format_cache_hit_message, + LeafExecutionReporter, PipeWriters, StdioConfig, StdioSuggestion, format_cache_hit_message, format_error_message, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -62,14 +62,18 @@ impl LeafExecutionReporter for PlainReporter { if self.silent_if_cache_hit && self.is_cache_hit { StdioConfig { suggestion: StdioSuggestion::Inherited, - stdout_writer: Box::new(std::io::sink()), - stderr_writer: Box::new(std::io::sink()), + writers: PipeWriters { + stdout_writer: Box::new(std::io::sink()), + stderr_writer: Box::new(std::io::sink()), + }, } } else { StdioConfig { suggestion: StdioSuggestion::Inherited, - stdout_writer: Box::new(std::io::stdout()), - stderr_writer: Box::new(std::io::stderr()), + writers: PipeWriters { + stdout_writer: Box::new(std::io::stdout()), + stderr_writer: Box::new(std::io::stderr()), + }, } } }