From 196a70667c0950b820c2ac75f97f3bde4aa0215e Mon Sep 17 00:00:00 2001 From: Ben Lovell Date: Fri, 17 Apr 2026 11:22:33 +0200 Subject: [PATCH] fix: ensure tower-runtime setup failures surface a terminal status Several setup error paths in execute_local_app (uv spawn, venv, PYTHONPATH construction, etc.) returned without notifying the status channel, so the app got stuck reporting an unusable state and runs crashed with exit 1. Wrap the sender in a drop guard that always sends a -2 sentinel on failure, distinct from -1 (cancelled). --- crates/tower-runtime/src/local.rs | 42 ++++++++++++--- crates/tower-runtime/tests/local_test.rs | 66 +++++++++++++++++++++++- 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/crates/tower-runtime/src/local.rs b/crates/tower-runtime/src/local.rs index 707d4dbb..a4d9cb94 100644 --- a/crates/tower-runtime/src/local.rs +++ b/crates/tower-runtime/src/local.rs @@ -49,6 +49,31 @@ pub struct LocalApp { execute_handle: Option<JoinHandle<Result<(), Error>>>, } +// Sent when execute_local_app returns Err before producing a real exit code +// (uv not found, venv spawn failed, PYTHONPATH construction failed, etc). +// Distinct from the cancellation sentinel -1. +pub(crate) const SETUP_FAILURE_EXIT_CODE: i32 = -2; + +// Drop guard that ensures the oneshot waiter always receives a code, so `?` +// paths in execute_local_app cannot leave status() stuck on WaiterClosed. 
+struct StatusReporter { + tx: Option<oneshot::Sender<i32>>, +} + +impl StatusReporter { + fn send(&mut self, code: i32) { + if let Some(tx) = self.tx.take() { + let _ = tx.send(code); + } + } +} + +impl Drop for StatusReporter { + fn drop(&mut self) { + self.send(SETUP_FAILURE_EXIT_CODE); + } +} + // Helper function to check if a file is executable async fn is_executable(path: &PathBuf) -> bool { let metadata = match fs::metadata(path).await { @@ -104,6 +129,7 @@ async fn execute_local_app( sx: oneshot::Sender<i32>, cancel_token: CancellationToken, ) -> Result<(), Error> { + let mut reporter = StatusReporter { tx: Some(sx) }; let ctx = opts.ctx.clone(); let package = opts.package; let environment = opts.environment; @@ -159,7 +185,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. - let _ = sx.send(-1); + reporter.send(-1); return Err(Error::Cancelled); } @@ -176,7 +202,7 @@ async fn execute_local_app( ) .await?; - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + reporter.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } else { // we put Uv in to protected mode when there's no caching configured/enabled. let protected_mode = opts.cache_dir.is_none(); @@ -198,7 +224,7 @@ async fn execute_local_app( // ensure everything is in place. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. - let _ = sx.send(-1); + reporter.send(-1); return Err(Error::Cancelled); } @@ -226,7 +252,7 @@ async fn execute_local_app( if res != 0 { // If the venv process failed, we want to return an error. - let _ = sx.send(res); + reporter.send(res); return Err(Error::VirtualEnvCreationFailed); } @@ -234,7 +260,7 @@ async fn execute_local_app( // once started, will take a while and we have logic for checking for cancellation. if cancel_token.is_cancelled() { // again tell any waiters that we cancelled. 
- let _ = sx.send(-1); + reporter.send(-1); return Err(Error::Cancelled); } @@ -279,7 +305,7 @@ async fn execute_local_app( if res != 0 { // If the sync process failed, we want to return an error. - let _ = sx.send(res); + reporter.send(res); return Err(Error::DependencyInstallationFailed); } } @@ -289,7 +315,7 @@ async fn execute_local_app( if cancel_token.is_cancelled() { // if there's a waiter, we want them to know that the process was cancelled so we have // to return something on the relevant channel. - let _ = sx.send(-1); + reporter.send(-1); return Err(Error::Cancelled); } @@ -312,7 +338,7 @@ async fn execute_local_app( BufReader::new(stderr), )); - let _ = sx.send(wait_for_process(ctx.clone(), &cancel_token, child).await); + reporter.send(wait_for_process(ctx.clone(), &cancel_token, child).await); } // Everything was properly executed I suppose. diff --git a/crates/tower-runtime/tests/local_test.rs b/crates/tower-runtime/tests/local_test.rs index 5dda4d9d..3b83e03e 100644 --- a/crates/tower-runtime/tests/local_test.rs +++ b/crates/tower-runtime/tests/local_test.rs @@ -4,7 +4,8 @@ use std::path::PathBuf; use tower_runtime::{local::LocalApp, App, StartOptions, Status}; use config::Towerfile; -use tower_package::{Package, PackageSpec}; +use tmpdir::TmpDir; +use tower_package::{Manifest, Package, PackageSpec}; use tower_telemetry::{self, debug}; use tokio::sync::mpsc::unbounded_channel; @@ -329,6 +330,69 @@ async fn test_running_app_with_secret() { assert!(status == Status::Exited, "App should be running"); } +#[cfg(unix)] +#[tokio::test] +async fn test_setup_failure_reports_status_instead_of_waiter_closed() { + // Regression: execute_local_app used to bubble up `?` errors (e.g. a failing + // std::env::join_paths) without ever signalling the oneshot waiter, so + // LocalApp::status() returned Err(WaiterClosed) and callers exited 1 + // silently. A setup failure must now surface as a terminal Status. 
+ let tmp = TmpDir::new("test-silent-setup") + .await + .expect("Failed to create tmp dir"); + let unpacked_path: PathBuf = tmp.as_ref().to_path_buf(); + + // On Unix, std::env::join_paths rejects paths containing ':'. Feeding one + // into manifest.import_paths causes the PYTHONPATH construction inside + // execute_local_app to fail via `?` before any subprocess is spawned. + let package = Package { + tmp_dir: None, + package_file_path: None, + unpacked_path: Some(unpacked_path), + manifest: Manifest { + version: Some(1), + invoke: "main.py".to_string(), + parameters: vec![], + schedule: None, + import_paths: vec!["bad:path".to_string()], + app_dir_name: "app".to_string(), + modules_dir_name: "modules".to_string(), + checksum: "".to_string(), + }, + }; + + let (sender, _receiver) = unbounded_channel(); + let opts = StartOptions { + ctx: tower_telemetry::Context::new("runner-id".to_string()), + package, + output_sender: sender, + cwd: None, + environment: "local".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + cache_dir: Some(config::default_cache_dir()), + }; + + let app = LocalApp::start(opts).await.expect("Failed to start app"); + + for _ in 0..20 { + let status = app.status().await.expect("status should not be WaiterClosed"); + if status.is_terminal() { + match status { + Status::Crashed { code } => { + assert!(code < 0, "expected negative sentinel, got {}", code); + return; + } + other => panic!("expected Crashed, got {:?}", other), + } + } + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + } + + panic!("status never reached a terminal state"); +} + #[tokio::test] async fn test_abort_on_dependency_installation_failure() { debug!("Running 05-broken-dependencies");