Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/vite_task/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ serde = { workspace = true, features = ["derive", "rc"] }
serde_json = { workspace = true }
smallvec.workspace = true
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "macros", "sync"] }
tokio = { workspace = true, features = ["rt-multi-thread", "io-std", "io-util", "macros", "sync"] }
tracing = { workspace = true }
twox-hash = { workspace = true }
vite_glob = { workspace = true }
Expand Down
6 changes: 0 additions & 6 deletions crates/vite_task/src/session/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@ pub enum ExecutionError {
PostRunFingerprint(#[source] anyhow::Error),
}

#[derive(Debug)]
pub enum OutputKind {
Stdout,
Stderr,
}

#[derive(Debug)]
pub enum CacheDisabledReason {
InProcessExecution,
Expand Down
134 changes: 88 additions & 46 deletions crates/vite_task/src/session/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,25 @@ pub mod spawn;
use std::{process::Stdio, sync::Arc};

use futures_util::FutureExt;
use tokio::io::AsyncWriteExt as _;
use vite_path::AbsolutePath;
use vite_task_plan::{ExecutionGraph, ExecutionItemKind, LeafExecutionKind, SpawnExecution};
use vite_task_plan::{
ExecutionGraph, ExecutionItemKind, LeafExecutionKind, SpawnCommand, SpawnExecution,
};

use self::{
fingerprint::PostRunFingerprint,
spawn::{OutputKind as SpawnOutputKind, spawn_with_tracking},
spawn::{SpawnResult, spawn_with_tracking},
};
use super::{
cache::{CommandCacheValue, ExecutionCache},
event::{
CacheDisabledReason, CacheErrorKind, CacheNotUpdatedReason, CacheStatus, CacheUpdateStatus,
ExecutionError, OutputKind,
ExecutionError,
},
reporter::{
ExitStatus, GraphExecutionReporter, GraphExecutionReporterBuilder, LeafExecutionPath,
LeafExecutionReporter, StdinSuggestion,
LeafExecutionReporter, StdioSuggestion,
},
};
use crate::{Session, session::execute::spawn::SpawnTrackResult};
Expand Down Expand Up @@ -116,10 +119,13 @@ impl ExecutionContext<'_> {
match leaf_execution_kind {
LeafExecutionKind::InProcess(in_process_execution) => {
// In-process (built-in) commands: caching is disabled, execute synchronously
leaf_reporter.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution));
let mut stdio_config = leaf_reporter
.start(CacheStatus::Disabled(CacheDisabledReason::InProcessExecution));

let execution_output = in_process_execution.execute();
leaf_reporter.output(OutputKind::Stdout, execution_output.stdout.into());
// Write output to the stdout writer from StdioConfig
let _ = stdio_config.stdout_writer.write_all(&execution_output.stdout).await;
let _ = stdio_config.stdout_writer.flush().await;

leaf_reporter.finish(
None,
Expand Down Expand Up @@ -147,9 +153,10 @@ impl ExecutionContext<'_> {
///
/// The full lifecycle is:
/// 1. Cache lookup (determines cache status)
/// 2. `leaf_reporter.start(cache_status)`
/// 3. If cache hit: replay cached outputs → finish
/// 4. If cache miss/disabled: spawn process → stream output → update cache → finish
/// 2. `leaf_reporter.start(cache_status)` → `StdioConfig`
/// 3. If cache hit: replay cached outputs via `StdioConfig` writers → finish
/// 4. If `Inherited` suggestion AND caching disabled: `spawn_inherited()` → finish
/// 5. Else (piped): `spawn_with_tracking()` with writers → cache update → finish
///
/// Errors (cache lookup failure, spawn failure, cache update failure) are reported
/// through `leaf_reporter.finish()` and do not abort the caller.
Expand Down Expand Up @@ -197,20 +204,20 @@ pub async fn execute_spawn(
(CacheStatus::Disabled(CacheDisabledReason::NoCacheMetadata), None)
};

// 2. Report execution start with the determined cache status
leaf_reporter.start(cache_status);
// 2. Report execution start with the determined cache status.
// Returns StdioConfig with the reporter's suggestion and async writers.
let mut stdio_config = leaf_reporter.start(cache_status);

// 3. If cache hit, replay outputs and finish early.
// 3. If cache hit, replay outputs via the StdioConfig writers and finish early.
// No need to actually execute the command — just replay what was cached.
if let Some(cached) = cached_value {
for output in cached.std_outputs.iter() {
leaf_reporter.output(
match output.kind {
SpawnOutputKind::StdOut => OutputKind::Stdout,
SpawnOutputKind::StdErr => OutputKind::Stderr,
},
output.content.clone().into(),
);
let writer: &mut (dyn tokio::io::AsyncWrite + Unpin) = match output.kind {
spawn::OutputKind::StdOut => &mut stdio_config.stdout_writer,
spawn::OutputKind::StdErr => &mut stdio_config.stderr_writer,
};
let _ = writer.write_all(&output.content).await;
let _ = writer.flush().await;
}
leaf_reporter.finish(
None,
Expand All @@ -220,41 +227,53 @@ pub async fn execute_spawn(
return SpawnOutcome::CacheHit;
}

// 4. Execute spawn (cache miss or disabled).
// Track file system access if caching is enabled (for future cache updates).
// 4. Determine actual stdio mode based on the suggestion AND cache state.
// Inherited stdio is only used when the reporter suggests it AND caching is
// completely disabled (no cache_metadata). If caching is enabled but missed,
// we still need piped mode to capture output for the cache update.
let use_inherited =
stdio_config.suggestion == StdioSuggestion::Inherited && cache_metadata.is_none();

if use_inherited {
// Inherited mode: all three stdio FDs (stdin, stdout, stderr) are inherited
// from the parent process. No fspy tracking, no output capture.
// Drop the StdioConfig writers before spawning to avoid holding tokio::io::Stdout
// while the child also writes to the same FD.
drop(stdio_config);

match spawn_inherited(&spawn_execution.spawn_command).await {
Ok(result) => {
leaf_reporter.finish(
Some(result.exit_status),
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
None,
);
return SpawnOutcome::Spawned(result.exit_status);
}
Err(err) => {
leaf_reporter.finish(
None,
CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled),
Some(ExecutionError::Spawn(err)),
);
return SpawnOutcome::Failed;
}
}
}

// 5. Piped mode: execute spawn with tracking, streaming output to writers.
let mut track_result_with_cache_metadata =
cache_metadata.map(|cache_metadata| (SpawnTrackResult::default(), cache_metadata));

// Determine the child process's stdin mode based on:
// - The reporter's suggestion (inherited only when appropriate, e.g., single task)
// - Whether caching is disabled (inherited stdin would make output non-deterministic,
// breaking cache semantics)
let stdin = if leaf_reporter.stdin_suggestion() == StdinSuggestion::Inherited
&& cache_metadata.is_none()
{
Stdio::inherit()
} else {
Stdio::null()
};

// Execute command with tracking, streaming output in real-time via the reporter
#[expect(
clippy::large_futures,
reason = "spawn_with_tracking manages process I/O and creates a large future"
)]
let result = match spawn_with_tracking(
&spawn_execution.spawn_command,
cache_base_path,
stdin,
|kind, content| {
leaf_reporter.output(
match kind {
SpawnOutputKind::StdOut => OutputKind::Stdout,
SpawnOutputKind::StdErr => OutputKind::Stderr,
},
content,
);
},
&mut stdio_config.stdout_writer,
&mut stdio_config.stderr_writer,
track_result_with_cache_metadata.as_mut().map(|(track_result, _)| track_result),
)
.await
Expand All @@ -270,7 +289,7 @@ pub async fn execute_spawn(
}
};

// 5. Update cache if successful and determine cache update status.
// 6. Update cache if successful and determine cache update status.
// Errors during cache update are terminal (reported through finish).
let (cache_update_status, cache_error) = if let Some((track_result, cache_metadata)) =
track_result_with_cache_metadata
Expand Down Expand Up @@ -315,14 +334,37 @@ pub async fn execute_spawn(
(CacheUpdateStatus::NotUpdated(CacheNotUpdatedReason::CacheDisabled), None)
};

// 6. Finish the leaf execution with the result and optional cache error.
// 7. Finish the leaf execution with the result and optional cache error.
// Cache update/fingerprint failures are reported but do not affect the outcome —
// the process ran, so we return its actual exit status.
leaf_reporter.finish(Some(result.exit_status), cache_update_status, cache_error);

SpawnOutcome::Spawned(result.exit_status)
}

