diff --git a/crates/vite_task/src/session/cache/mod.rs b/crates/vite_task/src/session/cache/mod.rs index 484d7d60..885dc97d 100644 --- a/crates/vite_task/src/session/cache/mod.rs +++ b/crates/vite_task/src/session/cache/mod.rs @@ -23,7 +23,7 @@ use wincode::{ io::{Reader, Writer}, }; -use super::execute::{fingerprint::PostRunFingerprint, spawn::StdOutput}; +use super::execute::{fingerprint::PostRunFingerprint, pipe::StdOutput}; /// Cache lookup key identifying a task's execution configuration. /// diff --git a/crates/vite_task/src/session/execute/fingerprint.rs b/crates/vite_task/src/session/execute/fingerprint.rs index dee12a89..d73a7f61 100644 --- a/crates/vite_task/src/session/execute/fingerprint.rs +++ b/crates/vite_task/src/session/execute/fingerprint.rs @@ -16,7 +16,7 @@ use vite_path::{AbsolutePath, RelativePathBuf}; use vite_str::Str; use wincode::{SchemaRead, SchemaWrite}; -use super::spawn::PathRead; +use super::tracked_accesses::PathRead; use crate::{collections::HashMap, session::cache::InputChangeKind}; /// Post-run fingerprint capturing file state after execution. @@ -53,7 +53,8 @@ pub enum DirEntryKind { impl PostRunFingerprint { /// Creates a new fingerprint from path accesses after task execution. /// - /// Negative glob filtering is done upstream in `spawn_with_tracking`. + /// Negative glob filtering is done upstream (see + /// [`super::tracked_accesses::TrackedPathAccesses::from_raw`]). 
/// Paths already present in `globbed_inputs` are skipped — they are /// already tracked by the prerun glob fingerprint, and the read-write /// overlap check in `execute_spawn` guarantees the task did not modify diff --git a/crates/vite_task/src/session/execute/mod.rs b/crates/vite_task/src/session/execute/mod.rs index 9d5bf8e4..8af09888 100644 --- a/crates/vite_task/src/session/execute/mod.rs +++ b/crates/vite_task/src/session/execute/mod.rs @@ -1,25 +1,31 @@ pub mod fingerprint; pub mod glob_inputs; mod hash; +pub mod pipe; pub mod spawn; +pub mod tracked_accesses; +#[cfg(windows)] +mod win_job; -use std::{cell::RefCell, collections::BTreeMap, io::Write as _, process::Stdio, sync::Arc}; +use std::{cell::RefCell, collections::BTreeMap, io::Write as _, sync::Arc, time::Instant}; use futures_util::{FutureExt, StreamExt, future::LocalBoxFuture, stream::FuturesUnordered}; use petgraph::Direction; use rustc_hash::FxHashMap; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; -use vite_path::AbsolutePath; +use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::{ - ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnCommand, - SpawnExecution, execution_graph::ExecutionNodeIndex, + ExecutionGraph, ExecutionItemDisplay, ExecutionItemKind, LeafExecutionKind, SpawnExecution, + cache_metadata::CacheMetadata, execution_graph::ExecutionNodeIndex, }; use self::{ fingerprint::PostRunFingerprint, glob_inputs::compute_globbed_inputs, - spawn::{SpawnResult, TrackedPathAccesses, spawn_with_tracking}, + pipe::{PipeSinks, StdOutput, pipe_stdio}, + spawn::{SpawnStdio, spawn}, + tracked_accesses::TrackedPathAccesses, }; use super::{ cache::{CacheEntryValue, ExecutionCache}, @@ -29,7 +35,7 @@ use super::{ }, reporter::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioSuggestion, + PipeWriters, StdioSuggestion, }, }; use crate::{Session, collections::HashMap}; @@ -208,8 +214,8 @@ 
impl ExecutionContext<'_> { let execution_output = in_process_execution.execute(); // Write output to the stdout writer from StdioConfig - let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout); - let _ = stdio_config.stdout_writer.flush(); + let _ = stdio_config.writers.stdout_writer.write_all(&execution_output.stdout); + let _ = stdio_config.writers.stdout_writer.flush(); leaf_reporter.finish( None, @@ -245,6 +251,46 @@ impl ExecutionContext<'_> { } } +/// All valid runtime configurations for a leaf execution, after the cache-hit +/// early-return has been ruled out. +/// +/// The type shape enforces two invariants statically: +/// - fspy tracking only exists inside [`ExecutionMode::Cached`] (fspy requires +/// `includes_auto`, which only lives on cache metadata). +/// - Cached execution always owns [`PipeWriters`] (piped stdio is forced so +/// that output can be captured for replay). +enum ExecutionMode<'a> { + Cached { + /// Borrowed by [`PipeSinks`] during drain; dropped at end of function. + pipe_writers: PipeWriters, + /// Carried through drain into the cache-update phase. Drain writes + /// into `state.std_outputs` in place via a borrow inside `PipeSinks`. + state: CacheState<'a>, + }, + Uncached { + /// `Some` iff the reporter suggested piped stdio. `None` means the + /// child inherits stdin/stdout/stderr from the parent; the reporter's + /// writers were dropped here so we don't hold `std::io::Stdout` while + /// the child writes to the same FD. + pipe_writers: Option, + }, +} + +/// Cached-only state carried from mode construction through the cache-update +/// phase. `std_outputs` starts empty and is written in place during drain via +/// a borrow inside [`PipeSinks::capture`]. +struct CacheState<'a> { + metadata: &'a CacheMetadata, + globbed_inputs: BTreeMap, + /// Captured stdout/stderr for cache replay. Written in place during drain; + /// always present (possibly empty) once we reach the cache-update phase. 
+ std_outputs: Vec, + /// `Some` iff fspy is enabled (`includes_auto`). Holds the resolved + /// negative globs used by [`TrackedPathAccesses::from_raw`] to filter + /// tracked accesses. `None` means fspy tracking is off for this task. + fspy_negatives: Option>>, +} + /// Execute a spawned process with cache-aware lifecycle. /// /// This is a free function (not tied to `ExecutionContext`) so it can be reused @@ -254,8 +300,8 @@ impl ExecutionContext<'_> { /// 1. Cache lookup (determines cache status) /// 2. `leaf_reporter.start(cache_status)` → `StdioConfig` /// 3. If cache hit: replay cached outputs via `StdioConfig` writers → finish -/// 4. If `Inherited` suggestion AND caching disabled: `spawn_inherited()` → finish -/// 5. Else (piped): `spawn_with_tracking()` with writers → cache update → finish +/// 4. Otherwise: `spawn()` with the chosen stdio mode, optionally `pipe_stdio()` +/// to drain, then `child.wait` → cache update → finish /// /// Errors (cache lookup failure, spawn failure, cache update failure) are reported /// through `leaf_reporter.finish()` and do not abort the caller. @@ -335,8 +381,8 @@ pub async fn execute_spawn( if let Some(cached) = cached_value { for output in cached.std_outputs.iter() { let writer: &mut dyn std::io::Write = match output.kind { - spawn::OutputKind::StdOut => &mut stdio_config.stdout_writer, - spawn::OutputKind::StdErr => &mut stdio_config.stderr_writer, + pipe::OutputKind::StdOut => &mut stdio_config.writers.stdout_writer, + pipe::OutputKind::StdErr => &mut stdio_config.writers.stderr_writer, }; let _ = writer.write_all(&output.content); let _ = writer.flush(); @@ -349,96 +395,82 @@ pub async fn execute_spawn( return SpawnOutcome::CacheHit; } - // 4. Determine actual stdio mode based on the suggestion AND cache state. - // Inherited stdio is only used when the reporter suggests it AND caching is - // completely disabled (no cache_metadata). 
If caching is enabled but missed, - // we still need piped mode to capture output for the cache update. - let use_inherited = - stdio_config.suggestion == StdioSuggestion::Inherited && cache_metadata.is_none(); - - if use_inherited { - // Inherited mode: all three stdio FDs (stdin, stdout, stderr) are inherited - // from the parent process. No fspy tracking, no output capture. - // Drop the StdioConfig writers before spawning to avoid holding std::io::Stdout - // while the child also writes to the same FD. - drop(stdio_config); - - match spawn_inherited(&spawn_execution.spawn_command, fast_fail_token).await { - Ok(result) => { - leaf_reporter.finish( - Some(result.exit_status), - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - None, - ); - return SpawnOutcome::Spawned(result.exit_status); - } - Err(err) => { - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::Spawn(err)), - ); - return SpawnOutcome::Failed; - } - } - } - - // 5. Piped mode: execute spawn with tracking, streaming output to writers. - // - std_outputs: always captured when caching is enabled (for cache replay) - // - path_accesses: only tracked when includes_auto is true (fspy inference) - let (mut std_outputs, mut path_accesses, cache_metadata_and_inputs) = - cache_metadata.map_or((None, None, None), |cache_metadata| { - // On musl targets, LD_PRELOAD-based tracking is unavailable but seccomp - // unotify provides equivalent file access tracing. - let path_accesses = if cache_metadata.input_config.includes_auto { - Some(TrackedPathAccesses::default()) + // 4. Build the execution mode. This folds the cache/fspy/stdio decisions + // and their associated state into a single value whose shape encodes + // the valid combinations. The inherited arm drops `stdio_config` here so + // we don't hold `std::io::Stdout` while the child writes to the same FD. 
+ // + // ───────────────────────────────────────────────────────────────────── + // Before adding a new local variable alongside `mode`: think twice. + // Does it make sense for every variant, or only for some? If it's + // variant-specific (only for `Cached`, only when fspy is on, etc.) put + // it inside the variant (or `CacheState`) so the compiler enforces the + // invariant at construction. Sibling locals drift out of sync with the + // mode and force re-derivation (`if let Some(_) = _`, + // `cache_metadata.is_some_and(_)`) at every downstream use site. + // ───────────────────────────────────────────────────────────────────── + let mut mode: ExecutionMode<'_> = match cache_metadata { + Some(metadata) => { + let fspy = if metadata.input_config.includes_auto { + // Resolve negative globs for fspy path filtering + // (already workspace-root-relative). + match metadata + .input_config + .negative_globs + .iter() + .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) + .collect::>>() + { + Ok(negs) => Some(negs), + Err(err) => { + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::PostRunFingerprint(err)), + ); + return SpawnOutcome::Failed; + } + } } else { - None // Skip fspy when inference is disabled or unavailable + None }; - (Some(Vec::new()), path_accesses, Some((cache_metadata, globbed_inputs))) - }); - - // Build negative globs for fspy path filtering (already workspace-root-relative) - let resolved_negatives: Vec> = - if let Some((cache_metadata, _)) = &cache_metadata_and_inputs { - match cache_metadata - .input_config - .negative_globs - .iter() - .map(|p| Ok(wax::Glob::new(p.as_str())?.into_owned())) - .collect::>>() - { - Ok(negs) => negs, - Err(err) => { - leaf_reporter.finish( - None, - CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), - Some(ExecutionError::PostRunFingerprint(err)), - ); - return SpawnOutcome::Failed; - } + ExecutionMode::Cached { + 
pipe_writers: stdio_config.writers, + state: CacheState { + metadata, + globbed_inputs, + std_outputs: Vec::new(), + fspy_negatives: fspy, + }, } - } else { - Vec::new() - }; + } + None => ExecutionMode::Uncached { + pipe_writers: (stdio_config.suggestion == StdioSuggestion::Piped) + .then_some(stdio_config.writers), + }, + }; + + // 5. Derive the arguments for `spawn()` from the mode without consuming it. + let (spawn_stdio, fspy_enabled) = match &mode { + ExecutionMode::Cached { state, .. } => (SpawnStdio::Piped, state.fspy_negatives.is_some()), + ExecutionMode::Uncached { pipe_writers: Some(_) } => (SpawnStdio::Piped, false), + ExecutionMode::Uncached { pipe_writers: None } => (SpawnStdio::Inherited, false), + }; + + // Measure end-to-end duration here — spawn() no longer tracks time. + let start = Instant::now(); - #[expect( - clippy::large_futures, - reason = "spawn_with_tracking manages process I/O and creates a large future" - )] - let result = match spawn_with_tracking( + // 6. Spawn. Returns pipes (Piped) or `None` (Inherited) plus a + // cancellation-aware wait future. + let mut child = match spawn( &spawn_execution.spawn_command, - cache_base_path, - &mut *stdio_config.stdout_writer, - &mut *stdio_config.stderr_writer, - std_outputs.as_mut(), - path_accesses.as_mut(), - &resolved_negatives, + fspy_enabled, + spawn_stdio, fast_fail_token.clone(), ) .await { - Ok(result) => result, + Ok(child) => child, Err(err) => { leaf_reporter.finish( None, @@ -449,16 +481,77 @@ pub async fn execute_spawn( } }; - // 6. Update cache if successful and determine cache update status. - // Errors during cache update are terminal (reported through finish). - let (cache_update_status, cache_error) = if let Some((cache_metadata, globbed_inputs)) = - cache_metadata_and_inputs - { + // 7. Build `PipeSinks` by borrowing into `mode`. The drain fills + // `state.std_outputs` in place (via the borrow inside `capture`), so no + // post-drain transfer is needed. 
`sinks` is `None` only in the + // inherited-uncached case, where there are no pipes to drain. + let sinks: Option> = match &mut mode { + ExecutionMode::Cached { pipe_writers, state } => Some(PipeSinks { + stdout_writer: &mut pipe_writers.stdout_writer, + stderr_writer: &mut pipe_writers.stderr_writer, + capture: Some(&mut state.std_outputs), + }), + ExecutionMode::Uncached { pipe_writers: Some(pipe_writers) } => Some(PipeSinks { + stdout_writer: &mut pipe_writers.stdout_writer, + stderr_writer: &mut pipe_writers.stderr_writer, + capture: None, + }), + ExecutionMode::Uncached { pipe_writers: None } => None, + }; + + if let Some(sinks) = sinks { + let stdout = child.stdout.take().expect("SpawnStdio::Piped yields a stdout pipe"); + let stderr = child.stderr.take().expect("SpawnStdio::Piped yields a stderr pipe"); + #[expect( + clippy::large_futures, + reason = "pipe_stdio streams child I/O and creates a large future" + )] + let pipe_result = pipe_stdio(stdout, stderr, sinks, fast_fail_token.clone()).await; + if let Err(err) = pipe_result { + // Cancel so `child.wait` kills the child instead of orphaning it. + fast_fail_token.cancel(); + let _ = child.wait.await; + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + } + + // 8. Wait for exit (handles cancellation internally). + let outcome = match child.wait.await { + Ok(outcome) => outcome, + Err(err) => { + leaf_reporter.finish( + None, + CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), + Some(ExecutionError::Spawn(err.into())), + ); + return SpawnOutcome::Failed; + } + }; + let duration = start.elapsed(); + + // 9. Cache update (only when we were in `Cached` mode). Errors during cache + // update are reported but do not affect the exit status we return. + let (cache_update_status, cache_error) = if let ExecutionMode::Cached { state, .. 
} = mode { + let CacheState { metadata, globbed_inputs, std_outputs, fspy_negatives } = state; + + // Normalize fspy accesses. `zip` gives `Some` iff fspy was enabled + // (both outcome.path_accesses and fspy_negatives are Some together). + let path_accesses = outcome + .path_accesses + .as_ref() + .zip(fspy_negatives.as_deref()) + .map(|(raw, negs)| TrackedPathAccesses::from_raw(raw, cache_base_path, negs)); + let cancelled = fast_fail_token.is_cancelled() || interrupt_token.is_cancelled(); if cancelled { // Cancelled (Ctrl-C or sibling failure) — result is untrustworthy (CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::Cancelled), None) - } else if result.exit_status.success() { + } else if outcome.exit_status.success() { // Check for read-write overlap: if the task wrote to any file it also // read, the inputs were modified during execution — don't cache. // Note: this only checks fspy-inferred reads, not globbed_inputs keys. @@ -488,11 +581,11 @@ pub async fn execute_spawn( Ok(post_run_fingerprint) => { let new_cache_value = CacheEntryValue { post_run_fingerprint, - std_outputs: std_outputs.unwrap_or_default().into(), - duration: result.duration, + std_outputs: std_outputs.into(), + duration, globbed_inputs, }; - match cache.update(cache_metadata, new_cache_value).await { + match cache.update(metadata, new_cache_value).await { Ok(()) => (CacheUpdateStatus::Updated, None), Err(err) => ( CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), @@ -521,182 +614,9 @@ pub async fn execute_spawn( // 7. Finish the leaf execution with the result and optional cache error. // Cache update/fingerprint failures are reported but do not affect the outcome — // the process ran, so we return its actual exit status. 
- leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error); - - SpawnOutcome::Spawned(result.exit_status) -} + leaf_reporter.finish(Some(outcome.exit_status), cache_update_status, cache_error); -/// Spawn a command with all three stdio file descriptors inherited from the parent. -/// -/// Used when the reporter suggests inherited stdio AND caching is disabled. -/// All three FDs (stdin, stdout, stderr) are inherited, allowing interactive input -/// and direct terminal output. No fspy tracking is performed since there's no -/// cache to update. -/// -/// The child process will see `is_terminal() == true` for stdout/stderr when the -/// parent is running in a terminal. This is expected behavior. -#[tracing::instrument(level = "debug", skip_all)] -async fn spawn_inherited( - spawn_command: &SpawnCommand, - fast_fail_token: CancellationToken, -) -> anyhow::Result { - let mut cmd = fspy::Command::new(spawn_command.program_path.as_path()); - cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str)); - cmd.envs(spawn_command.all_envs.iter()); - cmd.current_dir(&*spawn_command.cwd); - cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit()); - - let start = std::time::Instant::now(); - let mut tokio_cmd = cmd.into_tokio_command(); - - // Clear FD_CLOEXEC on stdio fds before exec. libuv (used by Node.js) marks - // stdin/stdout/stderr as close-on-exec, which causes them to be closed when - // the child process calls exec(). Without this fix, the child's fds 0-2 are - // closed after exec and Node.js reopens them as /dev/null, losing all output. - // See: https://github.com/libuv/libuv/issues/2062 - // SAFETY: The pre_exec closure only performs fcntl operations to clear - // FD_CLOEXEC flags on stdio fds, which is safe in a post-fork context. 
- #[cfg(unix)] - unsafe { - tokio_cmd.pre_exec(|| { - use std::os::fd::BorrowedFd; - - use nix::{ - fcntl::{FcntlArg, FdFlag, fcntl}, - libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}, - }; - for fd in [STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO] { - // SAFETY: fds 0-2 are always valid in a post-fork context - let borrowed = BorrowedFd::borrow_raw(fd); - if let Ok(flags) = fcntl(borrowed, FcntlArg::F_GETFD) { - let mut fd_flags = FdFlag::from_bits_retain(flags); - if fd_flags.contains(FdFlag::FD_CLOEXEC) { - fd_flags.remove(FdFlag::FD_CLOEXEC); - let _ = fcntl(borrowed, FcntlArg::F_SETFD(fd_flags)); - } - } - } - Ok(()) - }); - } - - let mut child = tokio_cmd.spawn()?; - - // On Windows, assign the child to a Job Object with KILL_ON_JOB_CLOSE so that - // all descendant processes (e.g., node.exe spawned by a .cmd shim) are killed - // when the job handle is dropped. Without this, TerminateProcess only kills the - // direct child, leaving grandchildren alive. - #[cfg(windows)] - let _job = { - use std::os::windows::io::{AsRawHandle, BorrowedHandle}; - // Duplicate the process handle so the job outlives tokio's handle. - // SAFETY: The child was just spawned, so its raw handle is valid. - let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; - let owned = borrowed.try_clone_to_owned()?; - win_job::assign_to_kill_on_close_job(owned.as_raw_handle())? - }; - - let exit_status = tokio::select! { - status = child.wait() => status?, - () = fast_fail_token.cancelled() => { - child.start_kill()?; - child.wait().await? - } - }; - - Ok(SpawnResult { exit_status, duration: start.elapsed() }) -} - -/// Win32 Job Object utilities for process tree management. -/// -/// On Windows, `TerminateProcess` only kills the direct child process, not its -/// descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, -/// which automatically terminates all processes in the job when the handle is dropped. 
-#[cfg(windows)] -mod win_job { - use std::{io, os::windows::io::RawHandle}; - - use winapi::{ - shared::minwindef::FALSE, - um::{ - handleapi::CloseHandle, - jobapi2::{ - AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, - TerminateJobObject, - }, - winnt::{ - HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION, - }, - }, - }; - - /// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. - pub(super) struct OwnedJobHandle(HANDLE); - - impl OwnedJobHandle { - /// Immediately terminate all processes in the job. - /// - /// This is needed when pipes to a grandchild process must be closed before - /// the job handle is dropped (e.g., to unblock pipe reads in `spawn_with_tracking`). - pub(super) fn terminate(&self) { - // SAFETY: self.0 is a valid job handle from CreateJobObjectW. - unsafe { TerminateJobObject(self.0, 1) }; - } - } - - impl Drop for OwnedJobHandle { - fn drop(&mut self) { - // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW. - unsafe { CloseHandle(self.0) }; - } - } - - /// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it. - /// - /// Returns the job handle wrapped in an RAII guard. When dropped, all processes - /// in the job (the child and its descendants) are terminated. - pub(super) fn assign_to_kill_on_close_job( - process_handle: RawHandle, - ) -> io::Result { - // SAFETY: Creating an anonymous job object with no security attributes. - let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) }; - if job.is_null() { - return Err(io::Error::last_os_error()); - } - let job = OwnedJobHandle(job); - - // Configure the job to kill all processes when the handle is closed. - // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers - // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern. 
- let mut info = unsafe { - let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed(); - info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; - info - }; - - // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle. - let ok = unsafe { - SetInformationJobObject( - job.0, - // JobObjectExtendedLimitInformation = 9 - 9, - std::ptr::from_mut(&mut info).cast(), - std::mem::size_of::().try_into().unwrap(), - ) - }; - if ok == FALSE { - return Err(io::Error::last_os_error()); - } - - // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle - // from the caller. - let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; - if ok == FALSE { - return Err(io::Error::last_os_error()); - } - - Ok(job) - } + SpawnOutcome::Spawned(outcome.exit_status) } impl Session<'_> { diff --git a/crates/vite_task/src/session/execute/pipe.rs b/crates/vite_task/src/session/execute/pipe.rs new file mode 100644 index 00000000..359d3a03 --- /dev/null +++ b/crates/vite_task/src/session/execute/pipe.rs @@ -0,0 +1,104 @@ +//! Drain child stdout/stderr concurrently to writers, with optional capture. + +use std::io::Write; + +use serde::Serialize; +use tokio::{ + io::AsyncReadExt as _, + process::{ChildStderr, ChildStdout}, +}; +use tokio_util::sync::CancellationToken; +use wincode::{SchemaRead, SchemaWrite}; + +/// Output kind for stdout/stderr +#[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] +pub enum OutputKind { + StdOut, + StdErr, +} + +/// Output chunk with stream kind +#[derive(Debug, SchemaWrite, SchemaRead, Serialize, Clone)] +pub struct StdOutput { + pub kind: OutputKind, + pub content: Vec, +} + +/// Downstream destinations for bytes read from the child's stdout/stderr: +/// two pass-through writers plus an optional capture buffer (populated in +/// place during drain for cache replay). 
+pub struct PipeSinks<'a> { + pub stdout_writer: &'a mut dyn Write, + pub stderr_writer: &'a mut dyn Write, + pub capture: Option<&'a mut Vec>, +} + +/// Drain the child's stdout/stderr concurrently into `sinks`. +/// +/// Bytes are written through `sinks.stdout_writer` / `sinks.stderr_writer` in +/// real time and, when `sinks.capture` is `Some`, also appended (with adjacent +/// same-kind chunks coalesced) for cache replay. +/// +/// On cancellation: returns `Ok(())` without killing the child — the caller +/// drives the child's cancellation-aware `wait` future next, which observes the +/// same already-fired token and performs the kill. Dropping `stdout`/`stderr` +/// closes the pipe read ends (EPIPE on Unix, `ERROR_BROKEN_PIPE` on Windows). +#[tracing::instrument(level = "debug", skip_all)] +pub async fn pipe_stdio( + mut stdout: ChildStdout, + mut stderr: ChildStderr, + mut sinks: PipeSinks<'_>, + cancellation_token: CancellationToken, +) -> std::io::Result<()> { + let mut stdout_buf = [0u8; 8192]; + let mut stderr_buf = [0u8; 8192]; + let mut stdout_done = false; + let mut stderr_done = false; + + loop { + if stdout_done && stderr_done { + return Ok(()); + } + tokio::select! { + result = stdout.read(&mut stdout_buf), if !stdout_done => { + match result? { + 0 => stdout_done = true, + n => { + let bytes = &stdout_buf[..n]; + sinks.stdout_writer.write_all(bytes)?; + sinks.stdout_writer.flush()?; + if let Some(capture) = &mut sinks.capture { + append_output_chunk(capture, OutputKind::StdOut, bytes); + } + } + } + } + result = stderr.read(&mut stderr_buf), if !stderr_done => { + match result? 
{ + 0 => stderr_done = true, + n => { + let bytes = &stderr_buf[..n]; + sinks.stderr_writer.write_all(bytes)?; + sinks.stderr_writer.flush()?; + if let Some(capture) = &mut sinks.capture { + append_output_chunk(capture, OutputKind::StdErr, bytes); + } + } + } + } + () = cancellation_token.cancelled() => { + return Ok(()); + } + } + } +} + +fn append_output_chunk(capture: &mut Vec, kind: OutputKind, bytes: &[u8]) { + if let Some(last) = capture.last_mut() + && last.kind == kind + { + last.content.extend_from_slice(bytes); + } else { + capture.push(StdOutput { kind, content: bytes.to_vec() }); + } +} diff --git a/crates/vite_task/src/session/execute/spawn.rs b/crates/vite_task/src/session/execute/spawn.rs index 92da92b1..0f29926b 100644 --- a/crates/vite_task/src/session/execute/spawn.rs +++ b/crates/vite_task/src/session/execute/spawn.rs @@ -1,331 +1,186 @@ -//! Process spawning with file system tracking via fspy. +//! Unified spawn abstraction over fspy and plain tokio processes. +//! +//! [`spawn`] does one thing: hand back the child's stdio pipes plus a +//! cancellation-aware `wait` future. Draining the pipes is [`super::pipe`]'s +//! job; normalizing fspy path accesses is [`super::tracked_accesses`]'s. 
-use std::{ - collections::hash_map::Entry, - io::Write, - process::{ExitStatus, Stdio}, - time::{Duration, Instant}, -}; +use std::{io, process::Stdio}; -use fspy::AccessMode; -use rustc_hash::FxHashSet; -use serde::Serialize; -use tokio::io::AsyncReadExt as _; +use fspy::PathAccessIterable; +use futures_util::{FutureExt, future::LocalBoxFuture}; +use tokio::process::{ChildStderr, ChildStdout}; use tokio_util::sync::CancellationToken; -use vite_path::{AbsolutePath, RelativePathBuf}; use vite_task_plan::SpawnCommand; -use wax::Program as _; -use wincode::{SchemaRead, SchemaWrite}; -use crate::collections::HashMap; - -/// Path read access info -#[derive(Debug, Clone, Copy)] -pub struct PathRead { - pub read_dir_entries: bool, -} - -/// Output kind for stdout/stderr -#[derive(Debug, PartialEq, Eq, Clone, Copy, SchemaWrite, SchemaRead, Serialize)] -pub enum OutputKind { - StdOut, - StdErr, +/// How the child's stdin/stdout/stderr are configured. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SpawnStdio { + /// All three fds inherited from the parent. On Unix, [`spawn`] also clears + /// `FD_CLOEXEC` on fds 0-2 (libuv workaround — + /// ). + Inherited, + /// stdin is `/dev/null`; stdout and stderr are piped. Drain the pipes with + /// [`super::pipe::pipe_stdio`]. + Piped, } -/// Output chunk with stream kind -#[derive(Debug, SchemaWrite, SchemaRead, Serialize, Clone)] -pub struct StdOutput { - pub kind: OutputKind, - pub content: Vec, +/// Handle to a spawned child. +/// +/// `stdout` and `stderr` are `Some` iff [`SpawnStdio::Piped`] was requested. +/// `wait` resolves when the child exits and handles cancellation internally: +/// when the token fires, the child (and on Windows its descendants via the Job +/// Object) is killed before the future resolves. 
+pub struct ChildHandle { + pub stdout: Option, + pub stderr: Option, + pub wait: LocalBoxFuture<'static, io::Result>, } -/// Result of spawning a process with file tracking -#[derive(Debug)] -pub struct SpawnResult { - pub exit_status: ExitStatus, - pub duration: Duration, +/// Result of waiting for a child to exit. +pub struct ChildOutcome { + pub exit_status: std::process::ExitStatus, + /// Raw fspy accesses. `Some` iff `fspy` was `true` at spawn time. + pub path_accesses: Option, } -/// Tracked file accesses from fspy. -/// Only populated when fspy tracking is enabled (`includes_auto` is true). -#[derive(Default, Debug)] -pub struct TrackedPathAccesses { - /// Tracked path reads - pub path_reads: HashMap, +/// Spawn a command with the requested fspy and stdio configuration. +/// +/// Cancellation is unified: whether fspy is enabled or not, the returned `wait` +/// future observes `cancellation_token` and kills the child before resolving. +#[tracing::instrument(level = "debug", skip_all)] +pub async fn spawn( + cmd: &SpawnCommand, + fspy: bool, + stdio: SpawnStdio, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let mut fspy_cmd = fspy::Command::new(cmd.program_path.as_path()); + fspy_cmd.args(cmd.args.iter().map(vite_str::Str::as_str)); + fspy_cmd.envs(cmd.all_envs.iter()); + fspy_cmd.current_dir(&*cmd.cwd); + + match stdio { + SpawnStdio::Inherited => { + fspy_cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit()); + // libuv (used by Node.js) marks stdin/stdout/stderr as close-on-exec; + // without this fix the child reopens fds 0-2 as /dev/null after exec. + // See: https://github.com/libuv/libuv/issues/2062 + // SAFETY: the pre_exec closure only performs fcntl operations on + // stdio fds, which is safe in a post-fork context. 
+ #[cfg(unix)] + unsafe { + fspy_cmd.pre_exec(clear_stdio_cloexec); + } + } + SpawnStdio::Piped => { + fspy_cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped()); + } + } - /// Tracked path writes - pub path_writes: FxHashSet, + if fspy { + spawn_fspy(fspy_cmd, cancellation_token).await + } else { + spawn_tokio(fspy_cmd, cancellation_token) + } } -#[expect( - clippy::disallowed_types, - reason = "fspy strip_path_prefix exposes std::path::Path; convert to RelativePathBuf immediately" -)] -fn normalize_tracked_workspace_path( - stripped_path: &std::path::Path, - resolved_negatives: &[wax::Glob<'static>], -) -> Option { - // On Windows, paths are possible to be still absolute after stripping the workspace root. - // For example: c:\workspace\subdir\c:\workspace\subdir - // Just ignore those accesses. - let relative = RelativePathBuf::new(stripped_path).ok()?; +async fn spawn_fspy( + cmd: fspy::Command, + cancellation_token: CancellationToken, +) -> anyhow::Result { + let mut tracked = cmd.spawn(cancellation_token).await?; - // Clean `..` components — fspy may report paths like - // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for - // consistent behavior across platforms and clean user-facing messages. - let relative = relative.clean().ok()?; + // On Windows, assign the child to a Job Object so that killing the child + // also kills all descendant processes (e.g., node.exe via a .cmd shim). + #[cfg(windows)] + let job = { + use std::os::windows::io::AsRawHandle; + super::win_job::assign_to_kill_on_close_job(tracked.process_handle.as_raw_handle())? 
+ }; - // Skip .git directory accesses (workaround for tools like oxlint) - if relative.as_path().strip_prefix(".git").is_ok() { - return None; - } + let stdout = tracked.stdout.take(); + let stderr = tracked.stderr.take(); + let wait_handle = tracked.wait_handle; - if !resolved_negatives.is_empty() - && resolved_negatives.iter().any(|neg| neg.is_match(relative.as_str())) - { - return None; + let wait = async move { + let termination = wait_handle.await?; + // Drop order: `job` drops here, KILL_ON_JOB_CLOSE kills any descendants + // still alive. fspy's wait handle already watched the cancellation + // token and killed the direct child. + #[cfg(windows)] + drop(job); + Ok(ChildOutcome { + exit_status: termination.status, + path_accesses: Some(termination.path_accesses), + }) } + .boxed_local(); - Some(relative) -} - -/// How the child process is awaited after stdout/stderr are drained. -enum ChildWait { - /// fspy tracking enabled — fspy manages cancellation internally. - Fspy(fspy::TrackedChild), - - /// Plain tokio process — cancellation is handled in the pipe read loop. - Tokio(tokio::process::Child), + Ok(ChildHandle { stdout, stderr, wait }) } -/// Spawn a command with optional file system tracking via fspy, using piped stdio. -/// -/// Returns the execution result including exit status and duration. -/// -/// - stdin is always `/dev/null` (piped mode is for non-interactive execution). -/// - `stdout_writer`/`stderr_writer` receive the child's stdout/stderr output in real-time. -/// - `std_outputs` if provided, will be populated with captured outputs for cache replay. -/// - `path_accesses` if provided, fspy will be used to track file accesses. If `None`, fspy is disabled. -/// - `resolved_negatives` - resolved negative glob patterns for filtering fspy-tracked paths. 
-#[tracing::instrument(level = "debug", skip_all)]
-#[expect(
-    clippy::too_many_lines,
-    reason = "spawn logic is inherently sequential and splitting would reduce clarity"
-)]
-#[expect(
-    clippy::too_many_arguments,
-    reason = "spawn parameters are all distinct concerns that don't form a natural group"
-)]
-pub async fn spawn_with_tracking(
-    spawn_command: &SpawnCommand,
-    workspace_root: &AbsolutePath,
-    stdout_writer: &mut dyn Write,
-    stderr_writer: &mut dyn Write,
-    std_outputs: Option<&mut Vec<StdOutput>>,
-    path_accesses: Option<&mut TrackedPathAccesses>,
-    resolved_negatives: &[wax::Glob<'static>],
-    fast_fail_token: CancellationToken,
-) -> anyhow::Result<SpawnResult> {
-    let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
-    cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
-    cmd.envs(spawn_command.all_envs.iter());
-    cmd.current_dir(&*spawn_command.cwd);
-    cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped());
+fn spawn_tokio(
+    cmd: fspy::Command,
+    cancellation_token: CancellationToken,
+) -> anyhow::Result<ChildHandle> {
+    let mut child = cmd.into_tokio_command().spawn()?;
 
-    // On Windows, assign the child to a Job Object so that killing the child also
-    // kills all descendant processes (e.g., node.exe spawned by a .cmd shim).
     #[cfg(windows)]
-    let job;
-
-    let (mut child_stdout, mut child_stderr, mut child_wait) = if path_accesses.is_some() {
-        // fspy tracking enabled — fspy manages cancellation internally via a clone
-        // of the token. We keep the original for the pipe read loop.
- let mut tracked_child = cmd.spawn(fast_fail_token.clone()).await?; - let stdout = tracked_child.stdout.take().unwrap(); - let stderr = tracked_child.stderr.take().unwrap(); - #[cfg(windows)] - { - use std::os::windows::io::AsRawHandle; - job = super::win_job::assign_to_kill_on_close_job( - tracked_child.process_handle.as_raw_handle(), - )?; - } - (stdout, stderr, ChildWait::Fspy(tracked_child)) - } else { - let mut child = cmd.into_tokio_command().spawn()?; - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - #[cfg(windows)] - { - use std::os::windows::io::{AsRawHandle, BorrowedHandle}; - // SAFETY: The child was just spawned, so its raw handle is valid. - let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; - let owned = borrowed.try_clone_to_owned()?; - job = super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())?; - } - (stdout, stderr, ChildWait::Tokio(child)) + let job = { + use std::os::windows::io::{AsRawHandle, BorrowedHandle}; + // Duplicate the process handle so the job outlives tokio's handle. + // SAFETY: The child was just spawned, so its raw handle is valid. + let borrowed = unsafe { BorrowedHandle::borrow_raw(child.raw_handle().unwrap()) }; + let owned = borrowed.try_clone_to_owned()?; + super::win_job::assign_to_kill_on_close_job(owned.as_raw_handle())? }; - // Output capturing is independent of fspy tracking - let mut outputs = std_outputs; - let mut stdout_buf = [0u8; 8192]; - let mut stderr_buf = [0u8; 8192]; - let mut stdout_done = false; - let mut stderr_done = false; - - let start = Instant::now(); + let stdout = child.stdout.take(); + let stderr = child.stderr.take(); - // Read from both stdout and stderr concurrently using select! - // Cancellation is handled directly in the loop: kill the child process (and - // on Windows, terminate the Job Object to kill grandchildren holding pipes). - loop { - if stdout_done && stderr_done { - break; - } - tokio::select! 
{ - result = child_stdout.read(&mut stdout_buf), if !stdout_done => { - match result? { - 0 => stdout_done = true, - n => { - let content = stdout_buf[..n].to_vec(); - // Write to the sync writer immediately - stdout_writer.write_all(&content)?; - stdout_writer.flush()?; - // Store outputs for caching - if let Some(outputs) = &mut outputs { - if let Some(last) = outputs.last_mut() - && last.kind == OutputKind::StdOut - { - last.content.extend(&content); - } else { - outputs.push(StdOutput { kind: OutputKind::StdOut, content }); - } - } - } - } - } - result = child_stderr.read(&mut stderr_buf), if !stderr_done => { - match result? { - 0 => stderr_done = true, - n => { - let content = stderr_buf[..n].to_vec(); - // Write to the sync writer immediately - stderr_writer.write_all(&content)?; - stderr_writer.flush()?; - // Store outputs for caching - if let Some(outputs) = &mut outputs { - if let Some(last) = outputs.last_mut() - && last.kind == OutputKind::StdErr - { - last.content.extend(&content); - } else { - outputs.push(StdOutput { kind: OutputKind::StdErr, content }); - } - } - } - } - } - () = fast_fail_token.cancelled() => { - // Kill the direct child (no-op for fspy which handles it internally). - if let ChildWait::Tokio(ref mut child) = child_wait { - let _ = child.start_kill(); - } - // On Windows, terminate the entire process tree so grandchild - // processes release their pipe handles. + let wait = async move { + let exit_status = tokio::select! { + status = child.wait() => status?, + () = cancellation_token.cancelled() => { + child.start_kill()?; + // Eagerly kill descendants; KILL_ON_JOB_CLOSE on drop is a backstop. #[cfg(windows)] job.terminate(); - break; + child.wait().await? } - } + }; + // `job` drops here on Windows, terminating any stragglers. + #[cfg(windows)] + drop(job); + Ok(ChildOutcome { exit_status, path_accesses: None }) } + .boxed_local(); - // Wait for process termination and collect results. 
- // The child may have closed its pipes without exiting (e.g., daemonized), - // so we still need a cancellation arm here. - match child_wait { - ChildWait::Fspy(tracked_child) => { - // fspy's wait_handle already monitors the cancellation token internally, - // so no additional select! is needed here. - let termination = tracked_child.wait_handle.await?; - let duration = start.elapsed(); - - // path_accesses must be Some when fspy is enabled (they're set together) - let path_accesses = path_accesses.ok_or_else(|| { - anyhow::anyhow!("internal error: fspy enabled but path_accesses is None") - })?; - let path_reads = &mut path_accesses.path_reads; - let path_writes = &mut path_accesses.path_writes; - - for access in termination.path_accesses.iter() { - // Strip workspace root, clean `..` components, and filter in one pass. - // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`. - let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| { - let Ok(stripped_path) = strip_result else { - return None; - }; - normalize_tracked_workspace_path(stripped_path, resolved_negatives) - }); + Ok(ChildHandle { stdout, stderr, wait }) +} - let Some(relative_path) = relative_path else { - continue; - }; +#[cfg(unix)] +#[expect( + clippy::unnecessary_wraps, + reason = "signature matches Command::pre_exec's FnMut() -> io::Result<()> contract" +)] +fn clear_stdio_cloexec() -> io::Result<()> { + use std::os::fd::BorrowedFd; - if access.mode.contains(AccessMode::READ) { - path_reads - .entry(relative_path.clone()) - .or_insert(PathRead { read_dir_entries: false }); - } - if access.mode.contains(AccessMode::WRITE) { - path_writes.insert(relative_path.clone()); - } - if access.mode.contains(AccessMode::READ_DIR) { - match path_reads.entry(relative_path) { - Entry::Occupied(mut occupied) => { - occupied.get_mut().read_dir_entries = true; - } - Entry::Vacant(vacant) => { - vacant.insert(PathRead { read_dir_entries: true }); - } - } - } + use 
nix::{ + fcntl::{FcntlArg, FdFlag, fcntl}, + libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO}, + }; + for fd in [STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO] { + // SAFETY: fds 0-2 are always valid in a post-fork context + let borrowed = unsafe { BorrowedFd::borrow_raw(fd) }; + if let Ok(flags) = fcntl(borrowed, FcntlArg::F_GETFD) { + let mut fd_flags = FdFlag::from_bits_retain(flags); + if fd_flags.contains(FdFlag::FD_CLOEXEC) { + fd_flags.remove(FdFlag::FD_CLOEXEC); + let _ = fcntl(borrowed, FcntlArg::F_SETFD(fd_flags)); } - - tracing::debug!( - "spawn finished, path_reads: {}, path_writes: {}, exit_status: {}", - path_reads.len(), - path_writes.len(), - termination.status, - ); - - Ok(SpawnResult { exit_status: termination.status, duration }) - } - ChildWait::Tokio(mut child) => { - let exit_status = tokio::select! { - status = child.wait() => status?, - () = fast_fail_token.cancelled() => { - child.start_kill()?; - child.wait().await? - } - }; - Ok(SpawnResult { exit_status, duration: start.elapsed() }) } } -} - -#[cfg(test)] -mod tests { - #[cfg(windows)] - use super::*; - - #[cfg(windows)] - #[test] - fn malformed_windows_drive_path_after_workspace_strip_is_ignored() { - #[expect( - clippy::disallowed_types, - reason = "normalize_tracked_workspace_path requires std::path::Path for fspy strip_path_prefix output" - )] - let relative_path = - normalize_tracked_workspace_path(std::path::Path::new(r"foo\C:\bar"), &[]); - assert!(relative_path.is_none()); - } + Ok(()) } diff --git a/crates/vite_task/src/session/execute/tracked_accesses.rs b/crates/vite_task/src/session/execute/tracked_accesses.rs new file mode 100644 index 00000000..79c77f10 --- /dev/null +++ b/crates/vite_task/src/session/execute/tracked_accesses.rs @@ -0,0 +1,122 @@ +//! Normalize raw fspy path accesses into workspace-relative, filtered form. 
+
+use std::collections::hash_map::Entry;
+
+use fspy::{AccessMode, PathAccessIterable};
+use rustc_hash::FxHashSet;
+use vite_path::{AbsolutePath, RelativePathBuf};
+
+use crate::collections::HashMap;
+
+/// Path read access info
+#[derive(Debug, Clone, Copy)]
+pub struct PathRead {
+    pub read_dir_entries: bool,
+}
+
+/// Tracked file accesses from fspy, normalized to workspace-relative paths.
+#[derive(Default, Debug)]
+pub struct TrackedPathAccesses {
+    /// Tracked path reads
+    pub path_reads: HashMap<RelativePathBuf, PathRead>,
+
+    /// Tracked path writes
+    pub path_writes: FxHashSet<RelativePathBuf>,
+}
+
+impl TrackedPathAccesses {
+    /// Build from fspy's raw iterable by stripping the workspace prefix,
+    /// normalizing `..` components, and filtering against the negative globs.
+    pub fn from_raw(
+        raw: &PathAccessIterable,
+        workspace_root: &AbsolutePath,
+        resolved_negatives: &[wax::Glob<'static>],
+    ) -> Self {
+        let mut accesses = Self::default();
+        for access in raw.iter() {
+            // Strip workspace root, clean `..` components, and filter in one pass.
+            // fspy may report paths like `packages/sub-pkg/../shared/dist/output.js`.
+            let relative_path = access.path.strip_path_prefix(workspace_root, |strip_result| {
+                let Ok(stripped_path) = strip_result else {
+                    return None;
+                };
+                normalize_tracked_workspace_path(stripped_path, resolved_negatives)
+            });
+
+            let Some(relative_path) = relative_path else {
+                continue;
+            };
+
+            if access.mode.contains(AccessMode::READ) {
+                accesses
+                    .path_reads
+                    .entry(relative_path.clone())
+                    .or_insert(PathRead { read_dir_entries: false });
+            }
+            if access.mode.contains(AccessMode::WRITE) {
+                accesses.path_writes.insert(relative_path.clone());
+            }
+            if access.mode.contains(AccessMode::READ_DIR) {
+                match accesses.path_reads.entry(relative_path) {
+                    Entry::Occupied(mut occupied) => {
+                        occupied.get_mut().read_dir_entries = true;
+                    }
+                    Entry::Vacant(vacant) => {
+                        vacant.insert(PathRead { read_dir_entries: true });
+                    }
+                }
+            }
+        }
+        accesses
+    }
+}
+
+#[expect(
+    clippy::disallowed_types,
+    reason = "fspy strip_path_prefix exposes std::path::Path; convert to RelativePathBuf immediately"
+)]
+fn normalize_tracked_workspace_path(
+    stripped_path: &std::path::Path,
+    resolved_negatives: &[wax::Glob<'static>],
+) -> Option<RelativePathBuf> {
+    // On Windows, paths are possible to be still absolute after stripping the workspace root.
+    // For example: c:\workspace\subdir\c:\workspace\subdir
+    // Just ignore those accesses.
+    let relative = RelativePathBuf::new(stripped_path).ok()?;
+
+    // Clean `..` components — fspy may report paths like
+    // `packages/sub-pkg/../shared/dist/output.js`. Normalize them for
+    // consistent behavior across platforms and clean user-facing messages.
+ let relative = relative.clean().ok()?; + + // Skip .git directory accesses (workaround for tools like oxlint) + if relative.as_path().strip_prefix(".git").is_ok() { + return None; + } + + if !resolved_negatives.is_empty() + && resolved_negatives.iter().any(|neg| wax::Program::is_match(neg, relative.as_str())) + { + return None; + } + + Some(relative) +} + +#[cfg(test)] +mod tests { + #[cfg(windows)] + use super::*; + + #[cfg(windows)] + #[test] + fn malformed_windows_drive_path_after_workspace_strip_is_ignored() { + #[expect( + clippy::disallowed_types, + reason = "normalize_tracked_workspace_path requires std::path::Path for fspy strip_path_prefix output" + )] + let relative_path = + normalize_tracked_workspace_path(std::path::Path::new(r"foo\C:\bar"), &[]); + assert!(relative_path.is_none()); + } +} diff --git a/crates/vite_task/src/session/execute/win_job.rs b/crates/vite_task/src/session/execute/win_job.rs new file mode 100644 index 00000000..659203b1 --- /dev/null +++ b/crates/vite_task/src/session/execute/win_job.rs @@ -0,0 +1,84 @@ +//! Win32 Job Object utilities for process tree management. +//! +//! On Windows, `TerminateProcess` only kills the direct child process, not its +//! descendants. This module creates a Job Object with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, +//! which automatically terminates all processes in the job when the handle is dropped. + +use std::{io, os::windows::io::RawHandle}; + +use winapi::{ + shared::minwindef::FALSE, + um::{ + handleapi::CloseHandle, + jobapi2::{ + AssignProcessToJobObject, CreateJobObjectW, SetInformationJobObject, TerminateJobObject, + }, + winnt::{HANDLE, JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, JOBOBJECT_EXTENDED_LIMIT_INFORMATION}, + }, +}; + +/// RAII wrapper around a Win32 Job Object `HANDLE` that closes it on drop. +pub(super) struct OwnedJobHandle(HANDLE); + +impl OwnedJobHandle { + /// Immediately terminate all processes in the job. 
+    ///
+    /// This is needed when pipes to a grandchild process must be closed before
+    /// the job handle is dropped (e.g., to unblock pipe reads in `spawn`).
+    pub(super) fn terminate(&self) {
+        // SAFETY: self.0 is a valid job handle from CreateJobObjectW.
+        unsafe { TerminateJobObject(self.0, 1) };
+    }
+}
+
+impl Drop for OwnedJobHandle {
+    fn drop(&mut self) {
+        // SAFETY: self.0 is a valid handle obtained from CreateJobObjectW.
+        unsafe { CloseHandle(self.0) };
+    }
+}
+
+/// Create a Job Object with `KILL_ON_JOB_CLOSE` and assign a process to it.
+///
+/// Returns the job handle wrapped in an RAII guard. When dropped, all processes
+/// in the job (the child and its descendants) are terminated.
+pub(super) fn assign_to_kill_on_close_job(process_handle: RawHandle) -> io::Result<OwnedJobHandle> {
+    // SAFETY: Creating an anonymous job object with no security attributes.
+    let job = unsafe { CreateJobObjectW(std::ptr::null_mut(), std::ptr::null()) };
+    if job.is_null() {
+        return Err(io::Error::last_os_error());
+    }
+    let job = OwnedJobHandle(job);
+
+    // Configure the job to kill all processes when the handle is closed.
+    // SAFETY: JOBOBJECT_EXTENDED_LIMIT_INFORMATION is a plain C struct (no pointers
+    // in the zeroed fields). Zeroing then setting LimitFlags is the standard pattern.
+    let mut info = unsafe {
+        let mut info: JOBOBJECT_EXTENDED_LIMIT_INFORMATION = std::mem::zeroed();
+        info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+        info
+    };
+
+    // SAFETY: info is a valid JOBOBJECT_EXTENDED_LIMIT_INFORMATION, job.0 is a valid handle.
+    let ok = unsafe {
+        SetInformationJobObject(
+            job.0,
+            // JobObjectExtendedLimitInformation = 9
+            9,
+            std::ptr::from_mut(&mut info).cast(),
+            std::mem::size_of::<JOBOBJECT_EXTENDED_LIMIT_INFORMATION>().try_into().unwrap(),
+        )
+    };
+    if ok == FALSE {
+        return Err(io::Error::last_os_error());
+    }
+
+    // SAFETY: Both handles are valid — job from CreateJobObjectW, process handle
+    // from the caller.
+ let ok = unsafe { AssignProcessToJobObject(job.0, process_handle as HANDLE) }; + if ok == FALSE { + return Err(io::Error::last_os_error()); + } + + Ok(job) +} diff --git a/crates/vite_task/src/session/reporter/grouped/mod.rs b/crates/vite_task/src/session/reporter/grouped/mod.rs index 5002a3a0..fd181684 100644 --- a/crates/vite_task/src/session/reporter/grouped/mod.rs +++ b/crates/vite_task/src/session/reporter/grouped/mod.rs @@ -8,8 +8,8 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ColorizeExt, ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, - LeafExecutionReporter, StdioConfig, StdioSuggestion, format_command_with_cache_status, - format_task_label, write_leaf_trailing_output, + LeafExecutionReporter, PipeWriters, StdioConfig, StdioSuggestion, + format_command_with_cache_status, format_task_label, write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -94,8 +94,10 @@ impl LeafExecutionReporter for GroupedLeafReporter { StdioConfig { suggestion: StdioSuggestion::Piped, - stdout_writer: Box::new(GroupedWriter::new(Rc::clone(&buffer))), - stderr_writer: Box::new(GroupedWriter::new(buffer)), + writers: PipeWriters { + stdout_writer: Box::new(GroupedWriter::new(Rc::clone(&buffer))), + stderr_writer: Box::new(GroupedWriter::new(buffer)), + }, } } diff --git a/crates/vite_task/src/session/reporter/interleaved/mod.rs b/crates/vite_task/src/session/reporter/interleaved/mod.rs index 5a30ec5f..e7f17f83 100644 --- a/crates/vite_task/src/session/reporter/interleaved/mod.rs +++ b/crates/vite_task/src/session/reporter/interleaved/mod.rs @@ -7,7 +7,8 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioConfig, StdioSuggestion, format_command_with_cache_status, write_leaf_trailing_output, + PipeWriters, StdioConfig, StdioSuggestion, 
format_command_with_cache_status, + write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -84,8 +85,10 @@ impl LeafExecutionReporter for InterleavedLeafReporter { StdioConfig { suggestion: self.stdio_suggestion, - stdout_writer: Box::new(std::io::stdout()), - stderr_writer: Box::new(std::io::stderr()), + writers: PipeWriters { + stdout_writer: Box::new(std::io::stdout()), + stderr_writer: Box::new(std::io::stderr()), + }, } } diff --git a/crates/vite_task/src/session/reporter/labeled/mod.rs b/crates/vite_task/src/session/reporter/labeled/mod.rs index 01c63435..500e190e 100644 --- a/crates/vite_task/src/session/reporter/labeled/mod.rs +++ b/crates/vite_task/src/session/reporter/labeled/mod.rs @@ -7,7 +7,7 @@ use vite_task_plan::{ExecutionItemDisplay, LeafExecutionKind}; use super::{ ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionReporter, - StdioConfig, StdioSuggestion, format_command_with_cache_status, format_task_label, + PipeWriters, StdioConfig, StdioSuggestion, format_command_with_cache_status, format_task_label, write_leaf_trailing_output, }; use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError}; @@ -86,14 +86,16 @@ impl LeafExecutionReporter for LabeledLeafReporter { StdioConfig { suggestion: StdioSuggestion::Piped, - stdout_writer: Box::new(LabeledWriter::new( - Box::new(std::io::stdout()), - prefix.as_bytes().to_vec(), - )), - stderr_writer: Box::new(LabeledWriter::new( - Box::new(std::io::stderr()), - prefix.as_bytes().to_vec(), - )), + writers: PipeWriters { + stdout_writer: Box::new(LabeledWriter::new( + Box::new(std::io::stdout()), + prefix.as_bytes().to_vec(), + )), + stderr_writer: Box::new(LabeledWriter::new( + Box::new(std::io::stderr()), + prefix.as_bytes().to_vec(), + )), + }, } } diff --git a/crates/vite_task/src/session/reporter/mod.rs b/crates/vite_task/src/session/reporter/mod.rs index 64f5f441..82724ecd 100644 --- 
a/crates/vite_task/src/session/reporter/mod.rs
+++ b/crates/vite_task/src/session/reporter/mod.rs
@@ -96,9 +96,12 @@ pub enum StdioSuggestion {
 pub struct StdioConfig {
     /// The reporter's preferred stdio mode.
     pub suggestion: StdioSuggestion,
-    /// Writer for the child process's stdout (used in piped mode and cache replay).
+    /// Writer for the child process's stderr and stdout (used in piped mode and cache replay).
+    pub writers: PipeWriters,
+}
+
+pub struct PipeWriters {
     pub stdout_writer: Box<dyn Write>,
-    /// Writer for the child process's stderr (used in piped mode and cache replay).
     pub stderr_writer: Box<dyn Write>,
 }
diff --git a/crates/vite_task/src/session/reporter/plain.rs b/crates/vite_task/src/session/reporter/plain.rs
index 860e3817..ce7b7abc 100644
--- a/crates/vite_task/src/session/reporter/plain.rs
+++ b/crates/vite_task/src/session/reporter/plain.rs
@@ -6,7 +6,7 @@
 use std::io::Write;
 
 use super::{
-    LeafExecutionReporter, StdioConfig, StdioSuggestion, format_cache_hit_message,
+    LeafExecutionReporter, PipeWriters, StdioConfig, StdioSuggestion, format_cache_hit_message,
     format_error_message,
 };
 use crate::session::event::{CacheStatus, CacheUpdateStatus, ExecutionError};
@@ -62,14 +62,18 @@ impl LeafExecutionReporter for PlainReporter {
         if self.silent_if_cache_hit && self.is_cache_hit {
             StdioConfig {
                 suggestion: StdioSuggestion::Inherited,
-                stdout_writer: Box::new(std::io::sink()),
-                stderr_writer: Box::new(std::io::sink()),
+                writers: PipeWriters {
+                    stdout_writer: Box::new(std::io::sink()),
+                    stderr_writer: Box::new(std::io::sink()),
+                },
             }
         } else {
             StdioConfig {
                 suggestion: StdioSuggestion::Inherited,
-                stdout_writer: Box::new(std::io::stdout()),
-                stderr_writer: Box::new(std::io::stderr()),
+                writers: PipeWriters {
+                    stdout_writer: Box::new(std::io::stdout()),
+                    stderr_writer: Box::new(std::io::stderr()),
+                },
            }
        }
    }