/// Spawn a command with all three stdio file descriptors inherited from the parent.
///
/// Used when the reporter suggests inherited stdio AND caching is disabled.
/// All three FDs (stdin, stdout, stderr) are inherited, allowing interactive input
/// and direct terminal output. No fspy tracking is performed since there's no
/// cache to update.
///
/// The child process will see `is_terminal() == true` for stdout/stderr when the
/// parent is running in a terminal. This is expected behavior.
async fn spawn_inherited(spawn_command: &SpawnCommand) -> anyhow::Result<SpawnResult> {
let mut cmd = fspy::Command::new(spawn_command.program_path.as_path());
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
cmd.envs(spawn_command.all_envs.iter());
cmd.current_dir(&*spawn_command.cwd);
cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit()).stderr(Stdio::inherit());

let start = std::time::Instant::now();
let mut child = cmd.into_tokio_command().spawn()?;
let exit_status = child.wait().await?;

Ok(SpawnResult { exit_status, duration: start.elapsed() })
}

impl Session<'_> {
/// Execute an execution graph, reporting events through the provided reporter builder.
///
Expand Down
75 changes: 42 additions & 33 deletions crates/vite_task/src/session/execute/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use std::{
};

use bincode::{Decode, Encode};
use bstr::BString;
use fspy::AccessMode;
use rustc_hash::FxHashSet;
use serde::Serialize;
use tokio::io::AsyncReadExt as _;
use tokio::io::{AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use vite_path::{AbsolutePath, RelativePathBuf};
use vite_task_plan::SpawnCommand;

Expand Down Expand Up @@ -57,28 +56,26 @@ pub struct SpawnTrackResult {
pub path_writes: FxHashSet<RelativePathBuf>,
}

/// Spawn a command with file system tracking via fspy.
/// Spawn a command with file system tracking via fspy, using piped stdio.
///
/// Returns the execution result including captured outputs, exit status,
/// and tracked file accesses.
///
/// - `stdin` controls the child process's stdin (typically `Stdio::null()` or `Stdio::inherit()`).
/// - `on_output` is called in real-time as stdout/stderr data arrives.
/// - stdin is always `/dev/null` (piped mode is for non-interactive execution).
/// - `stdout_writer`/`stderr_writer` receive the child's stdout/stderr output in real-time.
/// - `track_result` if provided, will be populated with captured outputs and path accesses for caching. If `None`, tracking is disabled.
#[expect(clippy::future_not_send, reason = "uses !Send dyn AsyncWrite writers internally")]
#[expect(
clippy::too_many_lines,
reason = "spawn logic is inherently sequential and splitting would reduce clarity"
)]
pub async fn spawn_with_tracking<F>(
pub async fn spawn_with_tracking(
spawn_command: &SpawnCommand,
workspace_root: &AbsolutePath,
stdin: Stdio,
mut on_output: F,
stdout_writer: &mut (dyn AsyncWrite + Unpin),
stderr_writer: &mut (dyn AsyncWrite + Unpin),
track_result: Option<&mut SpawnTrackResult>,
) -> anyhow::Result<SpawnResult>
where
F: FnMut(OutputKind, BString),
{
) -> anyhow::Result<SpawnResult> {
/// The tracking state of the spawned process
enum TrackingState<'a> {
/// Tacking is enabled, with the tracked child and result reference
Expand All @@ -92,7 +89,7 @@ where
cmd.args(spawn_command.args.iter().map(vite_str::Str::as_str));
cmd.envs(spawn_command.all_envs.iter());
cmd.current_dir(&*spawn_command.cwd);
cmd.stdin(stdin).stdout(Stdio::piped()).stderr(Stdio::piped());
cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped());

let mut tracking_state = if let Some(track_result) = track_result {
// track_result is Some. Spawn with tracking enabled
Expand Down Expand Up @@ -122,37 +119,49 @@ where

let start = Instant::now();

// Helper closure to process output chunks
let mut process_output = |kind: OutputKind, content: Vec<u8>| {
// Emit event immediately
on_output(kind, content.clone().into());

// Store outputs for caching
if let Some(outputs) = &mut outputs {
// Merge consecutive outputs of the same kind for caching
if let Some(last) = outputs.last_mut()
&& last.kind == kind
{
last.content.extend(&content);
} else {
outputs.push(StdOutput { kind, content });
}
}
};

// Read from both stdout and stderr concurrently using select!
loop {
tokio::select! {
result = child_stdout.read(&mut stdout_buf), if !stdout_done => {
match result? {
0 => stdout_done = true,
n => process_output(OutputKind::StdOut, stdout_buf[..n].to_vec()),
n => {
let content = stdout_buf[..n].to_vec();
// Write to the async writer immediately
stdout_writer.write_all(&content).await?;
stdout_writer.flush().await?;
// Store outputs for caching
if let Some(outputs) = &mut outputs {
if let Some(last) = outputs.last_mut()
&& last.kind == OutputKind::StdOut
{
last.content.extend(&content);
} else {
outputs.push(StdOutput { kind: OutputKind::StdOut, content });
}
}
}
}
}
result = child_stderr.read(&mut stderr_buf), if !stderr_done => {
match result? {
0 => stderr_done = true,
n => process_output(OutputKind::StdErr, stderr_buf[..n].to_vec()),
n => {
let content = stderr_buf[..n].to_vec();
// Write to the async writer immediately
stderr_writer.write_all(&content).await?;
stderr_writer.flush().await?;
// Store outputs for caching
if let Some(outputs) = &mut outputs {
if let Some(last) = outputs.last_mut()
&& last.kind == OutputKind::StdErr
{
last.content.extend(&content);
} else {
outputs.push(StdOutput { kind: OutputKind::StdErr, content });
}
}
}
}
}
else => break,
Expand Down
7 changes: 3 additions & 4 deletions crates/vite_task/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ impl<'a> Session<'a> {
.await
}
Ok(graph) => {
let builder =
LabeledReporterBuilder::new(std::io::stdout(), self.workspace_path());
let builder = LabeledReporterBuilder::new(self.workspace_path());
Ok(self
.execute_graph(graph, Box::new(builder))
.await
Expand Down Expand Up @@ -392,7 +391,7 @@ impl<'a> Session<'a> {

let cwd = Arc::clone(&self.cwd);
let graph = self.plan_from_cli_run(cwd, run_command).await?;
let builder = LabeledReporterBuilder::new(std::io::stdout(), self.workspace_path());
let builder = LabeledReporterBuilder::new(self.workspace_path());
Ok(self.execute_graph(graph, Box::new(builder)).await.err().unwrap_or(ExitStatus::SUCCESS))
}

Expand Down Expand Up @@ -468,7 +467,7 @@ impl<'a> Session<'a> {
let cache = self.cache()?;

// Create a plain (standalone) reporter — no graph awareness, no summary
let plain_reporter = reporter::PlainReporter::new(std::io::stdout(), silent_if_cache_hit);
let plain_reporter = reporter::PlainReporter::new(silent_if_cache_hit);

// Execute the spawn directly using the free function, bypassing the graph pipeline
match execute::execute_spawn(
Expand Down
Loading